Loading system/common/timer.cc +57 −40 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ namespace common { constexpr base::TimeDelta kMinimumPeriod = base::TimeDelta::FromMicroseconds(1); // This runs on user thread Timer::~Timer() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (message_loop_thread_ != nullptr && message_loop_thread_->IsRunning()) { Loading Loading @@ -64,15 +65,16 @@ bool Timer::ScheduleTaskHelper(const base::WeakPtr<MessageLoopThread>& thread, CancelAndWait(); expected_time_next_task_us_ = time_next_task_us; task_ = std::move(task); task_wrapper_.Reset(base::Bind(&Timer::RunTask, base::Unretained(this))); uint64_t time_until_next_us = time_next_task_us - time_get_os_boottime_us(); if (!thread->DoInThreadDelayed( from_here, task_wrapper_, from_here, task_wrapper_.callback(), base::TimeDelta::FromMicroseconds(time_until_next_us))) { LOG(ERROR) << __func__ << ": failed to post task to message loop for thread " << *thread << ", from " << from_here.ToString(); expected_time_next_task_us_ = 0; task_.Reset(); task_wrapper_.Cancel(); return false; } message_loop_thread_ = thread; Loading @@ -83,60 +85,71 @@ bool Timer::ScheduleTaskHelper(const base::WeakPtr<MessageLoopThread>& thread, // This runs on user thread void Timer::Cancel() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); CancelHelper(false); std::promise<void> promise; CancelHelper(std::move(promise)); } // This runs on user thread void Timer::CancelAndWait() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); CancelHelper(true); std::promise<void> promise; auto future = promise.get_future(); CancelHelper(std::move(promise)); future.wait(); } // This runs on user thread void Timer::CancelHelper(bool is_synchronous) { if (message_loop_thread_ == nullptr) { void Timer::CancelHelper(std::promise<void> promise) { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); MessageLoopThread* scheduled_thread = message_loop_thread_.get(); if (scheduled_thread == nullptr) { promise.set_value(); return; } std::promise<void> promise; auto future = promise.get_future(); if (message_loop_thread_->GetThreadId() == base::PlatformThread::CurrentId()) { if (scheduled_thread->GetThreadId() == base::PlatformThread::CurrentId()) { CancelClosure(std::move(promise)); return; } message_loop_thread_->DoInThread( scheduled_thread->DoInThread( FROM_HERE, base::BindOnce(&Timer::CancelClosure, base::Unretained(this), std::move(promise))); if (is_synchronous) { future.wait(); } } // This runs on message loop thread void Timer::CancelClosure(std::promise<void> promise) { message_loop_thread_ = nullptr; task_.Reset(); task_wrapper_.Cancel(); task_ = {}; period_ = base::TimeDelta(); is_periodic_ = false; expected_time_next_task_us_ = 0; promise.set_value(); } // This runs in user thread // This runs on user thread bool Timer::IsScheduled() const { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); return message_loop_thread_ != nullptr && message_loop_thread_->IsRunning(); } // This runs in message loop thread // This runs on message loop thread void Timer::RunTask() { if (message_loop_thread_ == nullptr || !message_loop_thread_->IsRunning()) { LOG(ERROR) << __func__ << ": message_loop_thread_ is null or is not running"; return; } CHECK_EQ(message_loop_thread_->GetThreadId(), base::PlatformThread::CurrentId()) << ": task must run on message loop thread"; if (is_periodic_) { RunPeriodicTask(); } else { RunSingleTask(); } } // This runs on message loop thread void Timer::RunPeriodicTask() { int64_t period_us = period_.InMicroseconds(); expected_time_next_task_us_ += period_us; uint64_t time_now_us = time_get_os_boottime_us(); Loading @@ -144,29 +157,33 @@ void Timer::RunTask() { if (remaining_time_us < 0) { // if remaining_time_us is negative, schedule the task to the nearest // multiple of period remaining_time_us = (remaining_time_us % period_us + period_us) % period_us; remaining_time_us = (remaining_time_us % period_us + period_us) % period_us; } message_loop_thread_->DoInThreadDelayed( FROM_HERE, task_wrapper_, FROM_HERE, task_wrapper_.callback(), base::TimeDelta::FromMicroseconds(remaining_time_us)); } uint64_t time_before_task_us = time_get_os_boottime_us(); task_.Run(); uint64_t time_after_task_us = time_get_os_boottime_us(); int64_t task_time_us = auto task_time_us = static_cast<int64_t>(time_after_task_us - time_before_task_us); if (is_periodic_ && task_time_us > period_.InMicroseconds()) { if (task_time_us > period_.InMicroseconds()) { LOG(ERROR) << __func__ << ": Periodic task execution took " << task_time_us << " microseconds, longer than interval " << period_.InMicroseconds() << " microseconds"; } if (!is_periodic_) { message_loop_thread_ = nullptr; task_.Reset(); } // This runs on message loop thread void Timer::RunSingleTask() { base::OnceClosure current_task = std::move(task_); task_wrapper_.Cancel(); task_ = {}; period_ = base::TimeDelta(); expected_time_next_task_us_ = 0; } std::move(current_task).Run(); message_loop_thread_ = nullptr; } } // namespace common Loading system/common/timer.h +5 −6 Original line number Diff line number Diff line Loading @@ -36,10 +36,7 @@ class MessageLoopThread; */ class Timer final { public: Timer() : task_wrapper_(base::Bind(&Timer::RunTask, base::Unretained(this))), is_periodic_(false), expected_time_next_task_us_(0) {} Timer() : is_periodic_(false), expected_time_next_task_us_(0) {} ~Timer(); Timer(const Timer&) = delete; Timer& operator=(const Timer&) = delete; Loading Loading @@ -95,7 +92,7 @@ class Timer final { private: base::WeakPtr<MessageLoopThread> message_loop_thread_; const base::Closure task_wrapper_; base::CancelableClosure task_wrapper_; base::Closure task_; base::TimeDelta period_; bool is_periodic_; Loading @@ -105,13 +102,15 @@ class Timer final { const tracked_objects::Location& from_here, base::Closure task, base::TimeDelta delay, bool is_periodic); void CancelHelper(bool is_synchronous); void CancelHelper(std::promise<void> promise); void CancelClosure(std::promise<void> promise); /** * Wraps a task. It posts another same task if the scheduled task is periodic. */ void RunTask(); void RunSingleTask(); void RunPeriodicTask(); }; } // namespace common Loading system/common/timer_unittest.cc +63 −0 Original line number Diff line number Diff line Loading @@ -220,6 +220,18 @@ TEST_F(TimerTest, cancel_single_task) { std::this_thread::sleep_for(std::chrono::milliseconds(delay_error_ms)); } TEST_F(TimerTest, cancel_single_task_near_fire_no_race_condition) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; timer_->SchedulePeriodic(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); timer_->CancelAndWait(); } TEST_F(TimerTest, cancel_periodic_task) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); Loading Loading @@ -335,3 +347,54 @@ TEST_F(TimerTest, schedule_task_cancel_previous_task) { future.wait(); ASSERT_EQ(name, my_name); } TEST_F(TimerTest, reschedule_task_dont_invoke_new_task_early) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms - 2)); timer_->Schedule( message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)), base::TimeDelta::FromMilliseconds(delay_ms + 1000)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); } TEST_F(TimerTest, reschedule_task_when_firing_dont_invoke_new_task_early) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); timer_->Schedule( message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)), base::TimeDelta::FromMilliseconds(delay_ms + 1000)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); } TEST_F(TimerTest, reschedule_task_when_firing_must_schedule_new_task) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; std::string my_name; auto future = promise_->get_future(); timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind(&TimerTest::GetName, base::Unretained(this), &my_name, promise_), base::TimeDelta::FromMilliseconds(delay_ms)); future.wait_for(std::chrono::milliseconds(delay_ms + delay_error_ms)); ASSERT_EQ(name, my_name); } Loading
system/common/timer.cc +57 −40 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ namespace common { constexpr base::TimeDelta kMinimumPeriod = base::TimeDelta::FromMicroseconds(1); // This runs on user thread Timer::~Timer() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (message_loop_thread_ != nullptr && message_loop_thread_->IsRunning()) { Loading Loading @@ -64,15 +65,16 @@ bool Timer::ScheduleTaskHelper(const base::WeakPtr<MessageLoopThread>& thread, CancelAndWait(); expected_time_next_task_us_ = time_next_task_us; task_ = std::move(task); task_wrapper_.Reset(base::Bind(&Timer::RunTask, base::Unretained(this))); uint64_t time_until_next_us = time_next_task_us - time_get_os_boottime_us(); if (!thread->DoInThreadDelayed( from_here, task_wrapper_, from_here, task_wrapper_.callback(), base::TimeDelta::FromMicroseconds(time_until_next_us))) { LOG(ERROR) << __func__ << ": failed to post task to message loop for thread " << *thread << ", from " << from_here.ToString(); expected_time_next_task_us_ = 0; task_.Reset(); task_wrapper_.Cancel(); return false; } message_loop_thread_ = thread; Loading @@ -83,60 +85,71 @@ bool Timer::ScheduleTaskHelper(const base::WeakPtr<MessageLoopThread>& thread, // This runs on user thread void Timer::Cancel() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); CancelHelper(false); std::promise<void> promise; CancelHelper(std::move(promise)); } // This runs on user thread void Timer::CancelAndWait() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); CancelHelper(true); std::promise<void> promise; auto future = promise.get_future(); CancelHelper(std::move(promise)); future.wait(); } // This runs on user thread void Timer::CancelHelper(bool is_synchronous) { if (message_loop_thread_ == nullptr) { void Timer::CancelHelper(std::promise<void> promise) { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); MessageLoopThread* scheduled_thread = message_loop_thread_.get(); if (scheduled_thread == nullptr) { promise.set_value(); return; } std::promise<void> promise; auto future = promise.get_future(); if (message_loop_thread_->GetThreadId() == base::PlatformThread::CurrentId()) { if (scheduled_thread->GetThreadId() == base::PlatformThread::CurrentId()) { CancelClosure(std::move(promise)); return; } message_loop_thread_->DoInThread( scheduled_thread->DoInThread( FROM_HERE, base::BindOnce(&Timer::CancelClosure, base::Unretained(this), std::move(promise))); if (is_synchronous) { future.wait(); } } // This runs on message loop thread void Timer::CancelClosure(std::promise<void> promise) { message_loop_thread_ = nullptr; task_.Reset(); task_wrapper_.Cancel(); task_ = {}; period_ = base::TimeDelta(); is_periodic_ = false; expected_time_next_task_us_ = 0; promise.set_value(); } // This runs in user thread // This runs on user thread bool Timer::IsScheduled() const { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); return message_loop_thread_ != nullptr && message_loop_thread_->IsRunning(); } // This runs in message loop thread // This runs on message loop thread void Timer::RunTask() { if (message_loop_thread_ == nullptr || !message_loop_thread_->IsRunning()) { LOG(ERROR) << __func__ << ": message_loop_thread_ is null or is not running"; return; } CHECK_EQ(message_loop_thread_->GetThreadId(), base::PlatformThread::CurrentId()) << ": task must run on message loop thread"; if (is_periodic_) { RunPeriodicTask(); } else { RunSingleTask(); } } // This runs on message loop thread void Timer::RunPeriodicTask() { int64_t period_us = period_.InMicroseconds(); expected_time_next_task_us_ += period_us; uint64_t time_now_us = time_get_os_boottime_us(); Loading @@ -144,29 +157,33 @@ void Timer::RunTask() { if (remaining_time_us < 0) { // if remaining_time_us is negative, schedule the task to the nearest // multiple of period remaining_time_us = (remaining_time_us % period_us + period_us) % period_us; remaining_time_us = (remaining_time_us % period_us + period_us) % period_us; } message_loop_thread_->DoInThreadDelayed( FROM_HERE, task_wrapper_, FROM_HERE, task_wrapper_.callback(), base::TimeDelta::FromMicroseconds(remaining_time_us)); } uint64_t time_before_task_us = time_get_os_boottime_us(); task_.Run(); uint64_t time_after_task_us = time_get_os_boottime_us(); int64_t task_time_us = auto task_time_us = static_cast<int64_t>(time_after_task_us - time_before_task_us); if (is_periodic_ && task_time_us > period_.InMicroseconds()) { if (task_time_us > period_.InMicroseconds()) { LOG(ERROR) << __func__ << ": Periodic task execution took " << task_time_us << " microseconds, longer than interval " << period_.InMicroseconds() << " microseconds"; } if (!is_periodic_) { message_loop_thread_ = nullptr; task_.Reset(); } // This runs on message loop thread void Timer::RunSingleTask() { base::OnceClosure current_task = std::move(task_); task_wrapper_.Cancel(); task_ = {}; period_ = base::TimeDelta(); expected_time_next_task_us_ = 0; } std::move(current_task).Run(); message_loop_thread_ = nullptr; } } // namespace common Loading
system/common/timer.h +5 −6 Original line number Diff line number Diff line Loading @@ -36,10 +36,7 @@ class MessageLoopThread; */ class Timer final { public: Timer() : task_wrapper_(base::Bind(&Timer::RunTask, base::Unretained(this))), is_periodic_(false), expected_time_next_task_us_(0) {} Timer() : is_periodic_(false), expected_time_next_task_us_(0) {} ~Timer(); Timer(const Timer&) = delete; Timer& operator=(const Timer&) = delete; Loading Loading @@ -95,7 +92,7 @@ class Timer final { private: base::WeakPtr<MessageLoopThread> message_loop_thread_; const base::Closure task_wrapper_; base::CancelableClosure task_wrapper_; base::Closure task_; base::TimeDelta period_; bool is_periodic_; Loading @@ -105,13 +102,15 @@ class Timer final { const tracked_objects::Location& from_here, base::Closure task, base::TimeDelta delay, bool is_periodic); void CancelHelper(bool is_synchronous); void CancelHelper(std::promise<void> promise); void CancelClosure(std::promise<void> promise); /** * Wraps a task. It posts another same task if the scheduled task is periodic. */ void RunTask(); void RunSingleTask(); void RunPeriodicTask(); }; } // namespace common Loading
system/common/timer_unittest.cc +63 −0 Original line number Diff line number Diff line Loading @@ -220,6 +220,18 @@ TEST_F(TimerTest, cancel_single_task) { std::this_thread::sleep_for(std::chrono::milliseconds(delay_error_ms)); } TEST_F(TimerTest, cancel_single_task_near_fire_no_race_condition) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; timer_->SchedulePeriodic(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); timer_->CancelAndWait(); } TEST_F(TimerTest, cancel_periodic_task) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); Loading Loading @@ -335,3 +347,54 @@ TEST_F(TimerTest, schedule_task_cancel_previous_task) { future.wait(); ASSERT_EQ(name, my_name); } TEST_F(TimerTest, reschedule_task_dont_invoke_new_task_early) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms - 2)); timer_->Schedule( message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)), base::TimeDelta::FromMilliseconds(delay_ms + 1000)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); } TEST_F(TimerTest, reschedule_task_when_firing_dont_invoke_new_task_early) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); timer_->Schedule( message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)), base::TimeDelta::FromMilliseconds(delay_ms + 1000)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); } TEST_F(TimerTest, reschedule_task_when_firing_must_schedule_new_task) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); uint32_t delay_ms = 5; std::string my_name; auto future = promise_->get_future(); timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind([]() {}), base::TimeDelta::FromMilliseconds(delay_ms)); std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE, base::Bind(&TimerTest::GetName, base::Unretained(this), &my_name, promise_), base::TimeDelta::FromMilliseconds(delay_ms)); future.wait_for(std::chrono::milliseconds(delay_ms + delay_error_ms)); ASSERT_EQ(name, my_name); }