Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit dd11ff18 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

L2cap scheduler: Implement high priority channel

Add a high priority queue for these channels in scheduler.

Introduce MockQueue to help test with BidiQueueEnd callbacks.

Test: cert/run
Test: bluetooth_test_gd
Tag: #gd-refactor
Bug: 141555841
Change-Id: I3b60002f0d2ac2fb4d3307717dce128ccb06247c
parent 3a319de5
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -54,6 +54,12 @@ class Scheduler {
   */
  virtual void OnPacketsReady(Cid cid, int number_packets) {}

  /**
   * Let the scheduler send the specified cid first.
   * Used by A2dp software encoding.
   */
  virtual void SetChannelTxPriority(Cid cid, bool high_priority) {}

  virtual ~Scheduler() = default;
};

+11 −1
Original line number Diff line number Diff line
@@ -43,10 +43,20 @@ void Fifo::OnPacketsReady(Cid cid, int number_packets) {
  if (number_packets == 0) {
    return;
  }
  next_to_dequeue_and_num_packets.push(std::make_pair(cid, number_packets));
  int priority = high_priority_cids_.count(cid) != 0;
  next_to_dequeue_and_num_packets.push(std::make_pair(cid, number_packets), priority);
  try_register_link_queue_enqueue();
}

// Invoked within L2CAP Handler context
void Fifo::SetChannelTxPriority(Cid cid, bool high_priority) {
  if (high_priority) {
    high_priority_cids_.emplace(cid);
  } else {
    high_priority_cids_.erase(cid);
  }
}

// Invoked from some external Queue Reactable context
std::unique_ptr<Fifo::UpperDequeue> Fifo::link_queue_enqueue_callback() {
  ASSERT(!next_to_dequeue_and_num_packets.empty());
+6 −1
Original line number Diff line number Diff line
@@ -19,9 +19,11 @@
#include <atomic>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include "common/bidi_queue.h"
#include "common/bind.h"
#include "common/multi_priority_queue.h"
#include "l2cap/cid.h"
#include "l2cap/internal/channel_impl.h"
#include "l2cap/internal/scheduler.h"
@@ -39,12 +41,15 @@ class Fifo : public Scheduler {
  Fifo(DataPipelineManager* data_pipeline_manager, LowerQueueUpEnd* link_queue_up_end, os::Handler* handler);
  ~Fifo();
  void OnPacketsReady(Cid cid, int number_packets) override;
  void SetChannelTxPriority(Cid cid, bool high_priority) override;

 private:
  DataPipelineManager* data_pipeline_manager_;
  LowerQueueUpEnd* link_queue_up_end_;
  os::Handler* handler_;
  std::queue<std::pair<Cid, int>> next_to_dequeue_and_num_packets;
  using ChannelAndNumPackets = std::pair<Cid, int>;
  common::MultiPriorityQueue<ChannelAndNumPackets, 2> next_to_dequeue_and_num_packets;
  std::unordered_set<Cid> high_priority_cids_;
  std::atomic_bool link_queue_enqueue_registered_ = false;

  void try_register_link_queue_enqueue();
+50 −25
Original line number Diff line number Diff line
@@ -18,13 +18,12 @@

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <future>

#include "l2cap/internal/channel_impl_mock.h"
#include "l2cap/internal/data_controller_mock.h"
#include "l2cap/internal/data_pipeline_manager_mock.h"
#include "os/handler.h"
#include "os/queue.h"
#include "os/mock_queue.h"
#include "os/thread.h"
#include "packet/raw_builder.h"

@@ -50,67 +49,93 @@ PacketView<kLittleEndian> GetPacketView(std::unique_ptr<packet::BasePacketBuilde
  return packet::PacketView<packet::kLittleEndian>(bytes);
}

void sync_handler(os::Handler* handler) {
  std::promise<void> promise;
  auto future = promise.get_future();
  handler->Post(common::BindOnce(&std::promise<void>::set_value, common::Unretained(&promise)));
  auto status = future.wait_for(std::chrono::milliseconds(300));
  EXPECT_EQ(status, std::future_status::ready);
}

class MyDataController : public testing::MockDataController {
 public:
  std::unique_ptr<BasePacketBuilder> GetNextPacket() override {
    return std::move(next_packet);
    auto next = std::move(next_packets.front());
    next_packets.pop();
    return next;
  }

  std::unique_ptr<BasePacketBuilder> next_packet;
  std::queue<std::unique_ptr<BasePacketBuilder>> next_packets;
};

class L2capSchedulerFifoTest : public ::testing::Test {
 protected:
  void SetUp() override {
    thread_ = new os::Thread("test_thread", os::Thread::Priority::NORMAL);
    user_handler_ = new os::Handler(thread_);
    queue_handler_ = new os::Handler(thread_);
    mock_data_pipeline_manager_ = new testing::MockDataPipelineManager(queue_handler_, link_queue_.GetUpEnd());
    fifo_ = new Fifo(mock_data_pipeline_manager_, link_queue_.GetUpEnd(), queue_handler_);
    mock_data_pipeline_manager_ = new testing::MockDataPipelineManager(queue_handler_, &queue_end_);
    fifo_ = new Fifo(mock_data_pipeline_manager_, &queue_end_, queue_handler_);
  }

  void TearDown() override {
    delete fifo_;
    delete mock_data_pipeline_manager_;
    queue_handler_->Clear();
    user_handler_->Clear();
    delete queue_handler_;
    delete user_handler_;
    delete thread_;
  }

  os::Thread* thread_ = nullptr;
  os::Handler* user_handler_ = nullptr;
  os::Handler* queue_handler_ = nullptr;
  common::BidiQueue<Scheduler::LowerDequeue, Scheduler::LowerEnqueue> link_queue_{10};
  os::MockIQueueDequeue<Scheduler::LowerDequeue> dequeue_;
  os::MockIQueueEnqueue<Scheduler::LowerEnqueue> enqueue_;
  common::BidiQueueEnd<Scheduler::LowerEnqueue, Scheduler::LowerDequeue> queue_end_{&enqueue_, &dequeue_};
  testing::MockDataPipelineManager* mock_data_pipeline_manager_ = nullptr;
  MyDataController data_controller_;
  MyDataController data_controller_1_;
  MyDataController data_controller_2_;
  Fifo* fifo_ = nullptr;
};

TEST_F(L2capSchedulerFifoTest, send_packet) {
  auto frame = BasicFrameBuilder::Create(1, CreateSdu({'a', 'b', 'c'}));
  data_controller_.next_packet = std::move(frame);
  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(_)).WillOnce(Return(&data_controller_));
  data_controller_1_.next_packets.push(std::move(frame));
  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(_)).WillOnce(Return(&data_controller_1_));
  EXPECT_CALL(*mock_data_pipeline_manager_, OnPacketSent(1));
  fifo_->OnPacketsReady(1, 1);
  sync_handler(queue_handler_);
  sync_handler(user_handler_);
  auto packet = link_queue_.GetDownEnd()->TryDequeue();
  enqueue_.run_enqueue();
  auto&& packet = enqueue_.enqueued.front();
  auto packet_view = GetPacketView(std::move(packet));
  auto basic_frame_view = BasicFrameView::Create(packet_view);
  EXPECT_TRUE(basic_frame_view.IsValid());
  EXPECT_EQ(basic_frame_view.GetChannelId(), 1);
  auto payload = basic_frame_view.GetPayload();
  EXPECT_EQ(std::string(payload.begin(), payload.end()), "abc");
  enqueue_.enqueued.pop();
}

TEST_F(L2capSchedulerFifoTest, prioritize_channel) {
  auto frame = BasicFrameBuilder::Create(1, CreateSdu({'a', 'b', 'c'}));
  data_controller_1_.next_packets.push(std::move(frame));
  frame = BasicFrameBuilder::Create(2, CreateSdu({'d', 'e', 'f'}));
  data_controller_2_.next_packets.push(std::move(frame));

  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(1)).WillRepeatedly(Return(&data_controller_1_));
  EXPECT_CALL(*mock_data_pipeline_manager_, GetDataController(2)).WillRepeatedly(Return(&data_controller_2_));
  EXPECT_CALL(*mock_data_pipeline_manager_, OnPacketSent(1));
  EXPECT_CALL(*mock_data_pipeline_manager_, OnPacketSent(2));
  fifo_->SetChannelTxPriority(1, true);
  fifo_->OnPacketsReady(2, 1);
  fifo_->OnPacketsReady(1, 1);
  enqueue_.run_enqueue(2);
  auto packet1 = std::move(enqueue_.enqueued.front());
  auto packet_view = GetPacketView(std::move(packet1));
  auto basic_frame_view = BasicFrameView::Create(packet_view);
  EXPECT_TRUE(basic_frame_view.IsValid());
  EXPECT_EQ(basic_frame_view.GetChannelId(), 1);
  auto payload = basic_frame_view.GetPayload();
  EXPECT_EQ(std::string(payload.begin(), payload.end()), "abc");
  enqueue_.enqueued.pop();

  auto packet2 = std::move(enqueue_.enqueued.front());
  packet_view = GetPacketView(std::move(packet2));
  basic_frame_view = BasicFrameView::Create(packet_view);
  EXPECT_TRUE(basic_frame_view.IsValid());
  EXPECT_EQ(basic_frame_view.GetChannelId(), 2);
  payload = basic_frame_view.GetPayload();
  EXPECT_EQ(std::string(payload.begin(), payload.end()), "def");
  enqueue_.enqueued.pop();
}

}  // namespace
+100 −0
Original line number Diff line number Diff line
/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <unistd.h>

#include <functional>
#include <mutex>
#include <queue>

#include "common/bidi_queue.h"
#include "common/bind.h"
#include "common/callback.h"
#include "os/handler.h"
#include "os/log.h"
#include "os/queue.h"

namespace bluetooth {
namespace os {

template <typename T>
class MockIQueueEnqueue : public IQueueEnqueue<T> {
 public:
  using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;

  virtual void RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
    ASSERT(registered_handler == nullptr);
    registered_handler = handler;
    registered_enqueue_callback = callback;
  }

  virtual void UnregisterEnqueue() {
    ASSERT(registered_handler != nullptr);
    registered_handler = nullptr;
    registered_enqueue_callback = {};
  }

  void run_enqueue(unsigned times = 1) {
    while (registered_handler != nullptr && times > 0) {
      times--;
      enqueued.push(registered_enqueue_callback.Run());
    }
  }

  Handler* registered_handler = nullptr;
  EnqueueCallback registered_enqueue_callback = {};
  std::queue<std::unique_ptr<T>> enqueued = {};
};

template <typename T>
class MockIQueueDequeue : public IQueueDequeue<T> {
 public:
  using DequeueCallback = common::Callback<void()>;

  virtual void RegisterDequeue(Handler* handler, DequeueCallback callback) {
    ASSERT(registered_handler == nullptr);
    registered_handler = handler;
    registered_dequeue_callback = callback;
  }

  virtual void UnregisterDequeue() {
    ASSERT(registered_handler != nullptr);
    registered_handler = nullptr;
    registered_dequeue_callback = {};
  }

  virtual std::unique_ptr<T> TryDequeue() {
    std::unique_ptr<T> front = std::move(enqueued.front());
    enqueued.pop();
    return front;
  }

  void run_dequeue(unsigned times = 1) {
    while (registered_handler != nullptr && times > 0) {
      times--;
      registered_dequeue_callback.Run();
    }
  }

  Handler* registered_handler = nullptr;
  DequeueCallback registered_dequeue_callback = {};
  std::queue<std::unique_ptr<T>> enqueued = {};
};

}  // namespace os
}  // namespace bluetooth