Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 51530fc0 authored by Myles Watson's avatar Myles Watson
Browse files

Queue: Store callbacks in the runnable

Test: bluetooth_test_gd \
  --gtest_filter=QueueTest.pass_smart_pointer_and_unregister
Change-Id: Ib7dc12d8e767125e51d83810ade77c5187a4c174
parent 9d19ea64
Loading
Loading
Loading
Loading
+7 −13
Original line number Diff line number Diff line
@@ -16,24 +16,22 @@

template <typename T>
Queue<T>::Queue(size_t capacity)
    : enqueue_callback_(nullptr), dequeue_callback_(nullptr), enqueue_(capacity), dequeue_(0){};
    : enqueue_(capacity), dequeue_(0){};

template <typename T>
Queue<T>::~Queue() {
  ASSERT(enqueue_callback_ == nullptr);
  ASSERT(dequeue_callback_ == nullptr);
  ASSERT(enqueue_.handler_ == nullptr);
  ASSERT(dequeue_.handler_ == nullptr);
};

template <typename T>
void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
  std::lock_guard<std::mutex> lock(mutex_);
  ASSERT(enqueue_.handler_ == nullptr);
  ASSERT(enqueue_callback_ == nullptr);
  ASSERT(enqueue_.reactable_ == nullptr);
  enqueue_.handler_ = handler;
  enqueue_callback_ = callback;
  enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
      enqueue_.reactive_semaphore_.GetFd(), [this] { EnqueueCallbackInternal(); }, nullptr);
      enqueue_.reactive_semaphore_.GetFd(), [this, callback] { EnqueueCallbackInternal(callback); }, nullptr);
}

template <typename T>
@@ -42,7 +40,6 @@ void Queue<T>::UnregisterEnqueue() {
  ASSERT(enqueue_.reactable_ != nullptr);
  enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_);
  enqueue_.reactable_ = nullptr;
  enqueue_callback_ = nullptr;
  enqueue_.handler_ = nullptr;
}

@@ -50,12 +47,10 @@ template <typename T>
void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
  std::lock_guard<std::mutex> lock(mutex_);
  ASSERT(dequeue_.handler_ == nullptr);
  ASSERT(dequeue_callback_ == nullptr);
  ASSERT(dequeue_.reactable_ == nullptr);
  dequeue_.handler_ = handler;
  dequeue_callback_ = callback;
  dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(dequeue_.reactive_semaphore_.GetFd(),
                                                                           [this] { dequeue_callback_(); }, nullptr);
                                                                           [callback] { callback(); }, nullptr);
}

template <typename T>
@@ -64,7 +59,6 @@ void Queue<T>::UnregisterDequeue() {
  ASSERT(dequeue_.reactable_ != nullptr);
  dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_);
  dequeue_.reactable_ = nullptr;
  dequeue_callback_ = nullptr;
  dequeue_.handler_ = nullptr;
}

@@ -87,11 +81,11 @@ std::unique_ptr<T> Queue<T>::TryDequeue() {
}

template <typename T>
void Queue<T>::EnqueueCallbackInternal() {
void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) {
  enqueue_.reactive_semaphore_.Decrease();

  {
    std::unique_ptr<T> data = enqueue_callback_();
    std::unique_ptr<T> data = callback();
    ASSERT(data != nullptr);
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::move(data));
+36 −19
Original line number Diff line number Diff line
@@ -62,26 +62,22 @@ class TestEnqueueEnd {
  explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
      : count(0), handler_(handler), queue_(queue), delay_(0) {}

  ~TestEnqueueEnd() {
    LOG_INFO("~TestEnqueueEnd");  // Debug log, will be removed
  }
  ~TestEnqueueEnd() {}

  void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
    LOG_INFO("RegisterEnqueue");  // Debug log, will be removed
    promise_map_ = promise_map;
    handler_->Post([this] { queue_->RegisterEnqueue(handler_, [this] { return EnqueueCallbackForTest(); }); });
  }

  void UnregisterEnqueue() {
    LOG_INFO("UnregisterEnqueue");  // Debug log, will be removed
    std::promise<void> promise;
    auto feature = promise.get_future();
    auto future = promise.get_future();

    handler_->Post([this, &promise] {
      queue_->UnregisterEnqueue();
      promise.set_value();
    });
    feature.wait();
    future.wait();
  }

  std::unique_ptr<std::string> EnqueueCallbackForTest() {
@@ -93,14 +89,12 @@ class TestEnqueueEnd {
    std::unique_ptr<std::string> data = std::move(buffer_.front());
    buffer_.pop();
    std::string copy = *data;
    LOG_INFO(": pop %s, size %d", copy.c_str(), (int)buffer_.size());  // Debug log, will be removed
    if (buffer_.empty()) {
      queue_->UnregisterEnqueue();
    }

    auto pair = promise_map_->find(buffer_.size());
    if (pair != promise_map_->end()) {
      LOG_INFO("promises : %d", pair->first);  // Debug log, will be removed
      pair->second.set_value(pair->first);
      promise_map_->erase(pair->first);
    }
@@ -126,26 +120,22 @@ class TestDequeueEnd {
  explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
      : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}

  ~TestDequeueEnd() {
    LOG_INFO("~TestDequeueEnd");  // Debug log, will be removed
  }
  ~TestDequeueEnd() {}

  void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
    LOG_INFO("RegisterDequeue");  // Debug log, will be removed
    promise_map_ = promise_map;
    handler_->Post([this] { queue_->RegisterDequeue(handler_, [this] { DequeueCallbackForTest(); }); });
  }

  void UnregisterDequeue() {
    LOG_INFO("UnregisterDequeue");  // Debug log, will be removed
    std::promise<void> promise;
    auto feature = promise.get_future();
    auto future = promise.get_future();

    handler_->Post([this, &promise] {
      queue_->UnregisterDequeue();
      promise.set_value();
    });
    feature.wait();
    future.wait();
  }

  void DequeueCallbackForTest() {
@@ -155,9 +145,7 @@ class TestDequeueEnd {

    count++;
    std::unique_ptr<std::string> data = queue_->TryDequeue();
    std::string copy = *data;  // Debug log, will be removed
    buffer_.push(std::move(data));
    LOG_INFO("push %s, size %d", copy.c_str(), (int)buffer_.size());  // Debug log, will be removed

    if (buffer_.size() == capacity_) {
      queue_->UnregisterDequeue();
@@ -165,7 +153,6 @@ class TestDequeueEnd {

    auto pair = promise_map_->find(buffer_.size());
    if (pair != promise_map_->end()) {
      LOG_INFO("promises : %d", pair->first);  // Debug log, will be removed
      pair->second.set_value(pair->first);
      promise_map_->erase(pair->first);
    }
@@ -678,6 +665,36 @@ TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
  EXPECT_EQ(dequeue_future.get(), kQueueSize);
}

TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  Queue<std::string>* queue = new Queue<std::string>(kQueueSize);

  // Enqueue a string
  std::string valid = "Valid String";
  std::shared_ptr<std::string> shared = std::make_shared<std::string>(valid);
  queue->RegisterEnqueue(enqueue_handler_, [queue, shared]() {
    queue->UnregisterEnqueue();
    return std::make_unique<std::string>(*shared);
  });

  // Dequeue the string
  queue->RegisterDequeue(dequeue_handler_, [queue, valid]() {
    queue->UnregisterDequeue();
    auto answer = *queue->TryDequeue();
    ASSERT_EQ(answer, valid);
  });

  // Wait for both handlers to finish and delete the Queue
  std::promise<void> promise;
  auto future = promise.get_future();

  enqueue_handler_->Post([this, queue, &promise]() {
    dequeue_handler_->Post([queue, &promise] {
      delete queue;
      promise.set_value();
    });
  });
  future.wait();
}
}  // namespace
}  // namespace os
}  // namespace bluetooth
+1 −5
Original line number Diff line number Diff line
@@ -78,15 +78,11 @@ class Queue : public IQueueEnqueue<T>, public IQueueDequeue<T> {
  std::unique_ptr<T> TryDequeue() override;

 private:
  void EnqueueCallbackInternal();
  void EnqueueCallbackInternal(EnqueueCallback callback);
  // An internal queue that holds at most |capacity| pieces of data
  std::queue<std::unique_ptr<T>> queue_;
  // A mutex that guards data in this queue
  std::mutex mutex_;
  // Current dequeue callback
  EnqueueCallback enqueue_callback_;
  // Current enqueue callback
  DequeueCallback dequeue_callback_;

  class QueueEndpoint {
   public: