Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7fc931a4 authored by Jack He's avatar Jack He
Browse files

Common: Replace ExecutionBarrier with std::promise and std::future

* std::promise and std::future are able to achieve the same
  functionalities of ExecutionBarrier with extra flexibility
* Replace "_barrier" with "_promise" in system/bt/common

Bug: 110303473
Fixes: 112159657
Test: mm -j40, unit test
Change-Id: I2a420bbf16bf92e4b3dd256d9f23480fc2be7be1
parent 34bd633f
Loading
Loading
Loading
Loading
+0 −2
Original line number Diff line number Diff line
@@ -8,7 +8,6 @@ cc_library_static {
    ],
    srcs: [
        "message_loop_thread.cc",
        "execution_barrier.cc",
        "metrics.cc",
        "time_util.cc",
    ],
@@ -27,7 +26,6 @@ cc_test {
        "packages/modules/Bluetooth/system/stack/include",
    ],
    srcs : [
        "execution_barrier_unittest.cc",
        "leaky_bonded_queue_unittest.cc",
        "message_loop_thread_unittest.cc",
        "metrics_unittest.cc",
+69 −51
Original line number Diff line number Diff line
@@ -19,22 +19,21 @@
#include <base/run_loop.h>
#include <base/threading/thread.h>
#include <benchmark/benchmark.h>
#include <future>
#include <memory>
#include <thread>

#include "common/execution_barrier.h"
#include "common/message_loop_thread.h"
#include "osi/include/fixed_queue.h"
#include "osi/include/thread.h"

using ::benchmark::State;
using bluetooth::common::ExecutionBarrier;
using bluetooth::common::MessageLoopThread;

#define NUM_MESSAGES_TO_SEND 100000

volatile static int g_counter = 0;
static std::unique_ptr<ExecutionBarrier> g_counter_barrier = nullptr;
static std::unique_ptr<std::promise<void>> g_counter_promise = nullptr;

void pthread_callback_batch(void* context) {
  auto queue = static_cast<fixed_queue_t*>(context);
@@ -42,16 +41,16 @@ void pthread_callback_batch(void* context) {
  fixed_queue_dequeue(queue);
  g_counter++;
  if (g_counter >= NUM_MESSAGES_TO_SEND) {
    g_counter_barrier->NotifyFinished();
    g_counter_promise->set_value();
  }
}

void callback_sequential(void* context) { g_counter_barrier->NotifyFinished(); }
void callback_sequential(void* context) { g_counter_promise->set_value(); }

void callback_sequential_queue(fixed_queue_t* queue, void* context) {
  CHECK_NE(queue, nullptr);
  fixed_queue_dequeue(queue);
  g_counter_barrier->NotifyFinished();
  g_counter_promise->set_value();
}

void callback_batch(fixed_queue_t* queue, void* data) {
@@ -59,7 +58,7 @@ void callback_batch(fixed_queue_t* queue, void* data) {
  fixed_queue_dequeue(queue);
  g_counter++;
  if (g_counter >= NUM_MESSAGES_TO_SEND) {
    g_counter_barrier->NotifyFinished();
    g_counter_promise->set_value();
  }
}

@@ -67,19 +66,19 @@ class BM_ThreadPerformance : public ::benchmark::Fixture {
 protected:
  void SetUp(State& st) override {
    benchmark::Fixture::SetUp(st);
    set_up_barrier_ = std::make_unique<ExecutionBarrier>();
    set_up_promise_ = std::make_unique<std::promise<void>>();
    g_counter = 0;
    bt_msg_queue_ = fixed_queue_new(SIZE_MAX);
  }
  void TearDown(State& st) override {
    fixed_queue_free(bt_msg_queue_, nullptr);
    bt_msg_queue_ = nullptr;
    set_up_barrier_.reset(nullptr);
    g_counter_barrier.reset(nullptr);
    set_up_promise_.reset(nullptr);
    g_counter_promise.reset(nullptr);
    benchmark::Fixture::TearDown(st);
  }
  fixed_queue_t* bt_msg_queue_ = nullptr;
  std::unique_ptr<ExecutionBarrier> set_up_barrier_;
  std::unique_ptr<std::promise<void>> set_up_promise_;
};

class BM_MessageLoop : public BM_ThreadPerformance {
@@ -97,8 +96,8 @@ class BM_MessageLoop : public BM_ThreadPerformance {
    message_loop_ = new base::MessageLoop();
    run_loop_ = new base::RunLoop();
    message_loop_->task_runner()->PostTask(
        FROM_HERE, base::BindOnce(&ExecutionBarrier::NotifyFinished,
                                  base::Unretained(set_up_barrier_.get())));
        FROM_HERE, base::BindOnce(&std::promise<void>::set_value,
                                  base::Unretained(set_up_promise_.get())));
    run_loop_->Run();
    delete message_loop_;
    message_loop_ = nullptr;
@@ -115,9 +114,10 @@ class BM_MessageLoopOsiThread : public BM_MessageLoop {
 protected:
  void SetUp(State& st) override {
    BM_MessageLoop::SetUp(st);
    std::future<void> set_up_future = set_up_promise_->get_future();
    thread_ = thread_new("BM_MessageLoopOnOsiThread thread");
    thread_post(thread_, &BM_MessageLoop::RunThread, this);
    set_up_barrier_->WaitForExecution();
    set_up_future.wait();
  }

  void TearDown(State& st) override {
@@ -134,23 +134,25 @@ class BM_MessageLoopOsiThread : public BM_MessageLoop {
BENCHMARK_F(BM_MessageLoopOsiThread, batch_enque_dequeue)(State& state) {
  for (auto _ : state) {
    g_counter = 0;
    g_counter_barrier = std::make_unique<ExecutionBarrier>();
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      message_loop_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr));
    }
    g_counter_barrier->WaitForExecution();
    counter_future.wait();
  }
};

BENCHMARK_F(BM_MessageLoopOsiThread, sequential_execution)(State& state) {
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_barrier = std::make_unique<ExecutionBarrier>();
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      message_loop_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_sequential, nullptr));
      g_counter_barrier->WaitForExecution();
      counter_future.wait();
    }
  }
};
@@ -159,8 +161,9 @@ class BM_MessageLoopStlThread : public BM_MessageLoop {
 protected:
  void SetUp(State& st) override {
    BM_MessageLoop::SetUp(st);
    std::future<void> set_up_future = set_up_promise_->get_future();
    thread_ = new std::thread(&BM_MessageLoop::RunThread, this);
    set_up_barrier_->WaitForExecution();
    set_up_future.wait();
  }

  void TearDown(State& st) override {
@@ -178,23 +181,25 @@ class BM_MessageLoopStlThread : public BM_MessageLoop {
BENCHMARK_F(BM_MessageLoopStlThread, batch_enque_dequeue)(State& state) {
  for (auto _ : state) {
    g_counter = 0;
    g_counter_barrier = std::make_unique<ExecutionBarrier>();
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      message_loop_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr));
    }
    g_counter_barrier->WaitForExecution();
    counter_future.wait();
  }
};

BENCHMARK_F(BM_MessageLoopStlThread, sequential_execution)(State& state) {
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_barrier = std::make_unique<ExecutionBarrier>();
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      message_loop_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_sequential, nullptr));
      g_counter_barrier->WaitForExecution();
      counter_future.wait();
    }
  }
};
@@ -203,8 +208,9 @@ class BM_MessageLoopPosixThread : public BM_MessageLoop {
 protected:
  void SetUp(State& st) override {
    BM_MessageLoop::SetUp(st);
    std::future<void> set_up_future = set_up_promise_->get_future();
    pthread_create(&thread_, nullptr, &BM_MessageLoop::RunPThread, (void*)this);
    set_up_barrier_->WaitForExecution();
    set_up_future.wait();
  }

  void TearDown(State& st) override {
@@ -220,23 +226,25 @@ class BM_MessageLoopPosixThread : public BM_MessageLoop {
BENCHMARK_F(BM_MessageLoopPosixThread, batch_enque_dequeue)(State& state) {
  for (auto _ : state) {
    g_counter = 0;
    g_counter_barrier = std::make_unique<ExecutionBarrier>();
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      message_loop_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr));
    }
    g_counter_barrier->WaitForExecution();
    counter_future.wait();
  }
};

BENCHMARK_F(BM_MessageLoopPosixThread, sequential_execution)(State& state) {
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_barrier = std::make_unique<ExecutionBarrier>();
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      message_loop_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_sequential, nullptr));
      g_counter_barrier->WaitForExecution();
      counter_future.wait();
    }
  }
};
@@ -261,12 +269,13 @@ BENCHMARK_F(BM_OsiReactorThread, batch_enque_dequeue_using_thread_post)
(State& state) {
  for (auto _ : state) {
    g_counter = 0;
    g_counter_barrier = std::make_unique<ExecutionBarrier>();
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      thread_post(thread_, pthread_callback_batch, bt_msg_queue_);
    }
    g_counter_barrier->WaitForExecution();
    counter_future.wait();
  }
};

@@ -274,9 +283,10 @@ BENCHMARK_F(BM_OsiReactorThread, sequential_execution_using_thread_post)
(State& state) {
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_barrier = std::make_unique<ExecutionBarrier>();
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      thread_post(thread_, callback_sequential, nullptr);
      g_counter_barrier->WaitForExecution();
      counter_future.wait();
    }
  }
};
@@ -287,11 +297,12 @@ BENCHMARK_F(BM_OsiReactorThread, batch_enque_dequeue_using_reactor)
                               callback_batch, nullptr);
  for (auto _ : state) {
    g_counter = 0;
    g_counter_barrier = std::make_unique<ExecutionBarrier>();
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
    }
    g_counter_barrier->WaitForExecution();
    counter_future.wait();
  }
};

@@ -301,9 +312,10 @@ BENCHMARK_F(BM_OsiReactorThread, sequential_execution_using_reactor)
                               callback_sequential_queue, nullptr);
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_barrier = std::make_unique<ExecutionBarrier>();
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      g_counter_barrier->WaitForExecution();
      counter_future.wait();
    }
  }
};
@@ -312,13 +324,14 @@ class BM_MessageLooopThread : public BM_ThreadPerformance {
 protected:
  void SetUp(State& st) override {
    BM_ThreadPerformance::SetUp(st);
    std::future<void> set_up_future = set_up_promise_->get_future();
    message_loop_thread_ =
        new MessageLoopThread("BM_MessageLooopThread thread");
    message_loop_thread_->StartUp();
    message_loop_thread_->DoInThread(
        FROM_HERE, base::BindOnce(&ExecutionBarrier::NotifyFinished,
                                  base::Unretained(set_up_barrier_.get())));
    set_up_barrier_->WaitForExecution();
        FROM_HERE, base::BindOnce(&std::promise<void>::set_value,
                                  base::Unretained(set_up_promise_.get())));
    set_up_future.wait();
  }

  void TearDown(State& st) override {
@@ -334,23 +347,25 @@ class BM_MessageLooopThread : public BM_ThreadPerformance {
BENCHMARK_F(BM_MessageLooopThread, batch_enque_dequeue)(State& state) {
  for (auto _ : state) {
    g_counter = 0;
    g_counter_barrier = std::make_unique<ExecutionBarrier>();
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      message_loop_thread_->DoInThread(
          FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr));
    }
    g_counter_barrier->WaitForExecution();
    counter_future.wait();
  }
};

BENCHMARK_F(BM_MessageLooopThread, sequential_execution)(State& state) {
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_barrier = std::make_unique<ExecutionBarrier>();
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      message_loop_thread_->DoInThread(
          FROM_HERE, base::BindOnce(&callback_sequential, nullptr));
      g_counter_barrier->WaitForExecution();
      counter_future.wait();
    }
  }
};
@@ -359,12 +374,13 @@ class BM_LibChromeThread : public BM_ThreadPerformance {
 protected:
  void SetUp(State& st) override {
    BM_ThreadPerformance::SetUp(st);
    std::future<void> set_up_future = set_up_promise_->get_future();
    thread_ = new base::Thread("BM_LibChromeThread thread");
    thread_->Start();
    thread_->task_runner()->PostTask(
        FROM_HERE, base::BindOnce(&ExecutionBarrier::NotifyFinished,
                                  base::Unretained(set_up_barrier_.get())));
    set_up_barrier_->WaitForExecution();
        FROM_HERE, base::BindOnce(&std::promise<void>::set_value,
                                  base::Unretained(set_up_promise_.get())));
    set_up_future.wait();
  }

  void TearDown(State& st) override {
@@ -380,23 +396,25 @@ class BM_LibChromeThread : public BM_ThreadPerformance {
BENCHMARK_F(BM_LibChromeThread, batch_enque_dequeue)(State& state) {
  for (auto _ : state) {
    g_counter = 0;
    g_counter_barrier = std::make_unique<ExecutionBarrier>();
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      thread_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_batch, bt_msg_queue_, nullptr));
    }
    g_counter_barrier->WaitForExecution();
    counter_future.wait();
  }
};

BENCHMARK_F(BM_LibChromeThread, sequential_execution)(State& state) {
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_barrier = std::make_unique<ExecutionBarrier>();
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      thread_->task_runner()->PostTask(
          FROM_HERE, base::BindOnce(&callback_sequential, nullptr));
      g_counter_barrier->WaitForExecution();
      counter_future.wait();
    }
  }
};
+0 −38
Original line number Diff line number Diff line
/*
 * Copyright 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "execution_barrier.h"

namespace bluetooth {

namespace common {

void ExecutionBarrier::WaitForExecution() {
  std::unique_lock<std::mutex> lock(execution_mutex_);
  while (!finished_) {
    execution_cv_.wait(lock);
  }
}

void ExecutionBarrier::NotifyFinished() {
  std::unique_lock<std::mutex> lock(execution_mutex_);
  finished_ = true;
  execution_cv_.notify_all();
}

}  // namespace common

}  // namespace bluetooth
 No newline at end of file

system/common/execution_barrier.h

deleted100644 → 0
+0 −70
Original line number Diff line number Diff line
/*
 * Copyright 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>

#include <base/macros.h>

namespace bluetooth {

namespace common {

/**
 * A utility to wait for an event on another thread
 *
 * This class can be used once only. This means that after the first time
 * NotifyFinished() is called, WaitForExecution() will no longer block. User
 * needs to create a new instance if another ExecutionBarrier is needed.
 *
 * No reset mechanism is provided for this class to avoid racy scenarios and
 * unsafe API usage
 *
 * Similar to std::experimental::barrier, but this can be used only once
 */
class ExecutionBarrier final {
 public:
  explicit ExecutionBarrier() : finished_(false){};

  /**
   * Blocks until NotifyFinished() is called on this object
   *
   */
  void WaitForExecution();

  /**
   * Unblocks any caller who are blocked on WaitForExecution() method call
   */
  void NotifyFinished();

 private:
  bool finished_;
  std::mutex execution_mutex_;
  std::condition_variable execution_cv_;

  /**
   * Prevent COPY and ASSIGN since many internal states cannot be copied or
   * assigned
   */
  DISALLOW_COPY_AND_ASSIGN(ExecutionBarrier);
};

}  // namespace common

}  // namespace bluetooth
 No newline at end of file
+0 −95
Original line number Diff line number Diff line
/*
 * Copyright 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <chrono>
#include <thread>

#include <gtest/gtest.h>

#include "execution_barrier.h"

using bluetooth::common::ExecutionBarrier;

static constexpr int kSleepTimeMs = 100;
static constexpr int kSchedulingDelayMaxMs = 5;

TEST(ExecutionBarrierTest, test_two_threads_wait_before_execution) {
  ExecutionBarrier execution_barrier;
  std::thread caller1([&]() {
    auto start = std::chrono::high_resolution_clock::now();
    execution_barrier.WaitForExecution();
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> elapsed_ms = end - start;
    EXPECT_NEAR(elapsed_ms.count(), kSleepTimeMs, kSchedulingDelayMaxMs);
  });
  std::thread executor([&]() {
    // Wait for kSleepTimeMs so that caller1 starts waiting first
    std::this_thread::sleep_for(std::chrono::milliseconds(kSleepTimeMs));
    execution_barrier.NotifyFinished();
  });
  executor.join();
  caller1.join();
  // Further calls to WaitForExecution() no longer blocks
  std::thread caller2([&]() {
    auto start = std::chrono::high_resolution_clock::now();
    execution_barrier.WaitForExecution();
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> elapsed_ms = end - start;
    EXPECT_LT(elapsed_ms.count(), kSchedulingDelayMaxMs);
  });
  caller2.join();
}

TEST(ExecutionBarrierTest, test_two_threads_execution_before_wait) {
  ExecutionBarrier execution_barrier;
  std::thread executor([&]() { execution_barrier.NotifyFinished(); });
  std::thread caller1([&]() {
    // Wait for kSleepTimeMs so that executor finishes running first
    std::this_thread::sleep_for(std::chrono::milliseconds(kSleepTimeMs));
    auto start = std::chrono::high_resolution_clock::now();
    execution_barrier.WaitForExecution();
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> elapsed_ms = end - start;
    EXPECT_LT(elapsed_ms.count(), kSchedulingDelayMaxMs);
  });
  executor.join();
  caller1.join();
}

TEST(ExecutionBarrierTest, test_two_callers_one_executor) {
  ExecutionBarrier execution_barrier;
  std::thread caller1([&]() {
    auto start = std::chrono::high_resolution_clock::now();
    execution_barrier.WaitForExecution();
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> elapsed_ms = end - start;
    EXPECT_NEAR(elapsed_ms.count(), kSleepTimeMs, 5);
  });
  std::thread caller2([&]() {
    auto start = std::chrono::high_resolution_clock::now();
    execution_barrier.WaitForExecution();
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> elapsed_ms = end - start;
    EXPECT_NEAR(elapsed_ms.count(), kSleepTimeMs, 5);
  });
  std::thread executor([&]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(kSleepTimeMs));
    execution_barrier.NotifyFinished();
  });
  executor.join();
  caller1.join();
  caller2.join();
}
Loading