Loading system/gd/common/blocking_queue.h +2 −4 Original line number Diff line number Diff line Loading @@ -45,16 +45,14 @@ class BlockingQueue { return data; }; bool take_for(std::chrono::milliseconds time, T& data) { // Returns true if take() will not block within a time period bool wait_to_take(std::chrono::milliseconds time) { std::unique_lock<std::mutex> lock(mutex_); while (queue_.empty()) { if (not_empty_.wait_for(lock, time) == std::cv_status::timeout) { return false; } } data = queue_.front(); queue_.pop(); return true; } Loading system/gd/common/blocking_queue_unittest.cc +19 −0 Original line number Diff line number Diff line Loading @@ -86,6 +86,25 @@ TEST_F(BlockingQueueTest, wait_for_non_empty) { EXPECT_TRUE(queue_.empty()); } TEST_F(BlockingQueueTest, wait_to_take_fail) { EXPECT_FALSE(queue_.wait_to_take(std::chrono::milliseconds(3))); } TEST_F(BlockingQueueTest, wait_to_take_after_non_empty) { int data = 1; queue_.push(data); EXPECT_TRUE(queue_.wait_to_take(std::chrono::milliseconds(3))); queue_.clear(); } TEST_F(BlockingQueueTest, wait_to_take_before_non_empty) { int data = 1; std::thread waiter_thread([this] { EXPECT_TRUE(queue_.wait_to_take(std::chrono::milliseconds(3))); }); queue_.push(data); waiter_thread.join(); queue_.clear(); } TEST_F(BlockingQueueTest, wait_for_non_empty_batch) { std::thread waiter_thread([this] { for (int data = 0; data < 10; data++) { Loading system/gd/grpc/grpc_event_stream.h +2 −2 Original line number Diff line number Diff line Loading @@ -64,10 +64,10 @@ class GrpcEventStream { if (fetch_mode == ::bluetooth::facade::AT_LEAST_ONE) { RES response; EVENT event; if (!event_queue_.take_for(std::chrono::milliseconds(timeout_ms), event)) { if (!event_queue_.wait_to_take(std::chrono::milliseconds(timeout_ms))) { return ::grpc::Status(::grpc::StatusCode::DEADLINE_EXCEEDED, "timeout exceeded"); } EVENT event = event_queue_.take(); callback_->OnWriteResponse(&response, event); writer->Write(response); } Loading Loading
system/gd/common/blocking_queue.h +2 −4 Original line number Diff line number Diff line Loading @@ -45,16 +45,14 @@ class BlockingQueue { return data; }; bool take_for(std::chrono::milliseconds time, T& data) { // Returns true if take() will not block within a time period bool wait_to_take(std::chrono::milliseconds time) { std::unique_lock<std::mutex> lock(mutex_); while (queue_.empty()) { if (not_empty_.wait_for(lock, time) == std::cv_status::timeout) { return false; } } data = queue_.front(); queue_.pop(); return true; } Loading
system/gd/common/blocking_queue_unittest.cc +19 −0 Original line number Diff line number Diff line Loading @@ -86,6 +86,25 @@ TEST_F(BlockingQueueTest, wait_for_non_empty) { EXPECT_TRUE(queue_.empty()); } TEST_F(BlockingQueueTest, wait_to_take_fail) { EXPECT_FALSE(queue_.wait_to_take(std::chrono::milliseconds(3))); } TEST_F(BlockingQueueTest, wait_to_take_after_non_empty) { int data = 1; queue_.push(data); EXPECT_TRUE(queue_.wait_to_take(std::chrono::milliseconds(3))); queue_.clear(); } TEST_F(BlockingQueueTest, wait_to_take_before_non_empty) { int data = 1; std::thread waiter_thread([this] { EXPECT_TRUE(queue_.wait_to_take(std::chrono::milliseconds(3))); }); queue_.push(data); waiter_thread.join(); queue_.clear(); } TEST_F(BlockingQueueTest, wait_for_non_empty_batch) { std::thread waiter_thread([this] { for (int data = 0; data < 10; data++) { Loading
system/gd/grpc/grpc_event_stream.h +2 −2 Original line number Diff line number Diff line Loading @@ -64,10 +64,10 @@ class GrpcEventStream { if (fetch_mode == ::bluetooth::facade::AT_LEAST_ONE) { RES response; EVENT event; if (!event_queue_.take_for(std::chrono::milliseconds(timeout_ms), event)) { if (!event_queue_.wait_to_take(std::chrono::milliseconds(timeout_ms))) { return ::grpc::Status(::grpc::StatusCode::DEADLINE_EXCEEDED, "timeout exceeded"); } EVENT event = event_queue_.take(); callback_->OnWriteResponse(&response, event); writer->Write(response); } Loading