Loading system/common/message_loop_thread.cc +61 −43 Original line number Diff line number Diff line Loading @@ -35,25 +35,24 @@ MessageLoopThread::MessageLoopThread(const std::string& thread_name) thread_(nullptr), thread_id_(-1), linux_tid_(-1), weak_ptr_factory_(this) {} weak_ptr_factory_(this), shutting_down_(false) {} MessageLoopThread::~MessageLoopThread() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (thread_ != nullptr) { ShutDown(); } } MessageLoopThread::~MessageLoopThread() { ShutDown(); } void MessageLoopThread::StartUp() { std::promise<void> start_up_promise; std::future<void> start_up_future = start_up_promise.get_future(); { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (thread_ != nullptr) { LOG(WARNING) << __func__ << ": thread " << *this << " is already started"; return; } std::promise<void> start_up_promise; std::future<void> start_up_future = start_up_promise.get_future(); thread_ = new std::thread(&MessageLoopThread::RunThread, this, std::move(start_up_promise)); } start_up_future.wait(); } Loading Loading @@ -82,15 +81,24 @@ bool MessageLoopThread::DoInThreadDelayed( } void MessageLoopThread::ShutDown() { { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (thread_ == nullptr) { LOG(WARNING) << __func__ << ": thread " << *this << " is already stopped"; LOG(INFO) << __func__ << ": thread " << *this << " is already stopped"; return; } if (message_loop_ == nullptr) { LOG(INFO) << __func__ << ": message_loop_ is null. Already stopping"; return; } if (shutting_down_) { LOG(INFO) << __func__ << ": waiting for thread to join"; return; } shutting_down_ = true; CHECK_NE(thread_id_, base::PlatformThread::CurrentId()) << __func__ << " should not be called on the thread itself. " << "Otherwise, deadlock may happen."; if (message_loop_ != nullptr) { if (!message_loop_->task_runner()->PostTask( FROM_HERE, run_loop_->QuitWhenIdleClosure())) { LOG(FATAL) << __func__ Loading @@ -99,8 +107,12 @@ void MessageLoopThread::ShutDown() { } } thread_->join(); { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); delete thread_; thread_ = nullptr; shutting_down_ = false; } } base::PlatformThreadId MessageLoopThread::GetThreadId() const { Loading @@ -109,7 +121,6 @@ base::PlatformThreadId MessageLoopThread::GetThreadId() const { } std::string MessageLoopThread::GetName() const { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); return thread_name_; } Loading Loading @@ -158,8 +169,9 @@ base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() { return weak_ptr_factory_.GetWeakPtr(); } // Non API method, should NOT be protected by API mutex to avoid deadlock void MessageLoopThread::Run(std::promise<void> start_up_promise) { { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); LOG(INFO) << __func__ << ": message loop starting for thread " << thread_name_; base::PlatformThread::SetName(thread_name_); Loading @@ -168,8 +180,13 @@ void MessageLoopThread::Run(std::promise<void> start_up_promise) { thread_id_ = base::PlatformThread::CurrentId(); linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid)); start_up_promise.set_value(); } // Blocking until ShutDown() is called run_loop_->Run(); { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); thread_id_ = -1; linux_tid_ = -1; delete message_loop_; Loading @@ -179,6 +196,7 @@ void MessageLoopThread::Run(std::promise<void> start_up_promise) { LOG(INFO) << __func__ << ": message loop finished for thread " << thread_name_; } } } // namespace common Loading system/common/message_loop_thread.h +2 −1 Original line number Diff line number Diff line Loading @@ -188,7 +188,7 @@ class MessageLoopThread final { void Run(std::promise<void> start_up_promise); mutable std::recursive_mutex api_mutex_; std::string thread_name_; const std::string thread_name_; base::MessageLoop* message_loop_; base::RunLoop* run_loop_; std::thread* thread_; Loading @@ -196,6 +196,7 @@ class MessageLoopThread final { // Linux specific abstractions pid_t linux_tid_; base::WeakPtrFactory<MessageLoopThread> weak_ptr_factory_; bool shutting_down_; DISALLOW_COPY_AND_ASSIGN(MessageLoopThread); }; Loading system/common/message_loop_thread_unittest.cc +50 −0 Original line number Diff line number Diff line Loading @@ -280,3 +280,53 @@ TEST_F(MessageLoopThreadTest, shut_down_while_in_callback) { std::string my_name = name_future.get(); ASSERT_EQ(name, my_name); } // Verify the message loop thread will shutdown after callback finishes TEST_F(MessageLoopThreadTest, shut_down_while_in_callback_check_lock) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); message_loop_thread.DoInThread( FROM_HERE, base::BindOnce([](MessageLoopThread* thread) { thread->IsRunning(); }, &message_loop_thread)); message_loop_thread.ShutDown(); } // Verify multiple threads try shutdown, no deadlock/crash TEST_F(MessageLoopThreadTest, shut_down_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); auto thread = std::thread(&MessageLoopThread::ShutDown, &message_loop_thread); message_loop_thread.ShutDown(); thread.join(); } // Verify multiple threads try startup, no deadlock/crash TEST_F(MessageLoopThreadTest, start_up_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); auto thread = std::thread(&MessageLoopThread::StartUp, &message_loop_thread); thread.join(); } // Verify multiple threads try startup/shutdown, no deadlock/crash TEST_F(MessageLoopThreadTest, start_up_shut_down_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); auto thread = std::thread(&MessageLoopThread::ShutDown, &message_loop_thread); thread.join(); } // Verify multiple threads try shutdown/startup, no deadlock/crash TEST_F(MessageLoopThreadTest, shut_down_start_up_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); message_loop_thread.ShutDown(); auto thread = std::thread(&MessageLoopThread::StartUp, &message_loop_thread); thread.join(); } Loading
system/common/message_loop_thread.cc +61 −43 Original line number Diff line number Diff line Loading @@ -35,25 +35,24 @@ MessageLoopThread::MessageLoopThread(const std::string& thread_name) thread_(nullptr), thread_id_(-1), linux_tid_(-1), weak_ptr_factory_(this) {} weak_ptr_factory_(this), shutting_down_(false) {} MessageLoopThread::~MessageLoopThread() { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (thread_ != nullptr) { ShutDown(); } } MessageLoopThread::~MessageLoopThread() { ShutDown(); } void MessageLoopThread::StartUp() { std::promise<void> start_up_promise; std::future<void> start_up_future = start_up_promise.get_future(); { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (thread_ != nullptr) { LOG(WARNING) << __func__ << ": thread " << *this << " is already started"; return; } std::promise<void> start_up_promise; std::future<void> start_up_future = start_up_promise.get_future(); thread_ = new std::thread(&MessageLoopThread::RunThread, this, std::move(start_up_promise)); } start_up_future.wait(); } Loading Loading @@ -82,15 +81,24 @@ bool MessageLoopThread::DoInThreadDelayed( } void MessageLoopThread::ShutDown() { { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); if (thread_ == nullptr) { LOG(WARNING) << __func__ << ": thread " << *this << " is already stopped"; LOG(INFO) << __func__ << ": thread " << *this << " is already stopped"; return; } if (message_loop_ == nullptr) { LOG(INFO) << __func__ << ": message_loop_ is null. Already stopping"; return; } if (shutting_down_) { LOG(INFO) << __func__ << ": waiting for thread to join"; return; } shutting_down_ = true; CHECK_NE(thread_id_, base::PlatformThread::CurrentId()) << __func__ << " should not be called on the thread itself. " << "Otherwise, deadlock may happen."; if (message_loop_ != nullptr) { if (!message_loop_->task_runner()->PostTask( FROM_HERE, run_loop_->QuitWhenIdleClosure())) { LOG(FATAL) << __func__ Loading @@ -99,8 +107,12 @@ void MessageLoopThread::ShutDown() { } } thread_->join(); { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); delete thread_; thread_ = nullptr; shutting_down_ = false; } } base::PlatformThreadId MessageLoopThread::GetThreadId() const { Loading @@ -109,7 +121,6 @@ base::PlatformThreadId MessageLoopThread::GetThreadId() const { } std::string MessageLoopThread::GetName() const { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); return thread_name_; } Loading Loading @@ -158,8 +169,9 @@ base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() { return weak_ptr_factory_.GetWeakPtr(); } // Non API method, should NOT be protected by API mutex to avoid deadlock void MessageLoopThread::Run(std::promise<void> start_up_promise) { { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); LOG(INFO) << __func__ << ": message loop starting for thread " << thread_name_; base::PlatformThread::SetName(thread_name_); Loading @@ -168,8 +180,13 @@ void MessageLoopThread::Run(std::promise<void> start_up_promise) { thread_id_ = base::PlatformThread::CurrentId(); linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid)); start_up_promise.set_value(); } // Blocking until ShutDown() is called run_loop_->Run(); { std::lock_guard<std::recursive_mutex> api_lock(api_mutex_); thread_id_ = -1; linux_tid_ = -1; delete message_loop_; Loading @@ -179,6 +196,7 @@ void MessageLoopThread::Run(std::promise<void> start_up_promise) { LOG(INFO) << __func__ << ": message loop finished for thread " << thread_name_; } } } // namespace common Loading
system/common/message_loop_thread.h +2 −1 Original line number Diff line number Diff line Loading @@ -188,7 +188,7 @@ class MessageLoopThread final { void Run(std::promise<void> start_up_promise); mutable std::recursive_mutex api_mutex_; std::string thread_name_; const std::string thread_name_; base::MessageLoop* message_loop_; base::RunLoop* run_loop_; std::thread* thread_; Loading @@ -196,6 +196,7 @@ class MessageLoopThread final { // Linux specific abstractions pid_t linux_tid_; base::WeakPtrFactory<MessageLoopThread> weak_ptr_factory_; bool shutting_down_; DISALLOW_COPY_AND_ASSIGN(MessageLoopThread); }; Loading
system/common/message_loop_thread_unittest.cc +50 −0 Original line number Diff line number Diff line Loading @@ -280,3 +280,53 @@ TEST_F(MessageLoopThreadTest, shut_down_while_in_callback) { std::string my_name = name_future.get(); ASSERT_EQ(name, my_name); } // Verify the message loop thread will shutdown after callback finishes TEST_F(MessageLoopThreadTest, shut_down_while_in_callback_check_lock) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); message_loop_thread.DoInThread( FROM_HERE, base::BindOnce([](MessageLoopThread* thread) { thread->IsRunning(); }, &message_loop_thread)); message_loop_thread.ShutDown(); } // Verify multiple threads try shutdown, no deadlock/crash TEST_F(MessageLoopThreadTest, shut_down_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); auto thread = std::thread(&MessageLoopThread::ShutDown, &message_loop_thread); message_loop_thread.ShutDown(); thread.join(); } // Verify multiple threads try startup, no deadlock/crash TEST_F(MessageLoopThreadTest, start_up_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); auto thread = std::thread(&MessageLoopThread::StartUp, &message_loop_thread); thread.join(); } // Verify multiple threads try startup/shutdown, no deadlock/crash TEST_F(MessageLoopThreadTest, start_up_shut_down_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); auto thread = std::thread(&MessageLoopThread::ShutDown, &message_loop_thread); thread.join(); } // Verify multiple threads try shutdown/startup, no deadlock/crash TEST_F(MessageLoopThreadTest, shut_down_start_up_multi_thread) { std::string name = "test_thread"; MessageLoopThread message_loop_thread(name); message_loop_thread.StartUp(); message_loop_thread.ShutDown(); auto thread = std::thread(&MessageLoopThread::StartUp, &message_loop_thread); thread.join(); }