Loading system/vendor_libs/test_vendor_lib/include/async_manager.h +20 −12 Original line number Diff line number Diff line Loading @@ -25,18 +25,26 @@ using AsyncTaskId = uint16_t; constexpr uint16_t kInvalidTaskId = 0; // Manages tasks that should be done in the future. It can watch file // descriptors to call a given callback when it is certain that a block will not // occur or can call a callback at a specific time (aproximately) and // (optionally) repeat the call periodically. The class itself is thread safe in // the sense that all its member functions can be called simultaneously from // different concurrent thread. However, no asumption should be made about // callback execution. The safest approach is to assume that any two callbacks // could be executed concurrently in different threads, so code provided in // callbacks need to actively deal with race conditions, deadlocks, etc. While // not required, it is strongly recommended to use the Synchronize(const // CriticalCallback&) member function to execute code inside critical sections. // Callbacks passed to this method on the same AsyncManager object from // different threads are granted to *NOT* run concurrently. // descriptors to call a given callback when it is certain that a non-blocking // read is possible or can call a callback at a specific time (aproximately) and // (optionally) repeat the call periodically. // The class is thread safe in the sense that all its member functions can be // called simultaneously from different concurrent threads. The exception to // this rule is the class destructor, which is unsafe to call concurrently with // calls to other class member functions. This exception also has its own // exception: it is safe to destroy the object even if some of its callbacks may // call its member functions, because the destructor will make sure all callback // calling threads are stopped before actually destroying anything. Callbacks // that wait for file descriptor always run on the same thread, so there is no // need of additional synchronization between them. The same applies to task // callbacks since they also run on a thread of their own, however it is // possible for a read callback and a task callback to execute at the same time // (they are garanteed to run in different threads) so synchronization is needed // to access common state (other than the internal state of the AsyncManager // class). While not required, it is strongly recommended to use the // Synchronize(const CriticalCallback&) member function to execute code inside // critical sections. Callbacks passed to this method on the same AsyncManager // object from different threads are granted to *NOT* run concurrently. class AsyncManager { public: // Starts watching a file descriptor in a separate thread. The Loading system/vendor_libs/test_vendor_lib/src/async_manager.cc +55 −49 Original line number Diff line number Diff line Loading @@ -122,7 +122,30 @@ class AsyncManager::AsyncFdWatcher { AsyncFdWatcher() = default; ~AsyncFdWatcher() { stopThread(); } ~AsyncFdWatcher() = default; int stopThread() { if (!std::atomic_exchange(&running_, false)) { return 0; // if not running already } notifyThread(); if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the reading thread itself", __FUNCTION__); } { std::unique_lock<std::mutex> guard(internal_mutex_); watched_shared_fds_.clear(); } return 0; } private: AsyncFdWatcher(const AsyncFdWatcher&) = delete; Loading Loading @@ -156,29 +179,6 @@ class AsyncManager::AsyncFdWatcher { return 0; } int stopThread() { if (!std::atomic_exchange(&running_, false)) { return 0; // if not running already } notifyThread(); if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the reading thread itself", __FUNCTION__); } { std::unique_lock<std::mutex> guard(internal_mutex_); watched_shared_fds_.clear(); } return 0; } int notifyThread() { char buffer = '0'; if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) { Loading Loading @@ -303,7 +303,29 @@ class AsyncManager::AsyncTaskManager { AsyncTaskManager() = default; ~AsyncTaskManager() { stopThread(); } ~AsyncTaskManager() = default; int stopThread() { { std::unique_lock<std::mutex> guard(internal_mutex_); tasks_by_id.clear(); task_queue_.clear(); if (!running_) { return 0; } running_ = false; // notify the thread internal_cond_var_.notify_one(); } // release the lock before joining a thread that is likely waiting for it if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the task thread itself", __FUNCTION__); } return 0; } private: // Holds the data for each task Loading Loading @@ -399,28 +421,6 @@ class AsyncManager::AsyncTaskManager { return -1; } int stopThread() { { std::unique_lock<std::mutex> guard(internal_mutex_); tasks_by_id.clear(); task_queue_.clear(); if (!running_) { return 0; } running_ = false; // notify the thread internal_cond_var_.notify_one(); } // release the lock before joining a thread that is likely waiting for it if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the task thread itself", __FUNCTION__); } return 0; } void ThreadRoutine() { while (1) { TaskCallback callback; Loading Loading @@ -477,8 +477,14 @@ AsyncManager::AsyncManager() taskManager_p_(new AsyncTaskManager()) {} AsyncManager::~AsyncManager() { fdWatcher_p_.reset(); // make sure the threads are stopped taskManager_p_.reset(); // before destroying the mutex // Make sure the threads are stopped before destroying the object. // The threads need to be stopped here and not in each internal class' // destructor because unique_ptr's reset() first assigns nullptr to the // pointer and only then calls the destructor, so any callback running // on these threads would dereference a null pointer if they called a member // function of this class. fdWatcher_p_->stopThread(); taskManager_p_->stopThread(); } int AsyncManager::WatchFdForNonBlockingReads( Loading Loading
system/vendor_libs/test_vendor_lib/include/async_manager.h +20 −12 Original line number Diff line number Diff line Loading @@ -25,18 +25,26 @@ using AsyncTaskId = uint16_t; constexpr uint16_t kInvalidTaskId = 0; // Manages tasks that should be done in the future. It can watch file // descriptors to call a given callback when it is certain that a block will not // occur or can call a callback at a specific time (aproximately) and // (optionally) repeat the call periodically. The class itself is thread safe in // the sense that all its member functions can be called simultaneously from // different concurrent thread. However, no asumption should be made about // callback execution. The safest approach is to assume that any two callbacks // could be executed concurrently in different threads, so code provided in // callbacks need to actively deal with race conditions, deadlocks, etc. While // not required, it is strongly recommended to use the Synchronize(const // CriticalCallback&) member function to execute code inside critical sections. // Callbacks passed to this method on the same AsyncManager object from // different threads are granted to *NOT* run concurrently. // descriptors to call a given callback when it is certain that a non-blocking // read is possible or can call a callback at a specific time (aproximately) and // (optionally) repeat the call periodically. // The class is thread safe in the sense that all its member functions can be // called simultaneously from different concurrent threads. The exception to // this rule is the class destructor, which is unsafe to call concurrently with // calls to other class member functions. This exception also has its own // exception: it is safe to destroy the object even if some of its callbacks may // call its member functions, because the destructor will make sure all callback // calling threads are stopped before actually destroying anything. Callbacks // that wait for file descriptor always run on the same thread, so there is no // need of additional synchronization between them. The same applies to task // callbacks since they also run on a thread of their own, however it is // possible for a read callback and a task callback to execute at the same time // (they are garanteed to run in different threads) so synchronization is needed // to access common state (other than the internal state of the AsyncManager // class). While not required, it is strongly recommended to use the // Synchronize(const CriticalCallback&) member function to execute code inside // critical sections. Callbacks passed to this method on the same AsyncManager // object from different threads are granted to *NOT* run concurrently. class AsyncManager { public: // Starts watching a file descriptor in a separate thread. The Loading
system/vendor_libs/test_vendor_lib/src/async_manager.cc +55 −49 Original line number Diff line number Diff line Loading @@ -122,7 +122,30 @@ class AsyncManager::AsyncFdWatcher { AsyncFdWatcher() = default; ~AsyncFdWatcher() { stopThread(); } ~AsyncFdWatcher() = default; int stopThread() { if (!std::atomic_exchange(&running_, false)) { return 0; // if not running already } notifyThread(); if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the reading thread itself", __FUNCTION__); } { std::unique_lock<std::mutex> guard(internal_mutex_); watched_shared_fds_.clear(); } return 0; } private: AsyncFdWatcher(const AsyncFdWatcher&) = delete; Loading Loading @@ -156,29 +179,6 @@ class AsyncManager::AsyncFdWatcher { return 0; } int stopThread() { if (!std::atomic_exchange(&running_, false)) { return 0; // if not running already } notifyThread(); if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the reading thread itself", __FUNCTION__); } { std::unique_lock<std::mutex> guard(internal_mutex_); watched_shared_fds_.clear(); } return 0; } int notifyThread() { char buffer = '0'; if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) { Loading Loading @@ -303,7 +303,29 @@ class AsyncManager::AsyncTaskManager { AsyncTaskManager() = default; ~AsyncTaskManager() { stopThread(); } ~AsyncTaskManager() = default; int stopThread() { { std::unique_lock<std::mutex> guard(internal_mutex_); tasks_by_id.clear(); task_queue_.clear(); if (!running_) { return 0; } running_ = false; // notify the thread internal_cond_var_.notify_one(); } // release the lock before joining a thread that is likely waiting for it if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the task thread itself", __FUNCTION__); } return 0; } private: // Holds the data for each task Loading Loading @@ -399,28 +421,6 @@ class AsyncManager::AsyncTaskManager { return -1; } int stopThread() { { std::unique_lock<std::mutex> guard(internal_mutex_); tasks_by_id.clear(); task_queue_.clear(); if (!running_) { return 0; } running_ = false; // notify the thread internal_cond_var_.notify_one(); } // release the lock before joining a thread that is likely waiting for it if (std::this_thread::get_id() != thread_.get_id()) { thread_.join(); } else { LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the task thread itself", __FUNCTION__); } return 0; } void ThreadRoutine() { while (1) { TaskCallback callback; Loading Loading @@ -477,8 +477,14 @@ AsyncManager::AsyncManager() taskManager_p_(new AsyncTaskManager()) {} AsyncManager::~AsyncManager() { fdWatcher_p_.reset(); // make sure the threads are stopped taskManager_p_.reset(); // before destroying the mutex // Make sure the threads are stopped before destroying the object. // The threads need to be stopped here and not in each internal class' // destructor because unique_ptr's reset() first assigns nullptr to the // pointer and only then calls the destructor, so any callback running // on these threads would dereference a null pointer if they called a member // function of this class. fdWatcher_p_->stopThread(); taskManager_p_->stopThread(); } int AsyncManager::WatchFdForNonBlockingReads( Loading