Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 67972729 authored by Automerger Merge Worker's avatar Automerger Merge Worker
Browse files

HCI: Synchronize ACL Queue am: 518c3390 am: b9c5b2de

Change-Id: I7217a13c58970f22c82d91a86c3ee63e0a5a5cb1
parents 64930ac3 b9c5b2de
Loading
Loading
Loading
Loading
+27 −12
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

#include "hci/acl_manager.h"

#include <atomic>
#include <future>
#include <queue>
#include <set>
@@ -81,16 +82,22 @@ struct AclManager::acl_connection {
  uint16_t number_of_sent_packets_ = 0;
  PacketViewForRecombination recombination_stage_{std::make_shared<std::vector<uint8_t>>()};
  int remaining_sdu_continuation_packet_size_ = 0;
  bool enqueue_registered_ = false;
  std::atomic_bool enqueue_registered_ = false;
  std::queue<packet::PacketView<kLittleEndian>> incoming_queue_;

  ~acl_connection() {
    if (enqueue_registered_.exchange(false)) {
      queue_->GetDownEnd()->UnregisterEnqueue();
    }
    queue_.reset();
  }

  // Invoked from some external Queue Reactable context
  std::unique_ptr<packet::PacketView<kLittleEndian>> on_incoming_data_ready() {
    auto packet = incoming_queue_.front();
    incoming_queue_.pop();
    if (incoming_queue_.empty()) {
      auto queue_end = queue_->GetDownEnd();
      queue_end->UnregisterEnqueue();
      enqueue_registered_ = false;
    if (incoming_queue_.empty() && enqueue_registered_.exchange(false)) {
      queue_->GetDownEnd()->UnregisterEnqueue();
    }
    return std::make_unique<PacketView<kLittleEndian>>(packet);
  }
@@ -136,10 +143,8 @@ struct AclManager::acl_connection {
    }

    incoming_queue_.push(payload);
    if (!enqueue_registered_) {
      enqueue_registered_ = true;
      auto queue_end = queue_->GetDownEnd();
      queue_end->RegisterEnqueue(
    if (!enqueue_registered_.exchange(true)) {
      queue_->GetDownEnd()->RegisterEnqueue(
          handler_, common::Bind(&AclManager::acl_connection::on_incoming_data_ready, common::Unretained(this)));
    }
  }
@@ -224,6 +229,9 @@ struct AclManager::impl {
    hci_layer_->UnregisterEventHandler(EventCode::READ_REMOTE_EXTENDED_FEATURES_COMPLETE);
    hci_queue_end_->UnregisterDequeue();
    unregister_all_connections();
    if (enqueue_registered_.exchange(false)) {
      hci_queue_end_->UnregisterEnqueue();
    }
    controller_->UnregisterCompletedAclPacketsCallback();
    acl_connections_.clear();
    hci_queue_end_ = nullptr;
@@ -312,14 +320,19 @@ struct AclManager::impl {
  }

  void send_next_fragment() {
    if (!enqueue_registered_.exchange(true)) {
      hci_queue_end_->RegisterEnqueue(handler_,
                                      common::Bind(&impl::handle_enqueue_next_fragment, common::Unretained(this)));
    }
  }

  // Invoked from some external Queue Reactable context 1
  std::unique_ptr<AclPacketBuilder> handle_enqueue_next_fragment() {
    ASSERT(acl_packet_credits_ > 0);
    if (acl_packet_credits_ == 1 || fragments_to_send_.size() == 1) {
      if (enqueue_registered_.exchange(false)) {
        hci_queue_end_->UnregisterEnqueue();
      }
      if (fragments_to_send_.size() == 1) {
        handler_->Post(common::BindOnce(&impl::start_round_robin, common::Unretained(this)));
      }
@@ -331,6 +344,7 @@ struct AclManager::impl {
    return std::unique_ptr<AclPacketBuilder>(raw_pointer);
  }

  // Invoked from some external Queue Reactable context 2
  void dequeue_and_route_acl_packet_to_connection() {
    auto packet = hci_queue_end_->TryDequeue();
    ASSERT(packet != nullptr);
@@ -1909,6 +1923,7 @@ struct AclManager::impl {
  AclManagerCallbacks* le_acl_manager_client_callbacks_ = nullptr;
  os::Handler* le_acl_manager_client_handler_ = nullptr;
  common::BidiQueueEnd<AclPacketBuilder, AclPacketView>* hci_queue_end_ = nullptr;
  std::atomic_bool enqueue_registered_ = false;
  std::map<uint16_t, AclManager::acl_connection> acl_connections_;
  std::set<Address> connecting_;
  std::set<AddressWithType> connecting_le_;
+11 −0
Original line number Diff line number Diff line
@@ -140,12 +140,23 @@ struct Controller::impl {

  void RegisterCompletedAclPacketsCallback(Callback<void(uint16_t /* handle */, uint16_t /* packets */)> cb,
                                           Handler* handler) {
    module_.GetHandler()->Post(common::BindOnce(&impl::register_completed_acl_packets_callback,
                                                common::Unretained(this), cb, common::Unretained(handler)));
  }

  void register_completed_acl_packets_callback(Callback<void(uint16_t /* handle */, uint16_t /* packets */)> cb,
                                               Handler* handler) {
    ASSERT(acl_credits_handler_ == nullptr);
    acl_credits_callback_ = cb;
    acl_credits_handler_ = handler;
  }

  void UnregisterCompletedAclPacketsCallback() {
    module_.GetHandler()->Post(
        common::BindOnce(&impl::unregister_completed_acl_packets_callback, common::Unretained(this)));
  }

  void unregister_completed_acl_packets_callback() {
    ASSERT(acl_credits_handler_ != nullptr);
    acl_credits_callback_ = {};
    acl_credits_handler_ = nullptr;
+5 −1
Original line number Diff line number Diff line
@@ -116,12 +116,16 @@ class AclManagerFacadeService : public AclManagerFacade::Service,
        LOG_ERROR("Invalid handle");
        return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid handle");
      } else {
        // TODO: This is unsafe because connection may have gone
        connection->second->GetAclQueueEnd()->RegisterEnqueue(
            facade_handler_, common::Bind(&AclManagerFacadeService::enqueue_packet, common::Unretained(this),
                                          common::Unretained(request), common::Passed(std::move(promise))));
      }
    }
    future.wait();
    auto status = future.wait_for(std::chrono::milliseconds(1000));
    if (status != std::future_status::ready) {
      return ::grpc::Status(::grpc::StatusCode::RESOURCE_EXHAUSTED, "Can't send packet");
    }
    return ::grpc::Status::OK;
  }

+2 −2
Original line number Diff line number Diff line
@@ -172,8 +172,6 @@ class LeScanningInterfaceImpl : public LeScanningInterface {
struct HciLayer::impl : public hal::HciHalCallbacks {
  impl(HciLayer& module) : hal_(nullptr), module_(module) {}

  ~impl() {}

  void Start(hal::HciHal* hal) {
    hal_ = hal;
    hci_timeout_alarm_ = new Alarm(module_.GetHandler());
@@ -292,6 +290,7 @@ struct HciLayer::impl : public hal::HciHalCallbacks {
    subevent_handlers_[subevent_code].handler->Post(BindOnce(registered_handler, meta_event_view));
  }

  // Invoked from HAL thread
  void hciEventReceived(hal::HciPacket event_bytes) override {
    auto packet = packet::PacketView<packet::kLittleEndian>(std::make_shared<std::vector<uint8_t>>(event_bytes));
    EventPacketView event = EventPacketView::Create(packet);
@@ -308,6 +307,7 @@ struct HciLayer::impl : public hal::HciHalCallbacks {
    event_handlers_[event_code].handler->Post(BindOnce(registered_handler, std::move(event)));
  }

  // From HAL thread
  void aclDataReceived(hal::HciPacket data_bytes) override {
    auto packet =
        packet::PacketView<packet::kLittleEndian>(std::make_shared<std::vector<uint8_t>>(std::move(data_bytes)));