Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c112a604 authored by Zach Johnson's avatar Zach Johnson
Browse files

rusty-gd: add packet reassembler

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost
Change-Id: Ifdf4565dfe9c207f84b6b8d2a6f830509c243b08
parent 453f03d3
Loading
Loading
Loading
Loading
+5 −5
Original line number Diff line number Diff line
@@ -752,7 +752,7 @@ void PacketDef::GenRustChildEnums(std::ostream& s) const {
      s << child->name_ << "(Arc<" << child->name_ << "Data>),";
    }
    if (payload) {
      s << "Payload(Arc<Vec<u8>>),";
      s << "Payload(Bytes),";
    }
    s << "None,";
    s << "}\n";
@@ -765,7 +765,7 @@ void PacketDef::GenRustChildEnums(std::ostream& s) const {
      s << child->name_ << "(" << child->name_ << "Packet),";
    }
    if (payload) {
      s << "Payload(Arc<Vec<u8>>),";
      s << "Payload(Bytes),";
    }
    s << "None,";
    s << "}\n";
@@ -807,7 +807,7 @@ void PacketDef::GenRustStructDeclarations(std::ostream& s) const {
    s << ", ";
  }
  if (fields_.HasPayload()) {
    s << "pub payload: Option<Vec<u8>>,";
    s << "pub payload: Option<Bytes>,";
  }
  s << "}\n";
}
@@ -948,7 +948,7 @@ void PacketDef::GenRustStructImpls(std::ostream& s) const {
    s << "};\n";
  } else if (fields_.HasPayload()) {
    s << "let child = if payload.len() > 0 {";
    s << name_ << "DataChild::Payload(Arc::new(payload))";
    s << name_ << "DataChild::Payload(Bytes::from(payload))";
    s << "} else {";
    s << name_ << "DataChild::None";
    s << "};";
@@ -1200,7 +1200,7 @@ void PacketDef::GenRustBuilderStructImpls(std::ostream& s) const {
        if (ancestor->fields_.HasPayload()) {
          s << "child: match self.payload { ";
          s << "None => " << name_ << "DataChild::None,";
          s << "Some(vec) => " << name_ << "DataChild::Payload(Arc::new(vec)),";
          s << "Some(bytes) => " << name_ << "DataChild::Payload(bytes),";
          s << "},";
        } else {
          s << "child: " << name_ << "DataChild::None,";
+81 −0
Original line number Diff line number Diff line
//! Handles fragmentation & reassembly of ACL packets into whole L2CAP payloads

use bt_packets::hci::{AclChild, AclPacket, BroadcastFlag, PacketBoundaryFlag};
use bytes::{Buf, Bytes, BytesMut};
use log::{error, info, warn};
use tokio::sync::mpsc::Sender;

const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;

pub struct Reassembler {
    buffer: Option<BytesMut>,
    remaining: usize,
    out: Sender<Bytes>,
}

impl Reassembler {
    /// Create a new reassembler
    pub fn new(out: Sender<Bytes>) -> Self {
        Self { buffer: None, remaining: 0, out }
    }

    /// Injest the packet and send out if fully reassembled
    pub async fn on_packet(&mut self, packet: AclPacket) {
        let payload = match packet.specialize() {
            AclChild::Payload(payload) => payload,
            AclChild::None => {
                info!("dropping ACL packet with empty payload");
                return;
            }
        };

        if let BroadcastFlag::ActivePeripheralBroadcast = packet.get_broadcast_flag() {
            // we do not accept broadcast packets
            return;
        }

        match packet.get_packet_boundary_flag() {
            PacketBoundaryFlag::FirstNonAutomaticallyFlushable => error!("not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"),
            PacketBoundaryFlag::FirstAutomaticallyFlushable => {
                if self.buffer.take().is_some() {
                    error!("got a start packet without finishing previous reassembly - dropping previous");
                }

                let full_size = get_l2cap_pdu_size(&payload);
                self.remaining = full_size - (payload.len() - L2CAP_BASIC_FRAME_HEADER_LEN);
                if self.remaining > 0 {
                    let mut buffer = BytesMut::with_capacity(full_size);
                    buffer.extend_from_slice(&payload[..]);
                    self.buffer = Some(buffer);
                } else {
                    self.out.send(payload).await.unwrap();
                }
            },
            PacketBoundaryFlag::ContinuingFragment => {
                match self.buffer.take() {
                    None => warn!("got continuation packet without pending reassembly"),
                    Some(_) if self.remaining < payload.len() => warn!("remote sent unexpected L2CAP PDU - dropping entire packet"),
                    Some(mut buffer) => {
                        self.remaining -= payload.len();
                        buffer.extend_from_slice(&payload[..]);
                        if self.remaining == 0 {
                            self.out.send(buffer.freeze()).await.unwrap();
                        } else {
                            self.buffer = Some(buffer);
                        }
                    }
                }
            },
        }
    }
}

fn get_l2cap_pdu_size(first_packet: &Bytes) -> usize {
    if first_packet.len() <= L2CAP_BASIC_FRAME_HEADER_LEN {
        error!("invalid l2cap starting packet");

        0
    } else {
        (&first_packet[..]).get_u16_le() as usize
    }
}
+17 −14
Original line number Diff line number Diff line
//! ACL management

mod fragment;

use bt_hal::AclHal;
use bt_packets::hci::AclPacket;
use bytes::Bytes;
use gddi::{module, provides, Stoppable};
use log::info;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::Mutex;

use fragment::Reassembler;

module! {
    acl_module,
    providers {
@@ -18,29 +23,28 @@ module! {
    },
}

/// Base ACL connection trait
pub trait Connection {
    /// Get the handle of this connection
    fn get_handle(&self) -> u16;
    /// Get the sender side of inbound traffic
    fn get_tx(&self) -> &Sender<AclPacket>;
struct Connection {
    reassembler: Reassembler,
}

/// Manages rx and tx for open ACL connections
#[derive(Clone, Stoppable)]
pub struct AclDispatch {
    connections: Arc<Mutex<HashMap<u16, Box<dyn Connection + Sync + Send>>>>,
    connections: Arc<Mutex<HashMap<u16, Connection>>>,
}

impl AclDispatch {
    /// Register the provided connection with the ACL dispatch
    pub async fn register(&mut self, connection: Box<dyn Connection + Sync + Send>) {
    pub async fn register(&mut self, handle: u16) -> Receiver<Bytes> {
        let (tx, rx) = channel(10);
        assert!(self
            .connections
            .lock()
            .await
            .insert(connection.get_handle(), connection)
            .insert(handle, Connection { reassembler: Reassembler::new(tx) })
            .is_none());

        rx
    }
}

@@ -48,15 +52,14 @@ const QCOM_DEBUG_HANDLE: u16 = 0xedc;

#[provides]
async fn provide_acl_dispatch(acl: AclHal, rt: Arc<Runtime>) -> AclDispatch {
    let connections: Arc<Mutex<HashMap<u16, Box<dyn Connection + Sync + Send>>>> =
        Arc::new(Mutex::new(HashMap::new()));
    let connections: Arc<Mutex<HashMap<u16, Connection>>> = Arc::new(Mutex::new(HashMap::new()));
    let clone_connections = connections.clone();

    rt.spawn(async move {
        select! {
            Some(acl) = consume(&acl.rx) => {
                match connections.lock().await.get(&acl.get_handle()) {
                    Some(connection) => connection.get_tx().send(acl).await.unwrap(),
                match connections.lock().await.get_mut(&acl.get_handle()) {
                    Some(connection) => connection.reassembler.on_packet(acl).await,
                    None if acl.get_handle() == QCOM_DEBUG_HANDLE => {},
                    None => info!("no acl for {}", acl.get_handle()),
                }