Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit fa2595dd authored by Hansong Zhang's avatar Hansong Zhang
Browse files

Queue: Wait for unregistration if on different thread

Before Queue object is deleted, we must make sure that the enqueue or
dequeue callback isn't executing. If on same thread, callbacks are
synchronized. If on different thread, we must wait for unregistration to
synchronize callbacks.

Test: bluetooth_test_gd
Bug: 150174451
Change-Id: Id3c980aa0bf7bd9fa10c33c5cca3751df38f7d97
parent 1c0d4937
Loading
Loading
Loading
Loading
+32 −10
Original line number Diff line number Diff line
@@ -37,12 +37,23 @@ void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {

template <typename T>
void Queue<T>::UnregisterEnqueue() {
  Reactor* reactor = nullptr;
  Reactor::Reactable* to_unregister = nullptr;
  bool wait_for_unregister = false;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ASSERT(enqueue_.reactable_ != nullptr);
  enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_);
    reactor = enqueue_.handler_->thread_->GetReactor();
    wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread());
    to_unregister = enqueue_.reactable_;
    enqueue_.reactable_ = nullptr;
    enqueue_.handler_ = nullptr;
  }
  reactor->Unregister(to_unregister);
  if (wait_for_unregister) {
    reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
  }
}

template <typename T>
void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
@@ -56,12 +67,23 @@ void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {

template <typename T>
void Queue<T>::UnregisterDequeue() {
  Reactor* reactor = nullptr;
  Reactor::Reactable* to_unregister = nullptr;
  bool wait_for_unregister = false;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ASSERT(dequeue_.reactable_ != nullptr);
  dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_);
    reactor = dequeue_.handler_->thread_->GetReactor();
    wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread());
    to_unregister = dequeue_.reactable_;
    dequeue_.reactable_ = nullptr;
    dequeue_.handler_ = nullptr;
  }
  reactor->Unregister(to_unregister);
  if (wait_for_unregister) {
    reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
  }
}

template <typename T>
std::unique_ptr<T> Queue<T>::TryDequeue() {
+63 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
#include "os/queue.h"

#include <sys/eventfd.h>
#include <atomic>
#include <future>
#include <unordered_map>

@@ -721,6 +722,68 @@ TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  future.wait();
}

std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  (*to_increase)++;
  return std::make_unique<std::string>("Hello");
}

TEST_F(QueueTest, unregister_enqueue_and_wait) {
  Queue<std::string> queue(10);
  int* indicator = new int(100);
  queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterEnqueue();
  EXPECT_EQ(*indicator, 101);
  delete indicator;
}

std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(int* to_increase, Queue<std::string>* queue,
                                                                       std::atomic_bool* is_registered) {
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  (*to_increase)++;
  if (is_registered->exchange(false)) {
    queue->UnregisterEnqueue();
  }
  return std::make_unique<std::string>("Hello");
}

TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
  Queue<std::string> queue(10);
  int* indicator = new int(100);
  std::atomic_bool is_registered = true;
  queue.RegisterEnqueue(enqueue_handler_,
                        common::Bind(&sleep_and_enqueue_callback_and_unregister, common::Unretained(indicator),
                                     common::Unretained(&queue), common::Unretained(&is_registered)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  if (is_registered.exchange(false)) {
    queue.UnregisterEnqueue();
  }
  EXPECT_EQ(*indicator, 101);
  delete indicator;
}

void sleep_and_dequeue_callback(int* to_increase) {
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  (*to_increase)++;
}

TEST_F(QueueTest, unregister_dequeue_and_wait) {
  int* indicator = new int(100);
  Queue<std::string> queue(10);
  queue.RegisterEnqueue(enqueue_handler_, common::Bind(
                                              [](Queue<std::string>* queue) {
                                                queue->UnregisterEnqueue();
                                                return std::make_unique<std::string>("Hello");
                                              },
                                              common::Unretained(&queue)));
  queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterDequeue();
  EXPECT_EQ(*indicator, 101);
  delete indicator;
}

// Create all threads for death tests in the function that dies
class QueueDeathTest : public ::testing::Test {
 public: