Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8bb04bb8 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

Reactor-based threading model

* Use Reactor+Thread (common/thread.h) to replace existing
  libchrome-based message_loop_thread
* Use Handler to implement multiple message queue per thread, by using
  kernel-based eventfd

Test: run unit test, and run benchmark
Change-Id: Idd2e4ef99fb9a7b2c0956de0e372c67a1098f1b6
parent f8e86f4c
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -23,3 +23,6 @@ BasedOnStyle: Google
CommentPragmas: NOLINT:.*
DerivePointerAlignment: false
ColumnLimit: 120
AllowShortFunctionsOnASingleLine: Empty
ConstructorInitializerAllOnOneLineOrOnePerLine: false
BreakConstructorInitializers: BeforeColon
+1 −0
Original line number Diff line number Diff line
@@ -129,6 +129,7 @@ cc_test {
        "test/gatt/database_test.cc",
    ],
    shared_libs: [
        "libcrypto",
        "liblog",
        "libprotobuf-cpp-lite",
    ],
+24 −4
Original line number Diff line number Diff line
cc_library_static {
    name: "libbt-common",
    defaults: ["fluoride_defaults"],
    defaults: [
        "fluoride_defaults",
        "clang_file_coverage",
    ],
    host_supported: true,
    include_dirs: [
        "packages/modules/Bluetooth/system",
@@ -8,10 +11,13 @@ cc_library_static {
    ],
    srcs: [
        "address_obfuscator.cc",
        "handler.cc",
        "message_loop_thread.cc",
        "metrics.cc",
        "once_timer.cc",
        "reactor.cc",
        "repeating_timer.cc",
        "thread.cc",
        "time_util.cc",
    ],
    shared_libs: [
@@ -25,7 +31,10 @@ cc_library_static {
cc_test {
    name: "bluetooth_test_common",
    test_suites: ["device-tests"],
    defaults: ["fluoride_defaults"],
    defaults: [
        "fluoride_defaults",
        "clang_coverage_bin",
    ],
    host_supported: true,
    include_dirs: [
        "packages/modules/Bluetooth/system",
@@ -33,12 +42,15 @@ cc_test {
    ],
    srcs : [
        "address_obfuscator_unittest.cc",
        "handler_unittest.cc",
        "leaky_bonded_queue_unittest.cc",
        "message_loop_thread_unittest.cc",
        "metrics_unittest.cc",
        "once_timer_unittest.cc",
        "reactor_unittest.cc",
        "repeating_timer_unittest.cc",
        "state_machine_unittest.cc",
        "thread_unittest.cc",
        "time_util_unittest.cc",
        "id_generator_unittest.cc",
    ],
@@ -77,12 +89,16 @@ cc_test {

cc_benchmark {
    name: "bluetooth_benchmark_thread_performance",
    defaults: ["fluoride_defaults"],
    defaults: [
        "fluoride_defaults",
    ],
    host_supported: true,
    include_dirs: ["packages/modules/Bluetooth/system"],
    srcs: [
        "benchmark/thread_performance_benchmark.cc",
    ],
    shared_libs: [
        "libcrypto",
        "liblog",
    ],
    static_libs: [
@@ -93,13 +109,17 @@ cc_benchmark {

cc_benchmark {
    name: "bluetooth_benchmark_timer_performance",
    defaults: ["fluoride_defaults"],
    defaults: [
        "fluoride_defaults",
    ],
    host_supported: false,
    include_dirs: ["packages/modules/Bluetooth/system"],
    srcs: [
        "benchmark/timer_performance_benchmark.cc",
    ],
    shared_libs: [
        "liblog",
        "libcrypto",
        "libprotobuf-cpp-lite",
        "libcutils",
    ],
+52 −0
Original line number Diff line number Diff line
@@ -23,12 +23,16 @@
#include <memory>
#include <thread>

#include "common/handler.h"
#include "common/message_loop_thread.h"
#include "common/thread.h"
#include "osi/include/fixed_queue.h"
#include "osi/include/thread.h"

using ::benchmark::State;
using bluetooth::common::Handler;
using bluetooth::common::MessageLoopThread;
using bluetooth::common::Thread;

#define NUM_MESSAGES_TO_SEND 100000

@@ -419,6 +423,54 @@ BENCHMARK_F(BM_LibChromeThread, sequential_execution)(State& state) {
  }
};

class BM_ReactorThread : public BM_ThreadPerformance {
 protected:
  void SetUp(State& st) override {
    BM_ThreadPerformance::SetUp(st);
    std::future<void> set_up_future = set_up_promise_->get_future();
    thread_ = new Thread("BM_ReactorThread thread", Thread::Priority::NORMAL);
    handler_ = new Handler(thread_);
    handler_->Post([this]() { set_up_promise_->set_value(); });
    set_up_future.wait();
  }

  void TearDown(State& st) override {
    delete handler_;
    handler_ = nullptr;
    thread_->Stop();
    delete thread_;
    thread_ = nullptr;
    BM_ThreadPerformance::TearDown(st);
  }

  Thread* thread_ = nullptr;
  Handler* handler_ = nullptr;
};

BENCHMARK_F(BM_ReactorThread, batch_enque_dequeue)(State& state) {
  for (auto _ : state) {
    g_counter = 0;
    g_counter_promise = std::make_unique<std::promise<void>>();
    std::future<void> counter_future = g_counter_promise->get_future();
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
      handler_->Post([this]() { callback_batch(bt_msg_queue_, nullptr); });
    }
    counter_future.wait();
  }
};

BENCHMARK_F(BM_ReactorThread, sequential_execution)(State& state) {
  for (auto _ : state) {
    for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
      g_counter_promise = std::make_unique<std::promise<void>>();
      std::future<void> counter_future = g_counter_promise->get_future();
      handler_->Post([]() { callback_sequential(nullptr); });
      counter_future.wait();
    }
  }
};

int main(int argc, char** argv) {
  // Disable LOG() output from libchrome
  logging::LoggingSettings log_settings;
+87 −0
Original line number Diff line number Diff line
/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "handler.h"

#include <sys/eventfd.h>
#include <cstring>

#include "base/logging.h"

#include "reactor.h"
#include "utils.h"

#ifndef EFD_SEMAPHORE
#define EFD_SEMAPHORE 1
#endif

namespace bluetooth {
namespace common {

Handler::Handler(Thread* thread)
  : thread_(thread),
    fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) {
  CHECK_NE(fd_, -1) << __func__ << ": cannot create eventfd: " << strerror(errno);

  reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr);
}

Handler::~Handler() {
  thread_->GetReactor()->Unregister(reactable_);
  reactable_ = nullptr;

  int close_status;
  RUN_NO_INTR(close_status = close(fd_));
  CHECK_NE(close_status, -1) << __func__ << ": cannot close eventfd: " << strerror(errno);
}

void Handler::Post(Closure closure) {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    tasks_.emplace(std::move(closure));
  }
  uint64_t val = 1;
  auto write_result = eventfd_write(fd_, val);
  CHECK_NE(write_result, -1) << __func__ << ": failed to write: " << strerror(errno);
}

void Handler::Clear() {
  std::lock_guard<std::mutex> lock(mutex_);

  std::queue<Closure> empty;
  std::swap(tasks_, empty);

  uint64_t val;
  while (eventfd_read(fd_, &val) == 0) {
  }
}

void Handler::handle_next_event() {
  Closure closure;
  uint64_t val = 0;
  auto read_result = eventfd_read(fd_, &val);
  CHECK_NE(read_result, -1) << __func__ << ": failed to read fd: " << strerror(errno);

  {
    std::lock_guard<std::mutex> lock(mutex_);
    closure = std::move(tasks_.front());
    tasks_.pop();
  }
  closure();
}

}  // namespace common
}  // namespace bluetooth
Loading