Loading system/gd/os/linux_generic/queue.tpp +32 −10 Original line number Diff line number Diff line Loading @@ -37,12 +37,23 @@ void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { template <typename T> void Queue<T>::UnregisterEnqueue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_); reactor = enqueue_.handler_->thread_->GetReactor(); wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread()); to_unregister = enqueue_.reactable_; enqueue_.reactable_ = nullptr; enqueue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { Loading @@ -56,12 +67,23 @@ void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { template <typename T> void Queue<T>::UnregisterDequeue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_); reactor = dequeue_.handler_->thread_->GetReactor(); wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread()); to_unregister = dequeue_.reactable_; dequeue_.reactable_ = nullptr; dequeue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> std::unique_ptr<T> Queue<T>::TryDequeue() { Loading system/gd/os/linux_generic/queue_unittest.cc +63 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ #include "os/queue.h" #include <sys/eventfd.h> #include <atomic> #include <future> #include <unordered_map> Loading Loading @@ -721,6 +722,68 @@ TEST_F(QueueTest, pass_smart_pointer_and_unregister) { future.wait(); } std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; return std::make_unique<std::string>("Hello"); } TEST_F(QueueTest, unregister_enqueue_and_wait) { Queue<std::string> queue(10); int* indicator = new int(100); queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); queue.UnregisterEnqueue(); EXPECT_EQ(*indicator, 101); delete indicator; } std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; if (is_registered->exchange(false)) { queue->UnregisterEnqueue(); } return std::make_unique<std::string>("Hello"); } TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) { Queue<std::string> queue(10); int* indicator = new int(100); std::atomic_bool is_registered = true; queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback_and_unregister, common::Unretained(indicator), common::Unretained(&queue), common::Unretained(&is_registered))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); if (is_registered.exchange(false)) { queue.UnregisterEnqueue(); } EXPECT_EQ(*indicator, 101); delete indicator; } void sleep_and_dequeue_callback(int* to_increase) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; } TEST_F(QueueTest, unregister_dequeue_and_wait) { int* indicator = new int(100); Queue<std::string> queue(10); queue.RegisterEnqueue(enqueue_handler_, common::Bind( [](Queue<std::string>* queue) { queue->UnregisterEnqueue(); return std::make_unique<std::string>("Hello"); }, common::Unretained(&queue))); queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); queue.UnregisterDequeue(); EXPECT_EQ(*indicator, 101); delete indicator; } // Create all threads for death tests in the function that dies class QueueDeathTest : public ::testing::Test { public: Loading Loading
system/gd/os/linux_generic/queue.tpp +32 −10 Original line number Diff line number Diff line Loading @@ -37,12 +37,23 @@ void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { template <typename T> void Queue<T>::UnregisterEnqueue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_); reactor = enqueue_.handler_->thread_->GetReactor(); wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread()); to_unregister = enqueue_.reactable_; enqueue_.reactable_ = nullptr; enqueue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { Loading @@ -56,12 +67,23 @@ void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { template <typename T> void Queue<T>::UnregisterDequeue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_); reactor = dequeue_.handler_->thread_->GetReactor(); wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread()); to_unregister = dequeue_.reactable_; dequeue_.reactable_ = nullptr; dequeue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> std::unique_ptr<T> Queue<T>::TryDequeue() { Loading
system/gd/os/linux_generic/queue_unittest.cc +63 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ #include "os/queue.h" #include <sys/eventfd.h> #include <atomic> #include <future> #include <unordered_map> Loading Loading @@ -721,6 +722,68 @@ TEST_F(QueueTest, pass_smart_pointer_and_unregister) { future.wait(); } std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; return std::make_unique<std::string>("Hello"); } TEST_F(QueueTest, unregister_enqueue_and_wait) { Queue<std::string> queue(10); int* indicator = new int(100); queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); queue.UnregisterEnqueue(); EXPECT_EQ(*indicator, 101); delete indicator; } std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; if (is_registered->exchange(false)) { queue->UnregisterEnqueue(); } return std::make_unique<std::string>("Hello"); } TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) { Queue<std::string> queue(10); int* indicator = new int(100); std::atomic_bool is_registered = true; queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback_and_unregister, common::Unretained(indicator), common::Unretained(&queue), common::Unretained(&is_registered))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); if (is_registered.exchange(false)) { queue.UnregisterEnqueue(); } EXPECT_EQ(*indicator, 101); delete indicator; } void sleep_and_dequeue_callback(int* to_increase) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); (*to_increase)++; } TEST_F(QueueTest, unregister_dequeue_and_wait) { int* indicator = new int(100); Queue<std::string> queue(10); queue.RegisterEnqueue(enqueue_handler_, common::Bind( [](Queue<std::string>* queue) { queue->UnregisterEnqueue(); return std::make_unique<std::string>("Hello"); }, common::Unretained(&queue))); queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator))); std::this_thread::sleep_for(std::chrono::milliseconds(50)); queue.UnregisterDequeue(); EXPECT_EQ(*indicator, 101); delete indicator; } // Create all threads for death tests in the function that dies class QueueDeathTest : public ::testing::Test { public: Loading