Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 52ceb23f authored by Zach Johnson's avatar Zach Johnson
Browse files

rusty-gd: implement outbound fragmentation & dispatch

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost
Change-Id: I122e9b5e84be2164685f14c0efe48765c752c094
parent c112a604
Loading
Loading
Loading
Loading
+41 −5
Original line number Diff line number Diff line
//! Handles fragmentation & reassembly of ACL packets into whole L2CAP payloads

use bt_packets::hci::{AclChild, AclPacket, BroadcastFlag, PacketBoundaryFlag};
use bt_common::Bluetooth;
use bt_packets::hci::PacketBoundaryFlag::{
    ContinuingFragment, FirstAutomaticallyFlushable, FirstNonAutomaticallyFlushable,
};
use bt_packets::hci::{AclBuilder, AclChild, AclPacket, BroadcastFlag};
use bytes::{Buf, Bytes, BytesMut};
use futures::stream::{self, StreamExt};
use log::{error, info, warn};
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{Receiver, Sender};

const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;

@@ -35,8 +40,8 @@ impl Reassembler {
        }

        match packet.get_packet_boundary_flag() {
            PacketBoundaryFlag::FirstNonAutomaticallyFlushable => error!("not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"),
            PacketBoundaryFlag::FirstAutomaticallyFlushable => {
            FirstNonAutomaticallyFlushable => error!("not allowed to send FIRST_NON_AUTOMATICALLY_FLUSHABLE to host except loopback mode"),
            FirstAutomaticallyFlushable => {
                if self.buffer.take().is_some() {
                    error!("got a start packet without finishing previous reassembly - dropping previous");
                }
@@ -51,7 +56,7 @@ impl Reassembler {
                    self.out.send(payload).await.unwrap();
                }
            },
            PacketBoundaryFlag::ContinuingFragment => {
            ContinuingFragment => {
                match self.buffer.take() {
                    None => warn!("got continuation packet without pending reassembly"),
                    Some(_) if self.remaining < payload.len() => warn!("remote sent unexpected L2CAP PDU - dropping entire packet"),
@@ -79,3 +84,34 @@ fn get_l2cap_pdu_size(first_packet: &Bytes) -> usize {
        (&first_packet[..]).get_u16_le() as usize
    }
}

pub fn fragmenting_stream(
    rx: Receiver<Bytes>,
    mtu: usize,
    handle: u16,
    bt: Bluetooth,
) -> std::pin::Pin<
    std::boxed::Box<dyn futures::Stream<Item = bt_packets::hci::AclPacket> + std::marker::Send>,
> {
    rx.flat_map(move |data| {
        stream::iter(
            data.chunks(mtu)
                .enumerate()
                .map(move |(i, chunk)| {
                    AclBuilder {
                        handle,
                        packet_boundary_flag: match bt {
                            Bluetooth::Classic if i == 0 => FirstAutomaticallyFlushable,
                            Bluetooth::Le if i == 0 => FirstNonAutomaticallyFlushable,
                            _ => ContinuingFragment,
                        },
                        broadcast_flag: BroadcastFlag::PointToPoint,
                        payload: Some(Bytes::copy_from_slice(chunk)),
                    }
                    .build()
                })
                .collect::<Vec<AclPacket>>(),
        )
    })
    .boxed()
}
+59 −29
Original line number Diff line number Diff line
@@ -2,19 +2,21 @@

mod fragment;

use bt_common::Bluetooth;
use bt_hal::AclHal;
use bt_hci::ControllerExports;
use bt_packets::hci::AclPacket;
use bytes::Bytes;
use fragment::{fragmenting_stream, Reassembler};
use futures::stream::{SelectAll, StreamExt};
use gddi::{module, provides, Stoppable};
use log::info;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::Mutex;

use fragment::Reassembler;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};

module! {
    acl_module,
@@ -23,53 +25,81 @@ module! {
    },
}

struct Connection {
    reassembler: Reassembler,
/// A basic ACL connection
#[derive(Debug)]
pub struct Connection {
    rx: Receiver<Bytes>,
    tx: Sender<Bytes>,
}

/// Manages rx and tx for open ACL connections
#[derive(Clone, Stoppable)]
pub struct AclDispatch {
    connections: Arc<Mutex<HashMap<u16, Connection>>>,
    requests: Sender<RegistrationRequest>,
}

impl AclDispatch {
    /// Register the provided connection with the ACL dispatch
    pub async fn register(&mut self, handle: u16) -> Receiver<Bytes> {
        let (tx, rx) = channel(10);
        assert!(self
            .connections
            .lock()
            .await
            .insert(handle, Connection { reassembler: Reassembler::new(tx) })
            .is_none());

        rx
    pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
        let (tx, rx) = oneshot::channel();
        self.requests.send(RegistrationRequest { handle, bt, fut: tx }).await.unwrap();
        rx.await.unwrap()
    }
}

#[derive(Debug)]
struct RegistrationRequest {
    handle: u16,
    bt: Bluetooth,
    fut: oneshot::Sender<Connection>,
}

const QCOM_DEBUG_HANDLE: u16 = 0xedc;

#[provides]
async fn provide_acl_dispatch(acl: AclHal, rt: Arc<Runtime>) -> AclDispatch {
    let connections: Arc<Mutex<HashMap<u16, Connection>>> = Arc::new(Mutex::new(HashMap::new()));
    let clone_connections = connections.clone();
async fn provide_acl_dispatch(
    acl: AclHal,
    controller: Arc<ControllerExports>,
    rt: Arc<Runtime>,
) -> AclDispatch {
    let (req_tx, mut req_rx) = channel::<RegistrationRequest>(10);

    rt.spawn(async move {
        let mut connections: HashMap<u16, Reassembler> = HashMap::new();
        let mut outbound = SelectAll::new();
        loop {
            select! {
            Some(acl) = consume(&acl.rx) => {
                match connections.lock().await.get_mut(&acl.get_handle()) {
                    Some(connection) => connection.reassembler.on_packet(acl).await,
                    None if acl.get_handle() == QCOM_DEBUG_HANDLE => {},
                    None => info!("no acl for {}", acl.get_handle()),
                Some(req) = req_rx.recv() => {
                    let (out_tx, out_rx) = channel(10);
                    let (in_tx, in_rx) = channel(10);

                    let mtu = match req.bt {
                        Bluetooth::Classic => controller.acl_buffer_length.into(),
                        Bluetooth::Le => controller.le_buffer_length.into(),
                    };

                    assert!(connections.insert(req.handle, Reassembler::new(out_tx)).is_none());
                    outbound.push(fragmenting_stream(in_rx, mtu, req.handle, req.bt));

                    req.fut.send(Connection { rx: out_rx, tx: in_tx }).unwrap();
                },
                Some(packet) = consume(&acl.rx) => {
                    match connections.get_mut(&packet.get_handle()) {
                        Some(reassembler) => reassembler.on_packet(packet).await,
                        None if packet.get_handle() == QCOM_DEBUG_HANDLE => {},
                        None => info!("no acl for {}", packet.get_handle()),
                    }
                }
                Some(packet) = outbound.next() => {
                    acl.tx.send(packet).await.unwrap();
                }
            }
        }
    });

    AclDispatch { connections: clone_connections }
    AclDispatch { requests: req_tx }
}

async fn consume(evt_rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
    evt_rx.lock().await.recv().await
async fn consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
    rx.lock().await.recv().await
}
+10 −3
Original line number Diff line number Diff line
@@ -22,9 +22,7 @@ pub mod sys_prop;
#[cfg(target_os = "android")]
pub fn init_logging() {
    android_logger::init_once(
        android_logger::Config::default()
            .with_tag("bt")
            .with_min_level(log::Level::Debug),
        android_logger::Config::default().with_tag("bt").with_min_level(log::Level::Debug),
    );
}

@@ -43,3 +41,12 @@ pub trait GrpcFacade {
    /// Convert the object into the service
    fn into_grpc(self) -> grpcio::Service;
}

/// Useful for distinguishing between BT classic & LE in functions that support both
#[derive(Debug, Clone, Copy)]
pub enum Bluetooth {
    /// Classic BT we all know and love, started in the 90s.
    Classic,
    /// Bluetooth low energy from the 2010s. Also known as BLE, BTLE, etc.
    Le,
}