Loading tools/rootcanal/model/setup/async_manager.cc +20 −3 Original line number Diff line number Diff line Loading @@ -364,6 +364,7 @@ class AsyncManager::AsyncTaskManager { std::chrono::steady_clock::time_point time; bool periodic; std::chrono::milliseconds period{}; std::mutex in_callback; // Taken when the callback is active TaskCallback callback; AsyncTaskId task_id; AsyncUserId user_id; Loading @@ -381,8 +382,22 @@ class AsyncManager::AsyncTaskManager { if (tasks_by_id_.count(async_task_id) == 0) { return false; } // Now make sure we are not running this task. // 2 cases: // - This is called from thread_, this means a running // scheduled task is actually unregistering. All bets are off. // - Another thread is calling us, let's make sure the task is not active. if (thread_.get_id() != std::this_thread::get_id()) { auto task = tasks_by_id_[async_task_id]; const std::lock_guard<std::mutex> lock(task->in_callback); task_queue_.erase(task); tasks_by_id_.erase(async_task_id); } else { task_queue_.erase(tasks_by_id_[async_task_id]); tasks_by_id_.erase(async_task_id); } return true; } Loading Loading @@ -437,11 +452,12 @@ class AsyncManager::AsyncTaskManager { void ThreadRoutine() { while (running_) { TaskCallback callback; std::shared_ptr<Task> task_p; bool run_it = false; { std::unique_lock<std::mutex> guard(internal_mutex_); if (!task_queue_.empty()) { std::shared_ptr<Task> task_p = *(task_queue_.begin()); task_p = *(task_queue_.begin()); if (task_p->time < std::chrono::steady_clock::now()) { run_it = true; callback = task_p->callback; Loading @@ -458,6 +474,7 @@ class AsyncManager::AsyncTaskManager { } } if (run_it) { const std::lock_guard<std::mutex> lock(task_p->in_callback); callback(); } { Loading tools/rootcanal/model/setup/async_manager.h +10 −10 Original line number Diff line number Diff line Loading @@ -77,18 +77,18 @@ class AsyncManager { std::chrono::milliseconds period, const TaskCallback& callback); // Cancels the/every future occurrence of the action specified by this id. It // is guaranteed that the associated callback will not be called after this // method returns (it could be called during the execution of the method). // The calling thread may block until the scheduling thread acknowledges the // cancellation. // Cancels the/every future occurrence of the action specified by this id. // The following invariants will hold: // - The task will not be invoked after this method returns // - If the task is currently running it will block until the task is // completed, unless cancel is called from the running task. bool CancelAsyncTask(AsyncTaskId async_task_id); // Cancels the/every future occurrence of the action specified by this id. It // is guaranteed that the associated callback will not be called after this // method returns (it could be called during the execution of the method). // The calling thread may block until the scheduling thread acknowledges the // cancellation. // Cancels the/every future occurrence of the action specified by this id. // The following invariants will hold: // - The task will not be invoked after this method returns // - If the task is currently running it will block until the task is // completed, unless cancel is called from the running task. bool CancelAsyncTasksFromUser(AsyncUserId user_id); // Execs the given code in a synchronized manner. It is guaranteed that code Loading tools/rootcanal/model/setup/test_model.cc +4 −0 Original line number Diff line number Diff line Loading @@ -52,6 +52,10 @@ TestModel::TestModel( model_user_id_ = get_user_id_(); } TestModel::~TestModel() { StopTimer(); } void TestModel::SetTimerPeriod(std::chrono::milliseconds new_period) { timer_period_ = new_period; Loading tools/rootcanal/model/setup/test_model.h +1 −1 Original line number Diff line number Diff line Loading @@ -49,7 +49,7 @@ class TestModel { std::function<void(AsyncTaskId)> cancel, std::function<std::shared_ptr<Device>(const std::string&, int, Phy::Type)> connect_to_remote); ~TestModel() = default; ~TestModel(); TestModel(TestModel& model) = delete; TestModel& operator=(const TestModel& model) = delete; Loading tools/rootcanal/test/async_manager_unittest.cc +43 −1 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include <ratio> // for ratio #include <string> // for string #include <tuple> // for tuple #include <thread> #include "osi/include/osi.h" // for OSI_NO_INTR Loading Loading @@ -123,6 +124,7 @@ class AsyncManagerSocketTest : public ::testing::Test { void SetUp() override { memset(server_buffer_, 0, kBufferSize); memset(client_buffer_, 0, kBufferSize); socket_fd_ = StartServer(); Loading @@ -137,7 +139,7 @@ class AsyncManagerSocketTest : public ::testing::Test { void TearDown() override { async_manager_.StopWatchingFileDescriptor(socket_fd_); close(socket_fd_); ASSERT_TRUE(CheckBufferEquals()); ASSERT_EQ(std::string_view(server_buffer_, kBufferSize), std::string_view(client_buffer_, kBufferSize)); } int ConnectClient() { Loading Loading @@ -215,6 +217,46 @@ TEST_F(AsyncManagerSocketTest, CanUnsubscribeInCallback) { close(socket_cli_fd); } TEST_F(AsyncManagerSocketTest, CanUnsubscribeTaskFromWithinTask) { Event running; using namespace std::chrono_literals; async_manager_.ExecAsyncPeriodically(1, 1ms, 2ms, [&running, this]() { EXPECT_TRUE(async_manager_.CancelAsyncTask(1)) << "We were scheduled, so cancel should return true"; EXPECT_FALSE(async_manager_.CancelAsyncTask(1)) << "We were not scheduled, so cancel should return false"; running.set(true); }); EXPECT_TRUE(running.wait_for(10ms)); } TEST_F(AsyncManagerSocketTest, UnsubScribeWaitsUntilCompletion) { using namespace std::chrono_literals; Event running; bool cancel_done = false; bool task_complete = false; async_manager_.ExecAsyncPeriodically(1, 1ms, 2ms, [&running, &cancel_done, &task_complete]() { // Let the other thread now we are in the callback.. running.set(true); // Wee bit of a hack that relies on timing.. std::this_thread::sleep_for(20ms); EXPECT_FALSE(cancel_done) << "Task cancellation did not wait for us to complete!"; task_complete = true; }); EXPECT_TRUE(running.wait_for(10ms)); auto start = std::chrono::system_clock::now(); // There is a 20ms wait.. so we know that this should take some time. EXPECT_TRUE(async_manager_.CancelAsyncTask(1)) << "We were scheduled, so cancel should return true"; cancel_done = true; EXPECT_TRUE(task_complete) << "We managed to cancel a task while it was not yet finished."; auto end = std::chrono::system_clock::now(); auto passed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); EXPECT_GT(passed_ms.count(), 10); } TEST_F(AsyncManagerSocketTest, NoEventsAfterUnsubscribe) { // This tests makes sure the AsyncManager never fires an event // after calling StopWatchingFileDescriptor. Loading Loading
tools/rootcanal/model/setup/async_manager.cc +20 −3 Original line number Diff line number Diff line Loading @@ -364,6 +364,7 @@ class AsyncManager::AsyncTaskManager { std::chrono::steady_clock::time_point time; bool periodic; std::chrono::milliseconds period{}; std::mutex in_callback; // Taken when the callback is active TaskCallback callback; AsyncTaskId task_id; AsyncUserId user_id; Loading @@ -381,8 +382,22 @@ class AsyncManager::AsyncTaskManager { if (tasks_by_id_.count(async_task_id) == 0) { return false; } // Now make sure we are not running this task. // 2 cases: // - This is called from thread_, this means a running // scheduled task is actually unregistering. All bets are off. // - Another thread is calling us, let's make sure the task is not active. if (thread_.get_id() != std::this_thread::get_id()) { auto task = tasks_by_id_[async_task_id]; const std::lock_guard<std::mutex> lock(task->in_callback); task_queue_.erase(task); tasks_by_id_.erase(async_task_id); } else { task_queue_.erase(tasks_by_id_[async_task_id]); tasks_by_id_.erase(async_task_id); } return true; } Loading Loading @@ -437,11 +452,12 @@ class AsyncManager::AsyncTaskManager { void ThreadRoutine() { while (running_) { TaskCallback callback; std::shared_ptr<Task> task_p; bool run_it = false; { std::unique_lock<std::mutex> guard(internal_mutex_); if (!task_queue_.empty()) { std::shared_ptr<Task> task_p = *(task_queue_.begin()); task_p = *(task_queue_.begin()); if (task_p->time < std::chrono::steady_clock::now()) { run_it = true; callback = task_p->callback; Loading @@ -458,6 +474,7 @@ class AsyncManager::AsyncTaskManager { } } if (run_it) { const std::lock_guard<std::mutex> lock(task_p->in_callback); callback(); } { Loading
tools/rootcanal/model/setup/async_manager.h +10 −10 Original line number Diff line number Diff line Loading @@ -77,18 +77,18 @@ class AsyncManager { std::chrono::milliseconds period, const TaskCallback& callback); // Cancels the/every future occurrence of the action specified by this id. It // is guaranteed that the associated callback will not be called after this // method returns (it could be called during the execution of the method). // The calling thread may block until the scheduling thread acknowledges the // cancellation. // Cancels the/every future occurrence of the action specified by this id. // The following invariants will hold: // - The task will not be invoked after this method returns // - If the task is currently running it will block until the task is // completed, unless cancel is called from the running task. bool CancelAsyncTask(AsyncTaskId async_task_id); // Cancels the/every future occurrence of the action specified by this id. It // is guaranteed that the associated callback will not be called after this // method returns (it could be called during the execution of the method). // The calling thread may block until the scheduling thread acknowledges the // cancellation. // Cancels the/every future occurrence of the action specified by this id. // The following invariants will hold: // - The task will not be invoked after this method returns // - If the task is currently running it will block until the task is // completed, unless cancel is called from the running task. bool CancelAsyncTasksFromUser(AsyncUserId user_id); // Execs the given code in a synchronized manner. It is guaranteed that code Loading
tools/rootcanal/model/setup/test_model.cc +4 −0 Original line number Diff line number Diff line Loading @@ -52,6 +52,10 @@ TestModel::TestModel( model_user_id_ = get_user_id_(); } TestModel::~TestModel() { StopTimer(); } void TestModel::SetTimerPeriod(std::chrono::milliseconds new_period) { timer_period_ = new_period; Loading
tools/rootcanal/model/setup/test_model.h +1 −1 Original line number Diff line number Diff line Loading @@ -49,7 +49,7 @@ class TestModel { std::function<void(AsyncTaskId)> cancel, std::function<std::shared_ptr<Device>(const std::string&, int, Phy::Type)> connect_to_remote); ~TestModel() = default; ~TestModel(); TestModel(TestModel& model) = delete; TestModel& operator=(const TestModel& model) = delete; Loading
tools/rootcanal/test/async_manager_unittest.cc +43 −1 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include <ratio> // for ratio #include <string> // for string #include <tuple> // for tuple #include <thread> #include "osi/include/osi.h" // for OSI_NO_INTR Loading Loading @@ -123,6 +124,7 @@ class AsyncManagerSocketTest : public ::testing::Test { void SetUp() override { memset(server_buffer_, 0, kBufferSize); memset(client_buffer_, 0, kBufferSize); socket_fd_ = StartServer(); Loading @@ -137,7 +139,7 @@ class AsyncManagerSocketTest : public ::testing::Test { void TearDown() override { async_manager_.StopWatchingFileDescriptor(socket_fd_); close(socket_fd_); ASSERT_TRUE(CheckBufferEquals()); ASSERT_EQ(std::string_view(server_buffer_, kBufferSize), std::string_view(client_buffer_, kBufferSize)); } int ConnectClient() { Loading Loading @@ -215,6 +217,46 @@ TEST_F(AsyncManagerSocketTest, CanUnsubscribeInCallback) { close(socket_cli_fd); } TEST_F(AsyncManagerSocketTest, CanUnsubscribeTaskFromWithinTask) { Event running; using namespace std::chrono_literals; async_manager_.ExecAsyncPeriodically(1, 1ms, 2ms, [&running, this]() { EXPECT_TRUE(async_manager_.CancelAsyncTask(1)) << "We were scheduled, so cancel should return true"; EXPECT_FALSE(async_manager_.CancelAsyncTask(1)) << "We were not scheduled, so cancel should return false"; running.set(true); }); EXPECT_TRUE(running.wait_for(10ms)); } TEST_F(AsyncManagerSocketTest, UnsubScribeWaitsUntilCompletion) { using namespace std::chrono_literals; Event running; bool cancel_done = false; bool task_complete = false; async_manager_.ExecAsyncPeriodically(1, 1ms, 2ms, [&running, &cancel_done, &task_complete]() { // Let the other thread now we are in the callback.. running.set(true); // Wee bit of a hack that relies on timing.. std::this_thread::sleep_for(20ms); EXPECT_FALSE(cancel_done) << "Task cancellation did not wait for us to complete!"; task_complete = true; }); EXPECT_TRUE(running.wait_for(10ms)); auto start = std::chrono::system_clock::now(); // There is a 20ms wait.. so we know that this should take some time. EXPECT_TRUE(async_manager_.CancelAsyncTask(1)) << "We were scheduled, so cancel should return true"; cancel_done = true; EXPECT_TRUE(task_complete) << "We managed to cancel a task while it was not yet finished."; auto end = std::chrono::system_clock::now(); auto passed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); EXPECT_GT(passed_ms.count(), 10); } TEST_F(AsyncManagerSocketTest, NoEventsAfterUnsubscribe) { // This tests makes sure the AsyncManager never fires an event // after calling StopWatchingFileDescriptor. Loading