Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d783352f authored by Myles Watson's avatar Myles Watson
Browse files

AclManager: Create a shared assembler

Bug: 145832107
Test: ./cert/run --host && bluetooth_test_gd
Change-Id: I7cfcf5f7860cb00473f84696670a2c8f0e908a7d
parent c5ca7d3a
Loading
Loading
Loading
Loading
+39 −100
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
#include "hci/acl_manager.h"

#include <atomic>
#include <future>
#include <queue>
#include <set>
#include <utility>
@@ -67,41 +66,29 @@ uint16_t GetL2capPduSize(AclPacketView packet) {

}  // namespace

struct AclManager::acl_connection {
  acl_connection(AddressWithType address_with_type, os::Handler* handler)
      : address_with_type_(address_with_type), handler_(handler) {}
struct assembler {
  assembler(AddressWithType address_with_type, AclConnection::QueueDownEnd* down_end, os::Handler* handler)
      : address_with_type_(address_with_type), down_end_(down_end), handler_(handler) {}
  AddressWithType address_with_type_;
  AclConnection::QueueDownEnd* down_end_;
  os::Handler* handler_;
  std::unique_ptr<AclConnection::Queue> queue_ = std::make_unique<AclConnection::Queue>(10);
  bool is_disconnected_ = false;
  ErrorCode disconnect_reason_;
  os::Handler* command_complete_handler_ = nullptr;
  os::Handler* disconnect_handler_ = nullptr;
  ConnectionManagementCallbacks* command_complete_callbacks_ = nullptr;
  os::Handler* le_command_complete_handler_ = nullptr;
  LeConnectionManagementCallbacks* le_command_complete_callbacks_ = nullptr;
  common::OnceCallback<void(ErrorCode)> on_disconnect_callback_;
  // For LE Connection parameter update from L2CAP
  common::OnceCallback<void(ErrorCode)> on_connection_update_complete_callback_;
  os::Handler* on_connection_update_complete_callback_handler_ = nullptr;
  PacketViewForRecombination recombination_stage_{std::make_shared<std::vector<uint8_t>>()};
  int remaining_sdu_continuation_packet_size_ = 0;
  std::atomic_bool enqueue_registered_ = false;
  std::shared_ptr<std::atomic_bool> enqueue_registered_ = std::make_shared<std::atomic_bool>(false);
  std::queue<packet::PacketView<kLittleEndian>> incoming_queue_;

  ~acl_connection() {
    if (enqueue_registered_.exchange(false)) {
      queue_->GetDownEnd()->UnregisterEnqueue();
  ~assembler() {
    if (enqueue_registered_->exchange(false)) {
      down_end_->UnregisterEnqueue();
    }
    queue_.reset();
  }

  // Invoked from some external Queue Reactable context
  std::unique_ptr<packet::PacketView<kLittleEndian>> on_incoming_data_ready() {
  std::unique_ptr<packet::PacketView<kLittleEndian>> on_le_incoming_data_ready() {
    auto packet = incoming_queue_.front();
    incoming_queue_.pop();
    if (incoming_queue_.empty() && enqueue_registered_.exchange(false)) {
      queue_->GetDownEnd()->UnregisterEnqueue();
    if (incoming_queue_.empty() && enqueue_registered_->exchange(false)) {
      down_end_->UnregisterEnqueue();
    }
    return std::make_unique<PacketView<kLittleEndian>>(packet);
  }
@@ -147,11 +134,30 @@ struct AclManager::acl_connection {
    }

    incoming_queue_.push(payload);
    if (!enqueue_registered_.exchange(true)) {
      queue_->GetDownEnd()->RegisterEnqueue(
          handler_, common::Bind(&AclManager::acl_connection::on_incoming_data_ready, common::Unretained(this)));
    if (!enqueue_registered_->exchange(true)) {
      down_end_->RegisterEnqueue(handler_,
                                 common::Bind(&assembler::on_le_incoming_data_ready, common::Unretained(this)));
    }
  }
};

struct AclManager::acl_connection {
  acl_connection(AddressWithType address_with_type, os::Handler* handler)
      : queue_(std::make_unique<AclConnection::Queue>(10)),
        assembler_(address_with_type, queue_->GetDownEnd(), handler), address_with_type_(address_with_type),
        handler_(handler) {}
  std::unique_ptr<AclConnection::Queue> queue_;
  struct assembler assembler_;
  AddressWithType address_with_type_;
  os::Handler* handler_;
  bool is_disconnected_ = false;
  ErrorCode disconnect_reason_;
  os::Handler* command_complete_handler_ = nullptr;
  os::Handler* disconnect_handler_ = nullptr;
  ConnectionManagementCallbacks* command_complete_callbacks_ = nullptr;
  common::OnceCallback<void(ErrorCode)> on_disconnect_callback_;

  ~acl_connection() {}

  void call_disconnect_callback() {
    disconnect_handler_->Post(BindOnce(std::move(on_disconnect_callback_), disconnect_reason_));
@@ -160,22 +166,12 @@ struct AclManager::acl_connection {

struct AclManager::le_acl_connection {
  le_acl_connection(AddressWithType address_with_type, os::Handler* handler)
      : address_with_type_(address_with_type), handler_(handler) {}
  AddressWithType address_with_type_;
      : queue_(std::make_unique<AclConnection::Queue>(10)),
        assembler_(address_with_type, queue_->GetDownEnd(), handler), handler_(handler) {}
  std::unique_ptr<AclConnection::Queue> queue_;
  struct assembler assembler_;
  LeConnectionManagementCallbacks* le_connection_management_callbacks_;
  os::Handler* handler_;
  std::shared_ptr<AclConnection::Queue> queue_ = std::make_unique<AclConnection::Queue>(10);
  PacketViewForRecombination recombination_stage_{std::make_shared<std::vector<uint8_t>>()};
  int remaining_sdu_continuation_packet_size_ = 0;
  std::shared_ptr<std::atomic_bool> enqueue_registered_ = std::make_shared<std::atomic_bool>(false);
  std::queue<packet::PacketView<kLittleEndian>> incoming_queue_;

  ~le_acl_connection() {
    if (enqueue_registered_->exchange(false)) {
      queue_->GetDownEnd()->UnregisterEnqueue();
    }
    queue_.reset();
  }

  AclConnection::QueueDownEnd* GetDownEnd() {
    return queue_->GetDownEnd();
@@ -184,63 +180,6 @@ struct AclManager::le_acl_connection {
  void SetLeConnectionManagementCallbacks(LeConnectionManagementCallbacks* callbacks) {
    le_connection_management_callbacks_ = callbacks;
  }

  // Invoked from some external Queue Reactable context
  std::unique_ptr<packet::PacketView<kLittleEndian>> on_le_incoming_data_ready() {
    auto packet = incoming_queue_.front();
    incoming_queue_.pop();
    if (incoming_queue_.empty() && enqueue_registered_->exchange(false)) {
      queue_->GetDownEnd()->UnregisterEnqueue();
    }
    return std::make_unique<PacketView<kLittleEndian>>(packet);
  }

  void on_incoming_packet(AclPacketView packet) {
    // TODO: What happens if the connection is stalled and fills up?
    PacketView<kLittleEndian> payload = packet.GetPayload();
    auto payload_size = payload.size();
    auto packet_boundary_flag = packet.GetPacketBoundaryFlag();
    if (packet_boundary_flag == PacketBoundaryFlag::FIRST_NON_AUTOMATICALLY_FLUSHABLE) {
      LOG_ERROR("Controller is not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode");
      return;
    }
    if (packet_boundary_flag == PacketBoundaryFlag::CONTINUING_FRAGMENT) {
      if (remaining_sdu_continuation_packet_size_ < payload_size) {
        LOG_WARN("Remote sent unexpected L2CAP PDU. Drop the entire L2CAP PDU");
        recombination_stage_ = PacketViewForRecombination(std::make_shared<std::vector<uint8_t>>());
        remaining_sdu_continuation_packet_size_ = 0;
        return;
      }
      remaining_sdu_continuation_packet_size_ -= payload_size;
      recombination_stage_.AppendPacketView(payload);
      if (remaining_sdu_continuation_packet_size_ != 0) {
        return;
      } else {
        payload = recombination_stage_;
        recombination_stage_ = PacketViewForRecombination(std::make_shared<std::vector<uint8_t>>());
      }
    } else if (packet_boundary_flag == PacketBoundaryFlag::FIRST_AUTOMATICALLY_FLUSHABLE) {
      if (recombination_stage_.size() > 0) {
        LOG_ERROR("Controller sent a starting packet without finishing previous packet. Drop previous one.");
      }
      auto l2cap_pdu_size = GetL2capPduSize(packet);
      remaining_sdu_continuation_packet_size_ = l2cap_pdu_size - (payload_size - kL2capBasicFrameHeaderSize);
      if (remaining_sdu_continuation_packet_size_ > 0) {
        recombination_stage_ = payload;
        return;
      }
    }
    if (incoming_queue_.size() > kMaxQueuedPacketsPerConnection) {
      LOG_ERROR("Dropping packet due to congestion from remote:%s", address_with_type_.ToString().c_str());
      return;
    }

    incoming_queue_.push(payload);
    if (!enqueue_registered_->exchange(true)) {
      queue_->GetDownEnd()->RegisterEnqueue(
          handler_, common::Bind(&AclManager::le_acl_connection::on_le_incoming_data_ready, common::Unretained(this)));
    }
  }
};

class LeAclConnectionTracker : public LeConnectionManagementCallbacks {
@@ -431,14 +370,14 @@ struct AclManager::impl : public security::ISecurityManagerListener {
    }
    auto connection_pair = acl_connections_.find(handle);
    if (connection_pair != acl_connections_.end()) {
      connection_pair->second.on_incoming_packet(*packet);
      connection_pair->second.assembler_.on_incoming_packet(*packet);
    } else {
      auto le_connection_pair = le_acl_connections_.find(handle);
      if (le_connection_pair == le_acl_connections_.end()) {
        LOG_INFO("Dropping packet of size %zu to unknown connection 0x%0hx", packet->size(), handle);
        return;
      }
      le_connection_pair->second.on_incoming_packet(*packet);
      le_connection_pair->second.assembler_.on_incoming_packet(*packet);
    }
  }