Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 79520d64 authored by Hansong Zhang's avatar Hansong Zhang Committed by android-build-merger
Browse files

OS Queue: Add Enqueue Buffer am: 37b320cd

am: 5b405751

Change-Id: Ib892ec21163312fc6d972094dd25f010ede234da
parents 51effeb6 5b405751
Loading
Loading
Loading
Loading
+79 −0
Original line number Diff line number Diff line
@@ -750,6 +750,85 @@ TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
  EXPECT_DEATH(RegisterDequeueAndDelete(), "equeue");
}

class MockIQueueEnqueue : public IQueueEnqueue<int> {
 public:
  void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
    EXPECT_FALSE(registered_);
    registered_ = true;
    handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback));
  }

  void handle_register_enqueue(EnqueueCallback callback) {
    if (dont_handle_register_enqueue_) {
      return;
    }
    while (registered_) {
      std::unique_ptr<int> front = callback.Run();
      queue_.push(*front);
    }
  }

  void UnregisterEnqueue() override {
    EXPECT_TRUE(registered_);
    registered_ = false;
  }

  bool dont_handle_register_enqueue_ = false;
  bool registered_ = false;
  std::queue<int> queue_;
};

class EnqueueBufferTest : public ::testing::Test {
 protected:
  void SetUp() override {
    thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
    handler_ = new Handler(thread_);
  }

  void TearDown() override {
    handler_->Clear();
    delete handler_;
    delete thread_;
  }

  void SynchronizeHandler() {
    std::promise<void> promise;
    auto future = promise.get_future();
    handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise)));
    future.wait();
  }

  MockIQueueEnqueue enqueue_;
  EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
  Thread* thread_;
  Handler* handler_;
};

TEST_F(EnqueueBufferTest, enqueue) {
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer_.Enqueue(i, handler_);
  }
  SynchronizeHandler();
  for (int i = 0; i < num_items; i++) {
    ASSERT_EQ(enqueue_.queue_.front(), i);
    enqueue_.queue_.pop();
  }
  ASSERT_FALSE(enqueue_.registered_);
}

TEST_F(EnqueueBufferTest, clear) {
  enqueue_.dont_handle_register_enqueue_ = true;
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer_.Enqueue(i, handler_);
  }
  ASSERT_TRUE(enqueue_.registered_);
  enqueue_buffer_.Clear();
  ASSERT_FALSE(enqueue_.registered_);
}

}  // namespace
}  // namespace os
}  // namespace bluetooth
+38 −0
Original line number Diff line number Diff line
@@ -101,6 +101,44 @@ class Queue : public IQueueEnqueue<T>, public IQueueDequeue<T> {
  QueueEndpoint dequeue_;
};

template <typename T>
class EnqueueBuffer {
 public:
  EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {}

  void Enqueue(T t, os::Handler* handler) {
    std::lock_guard<std::mutex> lock(mutex_);
    buffer_.push(t);
    if (buffer_.size() == 1) {
      queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this)));
    }
  }

  void Clear() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!buffer_.empty()) {
      queue_->UnregisterEnqueue();
      std::queue<T> empty;
      std::swap(buffer_, empty);
    }
  }

 private:
  std::unique_ptr<T> enqueue_callback() {
    std::lock_guard<std::mutex> lock(mutex_);
    auto t = std::move(buffer_.front());
    buffer_.pop();
    if (buffer_.empty()) {
      queue_->UnregisterEnqueue();
    }
    return std::make_unique<T>(t);
  }

  mutable std::mutex mutex_;
  IQueueEnqueue<T>* queue_;
  std::queue<T> buffer_;
};

#ifdef OS_LINUX_GENERIC
#include "os/linux_generic/queue.tpp"
#endif