Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f47abc46 authored by Ajay Panicker's avatar Ajay Panicker
Browse files

Move hci_thread to a message loop and prevent thread from spinning

Before this patch, if there was a message on the command queue and
there were no command credits, the thread reactor would spin trying
to process the message on the command queue and would continue until
a credit was received. This led to a bug where upon switching users,
hci_thread would spin and try to use 100% of the CPU. This is fixed
by moving over to a message loop and queue system. The message loop
processes all the messages. If there aren't enough command credits,
command messages are deferred to the command queue and popped off
whenever more credits are aquired. The deferred queue has priority
to credits over recently posted messages.

Bug: 37733903
Test: Swap users with the real time scheduling patch applied, and
      general Bluetooth usage.
      TestTracker: 86249
Change-Id: Ib775e47f6d4810d3d7d8af5b3ba84adc4ada3da5
(cherry picked from commit 417ffed2)
parent aeebce2f
Loading
Loading
Loading
Loading
+77 −54
Original line number Diff line number Diff line
@@ -20,7 +20,12 @@

#include "hci_layer.h"

#include <base/bind.h>
#include <base/logging.h>
#include <base/run_loop.h>
#include <base/sequenced_task_runner.h>
#include <base/threading/thread.h>

#include <signal.h>
#include <string.h>
#include <sys/types.h>
@@ -80,13 +85,15 @@ static const packet_fragmenter_t* packet_fragmenter;

static future_t* startup_future;
static thread_t* thread;  // We own this
static base::MessageLoop* message_loop_ = nullptr;
static base::RunLoop* run_loop_ = nullptr;

static alarm_t* startup_timer;

// Outbound-related
static int command_credits = 1;
static fixed_queue_t* command_queue;
static fixed_queue_t* packet_queue;
static std::mutex command_credits_mutex;
static std::queue<base::Closure> command_queue;

// Inbound-related
static alarm_t* command_response_timer;
@@ -102,8 +109,10 @@ static waiting_command_t* get_waiting_command(command_opcode_t opcode);
static void event_finish_startup(void* context);
static void startup_timer_expired(void* context);

static void event_command_ready(fixed_queue_t* queue, void* context);
static void event_packet_ready(fixed_queue_t* queue, void* context);
static void enqueue_command(waiting_command_t* wait_entry);
static void event_command_ready(waiting_command_t* wait_entry);
static void enqueue_packet(void* packet);
static void event_packet_ready(void* packet);
static void command_timed_out(void* context);

static void update_command_response_timer(void);
@@ -117,7 +126,8 @@ static const packet_fragmenter_callbacks_t packet_fragmenter_callbacks = {
    transmit_fragment, dispatch_reassembled, fragmenter_transmit_finished};

void initialization_complete() {
  thread_post(thread, event_finish_startup, NULL);
  message_loop_->task_runner()->PostTask(
      FROM_HERE, base::Bind(&event_finish_startup, nullptr));
}

void hci_event_received(BT_HDR* packet) {
@@ -143,6 +153,21 @@ void sco_data_received(BT_HDR* packet) {

static future_t* hci_module_shut_down();

void message_loop_run(UNUSED_ATTR void* context) {
  message_loop_ = new base::MessageLoop();
  run_loop_ = new base::RunLoop();

  message_loop_->task_runner()->PostTask(FROM_HERE,
                                         base::Bind(&hci_initialize));
  run_loop_->Run();

  delete message_loop_;
  message_loop_ = nullptr;

  delete run_loop_;
  run_loop_ = nullptr;
}

static future_t* hci_module_start_up(void) {
  LOG_INFO(LOG_TAG, "%s", __func__);

@@ -174,18 +199,6 @@ static future_t* hci_module_start_up(void) {
    goto error;
  }

  command_queue = fixed_queue_new(SIZE_MAX);
  if (!command_queue) {
    LOG_ERROR(LOG_TAG, "%s unable to create pending command queue.", __func__);
    goto error;
  }

  packet_queue = fixed_queue_new(SIZE_MAX);
  if (!packet_queue) {
    LOG_ERROR(LOG_TAG, "%s unable to create pending packet queue.", __func__);
    goto error;
  }

  thread = thread_new("hci_thread");
  if (!thread) {
    LOG_ERROR(LOG_TAG, "%s unable to create thread.", __func__);
@@ -211,12 +224,7 @@ static future_t* hci_module_start_up(void) {

  packet_fragmenter->init(&packet_fragmenter_callbacks);

  fixed_queue_register_dequeue(command_queue, thread_get_reactor(thread),
                               event_command_ready, NULL);
  fixed_queue_register_dequeue(packet_queue, thread_get_reactor(thread),
                               event_packet_ready, NULL);

  hci_initialize();
  thread_post(thread, message_loop_run, NULL);

  LOG_DEBUG(LOG_TAG, "%s starting async portion", __func__);
  return local_startup_future;
@@ -238,6 +246,8 @@ static future_t* hci_module_shut_down() {
    startup_timer = NULL;
  }

  message_loop_->task_runner()->PostTask(FROM_HERE, run_loop_->QuitClosure());

  // Stop the thread to prevent Send() calls.
  if (thread) {
    thread_stop(thread);
@@ -247,10 +257,6 @@ static future_t* hci_module_shut_down() {
  // Close HCI to prevent callbacks.
  hci_close();

  fixed_queue_free(command_queue, osi_free);
  command_queue = NULL;
  fixed_queue_free(packet_queue, buffer_allocator->free);
  packet_queue = NULL;
  {
    std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex);
    list_free(commands_pending_response);
@@ -294,7 +300,7 @@ static void transmit_command(BT_HDR* command,
  // in case the upper layer didn't already
  command->event = MSG_STACK_TO_HC_HCI_CMD;

  fixed_queue_enqueue(command_queue, wait_entry);
  enqueue_command(wait_entry);
}

static future_t* transmit_command_futured(BT_HDR* command) {
@@ -311,7 +317,7 @@ static future_t* transmit_command_futured(BT_HDR* command) {
  // in case the upper layer didn't already
  command->event = MSG_STACK_TO_HC_HCI_CMD;

  fixed_queue_enqueue(command_queue, wait_entry);
  enqueue_command(wait_entry);
  return future;
}

@@ -323,7 +329,7 @@ static void transmit_downward(data_dispatcher_type_t type, void* data) {
             "%s legacy transmit of command. Use transmit_command instead.",
             __func__);
  } else {
    fixed_queue_enqueue(packet_queue, data);
    enqueue_packet(data);
  }
}

@@ -346,18 +352,21 @@ static void startup_timer_expired(UNUSED_ATTR void* context) {
}

// Command/packet transmitting functions
static void enqueue_command(waiting_command_t* wait_entry) {
  base::Closure callback = base::Bind(&event_command_ready, wait_entry);

static void event_command_ready(fixed_queue_t* queue,
                                UNUSED_ATTR void* context) {
  std::lock_guard<std::mutex> lock(command_credits_mutex);
  if (command_credits > 0) {
    waiting_command_t* wait_entry =
        reinterpret_cast<waiting_command_t*>(fixed_queue_dequeue(queue));
    message_loop_->task_runner()->PostTask(FROM_HERE, std::move(callback));
    command_credits--;
  } else {
    command_queue.push(std::move(callback));
  }
}

    // Move it to the list of commands awaiting response
    {
      std::lock_guard<std::recursive_mutex> lock(
          commands_pending_response_mutex);
static void event_command_ready(waiting_command_t* wait_entry) {
  /// Move it to the list of commands awaiting response
  std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex);
  wait_entry->timestamp = std::chrono::steady_clock::now();
  list_append(commands_pending_response, wait_entry);

@@ -366,14 +375,15 @@ static void event_command_ready(fixed_queue_t* queue,

  update_command_response_timer();
}
  }

static void enqueue_packet(void* packet) {
  message_loop_->task_runner()->PostTask(
      FROM_HERE, base::Bind(&event_packet_ready, packet));
}

static void event_packet_ready(fixed_queue_t* queue,
                               UNUSED_ATTR void* context) {
static void event_packet_ready(void* pkt) {
  // The queue may be the command queue or the packet queue, we don't care
  BT_HDR* packet = (BT_HDR*)fixed_queue_dequeue(queue);

  BT_HDR* packet = (BT_HDR*)pkt;
  packet_fragmenter->fragment_and_dispatch(packet);
}

@@ -428,6 +438,17 @@ static void command_timed_out(UNUSED_ATTR void* context) {
}

// Event/packet receiving functions
void process_command_credits(int credits) {
  std::lock_guard<std::mutex> lock(command_credits_mutex);

  command_credits = credits;
  while (command_credits > 0 && command_queue.size() > 0) {
    message_loop_->task_runner()->PostTask(FROM_HERE,
                                           std::move(command_queue.front()));
    command_queue.pop();
    command_credits--;
  }
}

// Returns true if the event was intercepted and should not proceed to
// higher layers. Also inspects an incoming event for interesting
@@ -436,19 +457,20 @@ static bool filter_incoming_event(BT_HDR* packet) {
  waiting_command_t* wait_entry = NULL;
  uint8_t* stream = packet->data;
  uint8_t event_code;
  int credits = 0;
  command_opcode_t opcode;

  STREAM_TO_UINT8(event_code, stream);
  STREAM_SKIP_UINT8(stream);  // Skip the parameter total length field

  if (event_code == HCI_COMMAND_COMPLETE_EVT) {
    STREAM_TO_UINT8(command_credits, stream);
    STREAM_TO_UINT8(credits, stream);
    STREAM_TO_UINT16(opcode, stream);

    process_command_credits(credits);

    wait_entry = get_waiting_command(opcode);
    if (!wait_entry) {
      // TODO: Currently command_credits aren't parsed at all; here or in higher
      // layers...
      if (opcode != HCI_COMMAND_NONE) {
        LOG_WARN(LOG_TAG,
                 "%s command complete event with no matching command (opcode: "
@@ -468,12 +490,13 @@ static bool filter_incoming_event(BT_HDR* packet) {
  } else if (event_code == HCI_COMMAND_STATUS_EVT) {
    uint8_t status;
    STREAM_TO_UINT8(status, stream);
    STREAM_TO_UINT8(command_credits, stream);
    STREAM_TO_UINT8(credits, stream);
    STREAM_TO_UINT16(opcode, stream);

    process_command_credits(credits);

    // If a command generates a command status event, it won't be getting a
    // command complete event

    wait_entry = get_waiting_command(opcode);
    if (!wait_entry) {
      LOG_WARN(