Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4c8a7701 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

Timer: Don't run new task at old timepoint

* Fix a race condition that the task is just finished but IsScheduled
  is true
* Fix a bug where cancelling the old task will make new task run at
  the previous task's scheduled timepoint
* Protect racing read/write to message_loop_thread_ in RunTask() and
  Cancel() by storing a local reference to message_loop_thread_ in
  RunTask()

Bug: 116081383
Test: Run bluetooth_test_common.TimerTest
      reschedule_task_dont_invoke_new_task_early
      schedule_task
      for multiple times
Change-Id: I737a864e1b061adbcd41245711e0d3e8adf448d4
parent 66f3b87d
Loading
Loading
Loading
Loading
+15 −12
Original line number Diff line number Diff line
@@ -65,15 +65,16 @@ bool Timer::ScheduleTaskHelper(const base::WeakPtr<MessageLoopThread>& thread,
  CancelAndWait();
  expected_time_next_task_us_ = time_next_task_us;
  task_ = std::move(task);
  task_wrapper_.Reset(base::Bind(&Timer::RunTask, base::Unretained(this)));
  uint64_t time_until_next_us = time_next_task_us - time_get_os_boottime_us();
  if (!thread->DoInThreadDelayed(
          from_here, task_wrapper_,
          from_here, task_wrapper_.callback(),
          base::TimeDelta::FromMicroseconds(time_until_next_us))) {
    LOG(ERROR) << __func__
               << ": failed to post task to message loop for thread " << *thread
               << ", from " << from_here.ToString();
    expected_time_next_task_us_ = 0;
    task_.Reset();
    task_wrapper_.Cancel();
    return false;
  }
  message_loop_thread_ = thread;
@@ -99,17 +100,16 @@ void Timer::CancelAndWait() {
// This runs on user thread
void Timer::CancelHelper(std::promise<void> promise) {
  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
  if (message_loop_thread_ == nullptr) {
  MessageLoopThread* scheduled_thread = message_loop_thread_.get();
  if (scheduled_thread == nullptr) {
    promise.set_value();
    return;
  }
  if (!message_loop_thread_->IsRunning() ||
      message_loop_thread_->GetThreadId() ==
          base::PlatformThread::CurrentId()) {
  if (scheduled_thread->GetThreadId() == base::PlatformThread::CurrentId()) {
    CancelClosure(std::move(promise));
    return;
  }
  message_loop_thread_->DoInThread(
  scheduled_thread->DoInThread(
      FROM_HERE, base::BindOnce(&Timer::CancelClosure, base::Unretained(this),
                                std::move(promise)));
}
@@ -117,7 +117,8 @@ void Timer::CancelHelper(std::promise<void> promise) {
// This runs on message loop thread
void Timer::CancelClosure(std::promise<void> promise) {
  message_loop_thread_ = nullptr;
  task_.Reset();
  task_wrapper_.Cancel();
  task_ = {};
  period_ = base::TimeDelta();
  is_periodic_ = false;
  expected_time_next_task_us_ = 0;
@@ -159,7 +160,7 @@ void Timer::RunPeriodicTask() {
    remaining_time_us = (remaining_time_us % period_us + period_us) % period_us;
  }
  message_loop_thread_->DoInThreadDelayed(
      FROM_HERE, task_wrapper_,
      FROM_HERE, task_wrapper_.callback(),
      base::TimeDelta::FromMicroseconds(remaining_time_us));

  uint64_t time_before_task_us = time_get_os_boottime_us();
@@ -176,11 +177,13 @@ void Timer::RunPeriodicTask() {

// This runs on message loop thread
void Timer::RunSingleTask() {
  task_.Run();
  message_loop_thread_ = nullptr;
  task_.Reset();
  base::OnceClosure current_task = std::move(task_);
  task_wrapper_.Cancel();
  task_ = {};
  period_ = base::TimeDelta();
  expected_time_next_task_us_ = 0;
  std::move(current_task).Run();
  message_loop_thread_ = nullptr;
}

}  // namespace common
+2 −5
Original line number Diff line number Diff line
@@ -36,10 +36,7 @@ class MessageLoopThread;
 */
class Timer final {
 public:
  Timer()
      : task_wrapper_(base::Bind(&Timer::RunTask, base::Unretained(this))),
        is_periodic_(false),
        expected_time_next_task_us_(0) {}
  Timer() : is_periodic_(false), expected_time_next_task_us_(0) {}
  ~Timer();
  Timer(const Timer&) = delete;
  Timer& operator=(const Timer&) = delete;
@@ -95,7 +92,7 @@ class Timer final {

 private:
  base::WeakPtr<MessageLoopThread> message_loop_thread_;
  const base::Closure task_wrapper_;
  base::CancelableClosure task_wrapper_;
  base::Closure task_;
  base::TimeDelta period_;
  bool is_periodic_;
+63 −0
Original line number Diff line number Diff line
@@ -220,6 +220,18 @@ TEST_F(TimerTest, cancel_single_task) {
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_error_ms));
}

TEST_F(TimerTest, cancel_single_task_near_fire_no_race_condition) {
  std::string name = "test_thread";
  MessageLoopThread message_loop_thread(name);
  message_loop_thread.StartUp();
  uint32_t delay_ms = 5;
  timer_->SchedulePeriodic(message_loop_thread.GetWeakPtr(), FROM_HERE,
                           base::Bind([]() {}),
                           base::TimeDelta::FromMilliseconds(delay_ms));
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
  timer_->CancelAndWait();
}

TEST_F(TimerTest, cancel_periodic_task) {
  std::string name = "test_thread";
  MessageLoopThread message_loop_thread(name);
@@ -335,3 +347,54 @@ TEST_F(TimerTest, schedule_task_cancel_previous_task) {
  future.wait();
  ASSERT_EQ(name, my_name);
}

TEST_F(TimerTest, reschedule_task_dont_invoke_new_task_early) {
  std::string name = "test_thread";
  MessageLoopThread message_loop_thread(name);
  message_loop_thread.StartUp();
  uint32_t delay_ms = 5;
  timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE,
                   base::Bind([]() {}),
                   base::TimeDelta::FromMilliseconds(delay_ms));
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms - 2));
  timer_->Schedule(
      message_loop_thread.GetWeakPtr(), FROM_HERE,
      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
      base::TimeDelta::FromMilliseconds(delay_ms + 1000));
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
}

TEST_F(TimerTest, reschedule_task_when_firing_dont_invoke_new_task_early) {
  std::string name = "test_thread";
  MessageLoopThread message_loop_thread(name);
  message_loop_thread.StartUp();
  uint32_t delay_ms = 5;
  timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE,
                   base::Bind([]() {}),
                   base::TimeDelta::FromMilliseconds(delay_ms));
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
  timer_->Schedule(
      message_loop_thread.GetWeakPtr(), FROM_HERE,
      base::Bind(&TimerTest::ShouldNotHappen, base::Unretained(this)),
      base::TimeDelta::FromMilliseconds(delay_ms + 1000));
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
}

TEST_F(TimerTest, reschedule_task_when_firing_must_schedule_new_task) {
  std::string name = "test_thread";
  MessageLoopThread message_loop_thread(name);
  message_loop_thread.StartUp();
  uint32_t delay_ms = 5;
  std::string my_name;
  auto future = promise_->get_future();
  timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE,
                   base::Bind([]() {}),
                   base::TimeDelta::FromMilliseconds(delay_ms));
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
  timer_->Schedule(message_loop_thread.GetWeakPtr(), FROM_HERE,
                   base::Bind(&TimerTest::GetName, base::Unretained(this),
                              &my_name, promise_),
                   base::TimeDelta::FromMilliseconds(delay_ms));
  future.wait_for(std::chrono::milliseconds(delay_ms + delay_error_ms));
  ASSERT_EQ(name, my_name);
}