Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 17e28e35 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

Improve blocking queue

Instead of take_for(), add a helper function to wait until the queue is
non-empty, so we don't require T to have a empty constructor.

Test: bluetooth_test_gd
Change-Id: Ia30a8149646fc75ecf19778888636c4129dbf100
parent cf9464cf
Loading
Loading
Loading
Loading
+2 −4
Original line number Diff line number Diff line
@@ -45,16 +45,14 @@ class BlockingQueue {
    return data;
  };

  bool take_for(std::chrono::milliseconds time, T& data) {
  // Returns true if take() will not block within a time period
  bool wait_to_take(std::chrono::milliseconds time) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.empty()) {
      if (not_empty_.wait_for(lock, time) == std::cv_status::timeout) {
        return false;
      }
    }
    data = queue_.front();
    queue_.pop();

    return true;
  }

+19 −0
Original line number Diff line number Diff line
@@ -86,6 +86,25 @@ TEST_F(BlockingQueueTest, wait_for_non_empty) {
  EXPECT_TRUE(queue_.empty());
}

TEST_F(BlockingQueueTest, wait_to_take_fail) {
  EXPECT_FALSE(queue_.wait_to_take(std::chrono::milliseconds(3)));
}

TEST_F(BlockingQueueTest, wait_to_take_after_non_empty) {
  int data = 1;
  queue_.push(data);
  EXPECT_TRUE(queue_.wait_to_take(std::chrono::milliseconds(3)));
  queue_.clear();
}

TEST_F(BlockingQueueTest, wait_to_take_before_non_empty) {
  int data = 1;
  std::thread waiter_thread([this] { EXPECT_TRUE(queue_.wait_to_take(std::chrono::milliseconds(3))); });
  queue_.push(data);
  waiter_thread.join();
  queue_.clear();
}

TEST_F(BlockingQueueTest, wait_for_non_empty_batch) {
  std::thread waiter_thread([this] {
    for (int data = 0; data < 10; data++) {
+2 −2
Original line number Diff line number Diff line
@@ -64,10 +64,10 @@ class GrpcEventStream {

    if (fetch_mode == ::bluetooth::facade::AT_LEAST_ONE) {
      RES response;
      EVENT event;
      if (!event_queue_.take_for(std::chrono::milliseconds(timeout_ms), event)) {
      if (!event_queue_.wait_to_take(std::chrono::milliseconds(timeout_ms))) {
        return ::grpc::Status(::grpc::StatusCode::DEADLINE_EXCEEDED, "timeout exceeded");
      }
      EVENT event = event_queue_.take();
      callback_->OnWriteResponse(&response, event);
      writer->Write(response);
    }