Loading system/hci/src/hci_layer.cc +77 −54 Original line number Diff line number Diff line Loading @@ -20,7 +20,12 @@ #include "hci_layer.h" #include <base/bind.h> #include <base/logging.h> #include <base/run_loop.h> #include <base/sequenced_task_runner.h> #include <base/threading/thread.h> #include <signal.h> #include <string.h> #include <sys/types.h> Loading Loading @@ -80,13 +85,15 @@ static const packet_fragmenter_t* packet_fragmenter; static future_t* startup_future; static thread_t* thread; // We own this static base::MessageLoop* message_loop_ = nullptr; static base::RunLoop* run_loop_ = nullptr; static alarm_t* startup_timer; // Outbound-related static int command_credits = 1; static fixed_queue_t* command_queue; static fixed_queue_t* packet_queue; static std::mutex command_credits_mutex; static std::queue<base::Closure> command_queue; // Inbound-related static alarm_t* command_response_timer; Loading @@ -102,8 +109,10 @@ static waiting_command_t* get_waiting_command(command_opcode_t opcode); static void event_finish_startup(void* context); static void startup_timer_expired(void* context); static void event_command_ready(fixed_queue_t* queue, void* context); static void event_packet_ready(fixed_queue_t* queue, void* context); static void enqueue_command(waiting_command_t* wait_entry); static void event_command_ready(waiting_command_t* wait_entry); static void enqueue_packet(void* packet); static void event_packet_ready(void* packet); static void command_timed_out(void* context); static void update_command_response_timer(void); Loading @@ -117,7 +126,8 @@ static const packet_fragmenter_callbacks_t packet_fragmenter_callbacks = { transmit_fragment, dispatch_reassembled, fragmenter_transmit_finished}; void initialization_complete() { thread_post(thread, event_finish_startup, NULL); message_loop_->task_runner()->PostTask( FROM_HERE, base::Bind(&event_finish_startup, nullptr)); } void hci_event_received(BT_HDR* packet) { Loading @@ -143,6 +153,21 @@ void sco_data_received(BT_HDR* packet) { static future_t* hci_module_shut_down(); void message_loop_run(UNUSED_ATTR void* context) { message_loop_ = new base::MessageLoop(); run_loop_ = new base::RunLoop(); message_loop_->task_runner()->PostTask(FROM_HERE, base::Bind(&hci_initialize)); run_loop_->Run(); delete message_loop_; message_loop_ = nullptr; delete run_loop_; run_loop_ = nullptr; } static future_t* hci_module_start_up(void) { LOG_INFO(LOG_TAG, "%s", __func__); Loading Loading @@ -174,18 +199,6 @@ static future_t* hci_module_start_up(void) { goto error; } command_queue = fixed_queue_new(SIZE_MAX); if (!command_queue) { LOG_ERROR(LOG_TAG, "%s unable to create pending command queue.", __func__); goto error; } packet_queue = fixed_queue_new(SIZE_MAX); if (!packet_queue) { LOG_ERROR(LOG_TAG, "%s unable to create pending packet queue.", __func__); goto error; } thread = thread_new("hci_thread"); if (!thread) { LOG_ERROR(LOG_TAG, "%s unable to create thread.", __func__); Loading @@ -211,12 +224,7 @@ static future_t* hci_module_start_up(void) { packet_fragmenter->init(&packet_fragmenter_callbacks); fixed_queue_register_dequeue(command_queue, thread_get_reactor(thread), event_command_ready, NULL); fixed_queue_register_dequeue(packet_queue, thread_get_reactor(thread), event_packet_ready, NULL); hci_initialize(); thread_post(thread, message_loop_run, NULL); LOG_DEBUG(LOG_TAG, "%s starting async portion", __func__); return local_startup_future; Loading @@ -238,6 +246,8 @@ static future_t* hci_module_shut_down() { startup_timer = NULL; } message_loop_->task_runner()->PostTask(FROM_HERE, run_loop_->QuitClosure()); // Stop the thread to prevent Send() calls. if (thread) { thread_stop(thread); Loading @@ -247,10 +257,6 @@ static future_t* hci_module_shut_down() { // Close HCI to prevent callbacks. hci_close(); fixed_queue_free(command_queue, osi_free); command_queue = NULL; fixed_queue_free(packet_queue, buffer_allocator->free); packet_queue = NULL; { std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex); list_free(commands_pending_response); Loading Loading @@ -294,7 +300,7 @@ static void transmit_command(BT_HDR* command, // in case the upper layer didn't already command->event = MSG_STACK_TO_HC_HCI_CMD; fixed_queue_enqueue(command_queue, wait_entry); enqueue_command(wait_entry); } static future_t* transmit_command_futured(BT_HDR* command) { Loading @@ -311,7 +317,7 @@ static future_t* transmit_command_futured(BT_HDR* command) { // in case the upper layer didn't already command->event = MSG_STACK_TO_HC_HCI_CMD; fixed_queue_enqueue(command_queue, wait_entry); enqueue_command(wait_entry); return future; } Loading @@ -323,7 +329,7 @@ static void transmit_downward(data_dispatcher_type_t type, void* data) { "%s legacy transmit of command. Use transmit_command instead.", __func__); } else { fixed_queue_enqueue(packet_queue, data); enqueue_packet(data); } } Loading @@ -346,18 +352,21 @@ static void startup_timer_expired(UNUSED_ATTR void* context) { } // Command/packet transmitting functions static void enqueue_command(waiting_command_t* wait_entry) { base::Closure callback = base::Bind(&event_command_ready, wait_entry); static void event_command_ready(fixed_queue_t* queue, UNUSED_ATTR void* context) { std::lock_guard<std::mutex> lock(command_credits_mutex); if (command_credits > 0) { waiting_command_t* wait_entry = reinterpret_cast<waiting_command_t*>(fixed_queue_dequeue(queue)); message_loop_->task_runner()->PostTask(FROM_HERE, std::move(callback)); command_credits--; } else { command_queue.push(std::move(callback)); } } // Move it to the list of commands awaiting response { std::lock_guard<std::recursive_mutex> lock( commands_pending_response_mutex); static void event_command_ready(waiting_command_t* wait_entry) { /// Move it to the list of commands awaiting response std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex); wait_entry->timestamp = std::chrono::steady_clock::now(); list_append(commands_pending_response, wait_entry); Loading @@ -366,14 +375,15 @@ static void event_command_ready(fixed_queue_t* queue, update_command_response_timer(); } } static void enqueue_packet(void* packet) { message_loop_->task_runner()->PostTask( FROM_HERE, base::Bind(&event_packet_ready, packet)); } static void event_packet_ready(fixed_queue_t* queue, UNUSED_ATTR void* context) { static void event_packet_ready(void* pkt) { // The queue may be the command queue or the packet queue, we don't care BT_HDR* packet = (BT_HDR*)fixed_queue_dequeue(queue); BT_HDR* packet = (BT_HDR*)pkt; packet_fragmenter->fragment_and_dispatch(packet); } Loading Loading @@ -428,6 +438,17 @@ static void command_timed_out(UNUSED_ATTR void* context) { } // Event/packet receiving functions void process_command_credits(int credits) { std::lock_guard<std::mutex> lock(command_credits_mutex); command_credits = credits; while (command_credits > 0 && command_queue.size() > 0) { message_loop_->task_runner()->PostTask(FROM_HERE, std::move(command_queue.front())); command_queue.pop(); command_credits--; } } // Returns true if the event was intercepted and should not proceed to // higher layers. Also inspects an incoming event for interesting Loading @@ -436,19 +457,20 @@ static bool filter_incoming_event(BT_HDR* packet) { waiting_command_t* wait_entry = NULL; uint8_t* stream = packet->data; uint8_t event_code; int credits = 0; command_opcode_t opcode; STREAM_TO_UINT8(event_code, stream); STREAM_SKIP_UINT8(stream); // Skip the parameter total length field if (event_code == HCI_COMMAND_COMPLETE_EVT) { STREAM_TO_UINT8(command_credits, stream); STREAM_TO_UINT8(credits, stream); STREAM_TO_UINT16(opcode, stream); process_command_credits(credits); wait_entry = get_waiting_command(opcode); if (!wait_entry) { // TODO: Currently command_credits aren't parsed at all; here or in higher // layers... if (opcode != HCI_COMMAND_NONE) { LOG_WARN(LOG_TAG, "%s command complete event with no matching command (opcode: " Loading @@ -468,12 +490,13 @@ static bool filter_incoming_event(BT_HDR* packet) { } else if (event_code == HCI_COMMAND_STATUS_EVT) { uint8_t status; STREAM_TO_UINT8(status, stream); STREAM_TO_UINT8(command_credits, stream); STREAM_TO_UINT8(credits, stream); STREAM_TO_UINT16(opcode, stream); process_command_credits(credits); // If a command generates a command status event, it won't be getting a // command complete event wait_entry = get_waiting_command(opcode); if (!wait_entry) { LOG_WARN( Loading Loading
system/hci/src/hci_layer.cc +77 −54 Original line number Diff line number Diff line Loading @@ -20,7 +20,12 @@ #include "hci_layer.h" #include <base/bind.h> #include <base/logging.h> #include <base/run_loop.h> #include <base/sequenced_task_runner.h> #include <base/threading/thread.h> #include <signal.h> #include <string.h> #include <sys/types.h> Loading Loading @@ -80,13 +85,15 @@ static const packet_fragmenter_t* packet_fragmenter; static future_t* startup_future; static thread_t* thread; // We own this static base::MessageLoop* message_loop_ = nullptr; static base::RunLoop* run_loop_ = nullptr; static alarm_t* startup_timer; // Outbound-related static int command_credits = 1; static fixed_queue_t* command_queue; static fixed_queue_t* packet_queue; static std::mutex command_credits_mutex; static std::queue<base::Closure> command_queue; // Inbound-related static alarm_t* command_response_timer; Loading @@ -102,8 +109,10 @@ static waiting_command_t* get_waiting_command(command_opcode_t opcode); static void event_finish_startup(void* context); static void startup_timer_expired(void* context); static void event_command_ready(fixed_queue_t* queue, void* context); static void event_packet_ready(fixed_queue_t* queue, void* context); static void enqueue_command(waiting_command_t* wait_entry); static void event_command_ready(waiting_command_t* wait_entry); static void enqueue_packet(void* packet); static void event_packet_ready(void* packet); static void command_timed_out(void* context); static void update_command_response_timer(void); Loading @@ -117,7 +126,8 @@ static const packet_fragmenter_callbacks_t packet_fragmenter_callbacks = { transmit_fragment, dispatch_reassembled, fragmenter_transmit_finished}; void initialization_complete() { thread_post(thread, event_finish_startup, NULL); message_loop_->task_runner()->PostTask( FROM_HERE, base::Bind(&event_finish_startup, nullptr)); } void hci_event_received(BT_HDR* packet) { Loading @@ -143,6 +153,21 @@ void sco_data_received(BT_HDR* packet) { static future_t* hci_module_shut_down(); void message_loop_run(UNUSED_ATTR void* context) { message_loop_ = new base::MessageLoop(); run_loop_ = new base::RunLoop(); message_loop_->task_runner()->PostTask(FROM_HERE, base::Bind(&hci_initialize)); run_loop_->Run(); delete message_loop_; message_loop_ = nullptr; delete run_loop_; run_loop_ = nullptr; } static future_t* hci_module_start_up(void) { LOG_INFO(LOG_TAG, "%s", __func__); Loading Loading @@ -174,18 +199,6 @@ static future_t* hci_module_start_up(void) { goto error; } command_queue = fixed_queue_new(SIZE_MAX); if (!command_queue) { LOG_ERROR(LOG_TAG, "%s unable to create pending command queue.", __func__); goto error; } packet_queue = fixed_queue_new(SIZE_MAX); if (!packet_queue) { LOG_ERROR(LOG_TAG, "%s unable to create pending packet queue.", __func__); goto error; } thread = thread_new("hci_thread"); if (!thread) { LOG_ERROR(LOG_TAG, "%s unable to create thread.", __func__); Loading @@ -211,12 +224,7 @@ static future_t* hci_module_start_up(void) { packet_fragmenter->init(&packet_fragmenter_callbacks); fixed_queue_register_dequeue(command_queue, thread_get_reactor(thread), event_command_ready, NULL); fixed_queue_register_dequeue(packet_queue, thread_get_reactor(thread), event_packet_ready, NULL); hci_initialize(); thread_post(thread, message_loop_run, NULL); LOG_DEBUG(LOG_TAG, "%s starting async portion", __func__); return local_startup_future; Loading @@ -238,6 +246,8 @@ static future_t* hci_module_shut_down() { startup_timer = NULL; } message_loop_->task_runner()->PostTask(FROM_HERE, run_loop_->QuitClosure()); // Stop the thread to prevent Send() calls. if (thread) { thread_stop(thread); Loading @@ -247,10 +257,6 @@ static future_t* hci_module_shut_down() { // Close HCI to prevent callbacks. hci_close(); fixed_queue_free(command_queue, osi_free); command_queue = NULL; fixed_queue_free(packet_queue, buffer_allocator->free); packet_queue = NULL; { std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex); list_free(commands_pending_response); Loading Loading @@ -294,7 +300,7 @@ static void transmit_command(BT_HDR* command, // in case the upper layer didn't already command->event = MSG_STACK_TO_HC_HCI_CMD; fixed_queue_enqueue(command_queue, wait_entry); enqueue_command(wait_entry); } static future_t* transmit_command_futured(BT_HDR* command) { Loading @@ -311,7 +317,7 @@ static future_t* transmit_command_futured(BT_HDR* command) { // in case the upper layer didn't already command->event = MSG_STACK_TO_HC_HCI_CMD; fixed_queue_enqueue(command_queue, wait_entry); enqueue_command(wait_entry); return future; } Loading @@ -323,7 +329,7 @@ static void transmit_downward(data_dispatcher_type_t type, void* data) { "%s legacy transmit of command. Use transmit_command instead.", __func__); } else { fixed_queue_enqueue(packet_queue, data); enqueue_packet(data); } } Loading @@ -346,18 +352,21 @@ static void startup_timer_expired(UNUSED_ATTR void* context) { } // Command/packet transmitting functions static void enqueue_command(waiting_command_t* wait_entry) { base::Closure callback = base::Bind(&event_command_ready, wait_entry); static void event_command_ready(fixed_queue_t* queue, UNUSED_ATTR void* context) { std::lock_guard<std::mutex> lock(command_credits_mutex); if (command_credits > 0) { waiting_command_t* wait_entry = reinterpret_cast<waiting_command_t*>(fixed_queue_dequeue(queue)); message_loop_->task_runner()->PostTask(FROM_HERE, std::move(callback)); command_credits--; } else { command_queue.push(std::move(callback)); } } // Move it to the list of commands awaiting response { std::lock_guard<std::recursive_mutex> lock( commands_pending_response_mutex); static void event_command_ready(waiting_command_t* wait_entry) { /// Move it to the list of commands awaiting response std::lock_guard<std::recursive_mutex> lock(commands_pending_response_mutex); wait_entry->timestamp = std::chrono::steady_clock::now(); list_append(commands_pending_response, wait_entry); Loading @@ -366,14 +375,15 @@ static void event_command_ready(fixed_queue_t* queue, update_command_response_timer(); } } static void enqueue_packet(void* packet) { message_loop_->task_runner()->PostTask( FROM_HERE, base::Bind(&event_packet_ready, packet)); } static void event_packet_ready(fixed_queue_t* queue, UNUSED_ATTR void* context) { static void event_packet_ready(void* pkt) { // The queue may be the command queue or the packet queue, we don't care BT_HDR* packet = (BT_HDR*)fixed_queue_dequeue(queue); BT_HDR* packet = (BT_HDR*)pkt; packet_fragmenter->fragment_and_dispatch(packet); } Loading Loading @@ -428,6 +438,17 @@ static void command_timed_out(UNUSED_ATTR void* context) { } // Event/packet receiving functions void process_command_credits(int credits) { std::lock_guard<std::mutex> lock(command_credits_mutex); command_credits = credits; while (command_credits > 0 && command_queue.size() > 0) { message_loop_->task_runner()->PostTask(FROM_HERE, std::move(command_queue.front())); command_queue.pop(); command_credits--; } } // Returns true if the event was intercepted and should not proceed to // higher layers. Also inspects an incoming event for interesting Loading @@ -436,19 +457,20 @@ static bool filter_incoming_event(BT_HDR* packet) { waiting_command_t* wait_entry = NULL; uint8_t* stream = packet->data; uint8_t event_code; int credits = 0; command_opcode_t opcode; STREAM_TO_UINT8(event_code, stream); STREAM_SKIP_UINT8(stream); // Skip the parameter total length field if (event_code == HCI_COMMAND_COMPLETE_EVT) { STREAM_TO_UINT8(command_credits, stream); STREAM_TO_UINT8(credits, stream); STREAM_TO_UINT16(opcode, stream); process_command_credits(credits); wait_entry = get_waiting_command(opcode); if (!wait_entry) { // TODO: Currently command_credits aren't parsed at all; here or in higher // layers... if (opcode != HCI_COMMAND_NONE) { LOG_WARN(LOG_TAG, "%s command complete event with no matching command (opcode: " Loading @@ -468,12 +490,13 @@ static bool filter_incoming_event(BT_HDR* packet) { } else if (event_code == HCI_COMMAND_STATUS_EVT) { uint8_t status; STREAM_TO_UINT8(status, stream); STREAM_TO_UINT8(command_credits, stream); STREAM_TO_UINT8(credits, stream); STREAM_TO_UINT16(opcode, stream); process_command_credits(credits); // If a command generates a command status event, it won't be getting a // command complete event wait_entry = get_waiting_command(opcode); if (!wait_entry) { LOG_WARN( Loading