Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 82cda2a6 authored by Zach Johnson's avatar Zach Johnson
Browse files

rusty-gd: move acl core from lib.rs to core.rs

this allows internals to stay internal

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost
Change-Id: Ifc9af8a5403804a5cf15b39abf979c1dc570c7bc
parent 86734bc0
Loading
Loading
Loading
Loading
+207 −0
Original line number Diff line number Diff line
//! ACL core dispatch shared between LE and classic

use crate::fragment::{fragmenting_stream, Reassembler};
use bt_common::Bluetooth::{self, Classic, Le};
use bt_hal::AclHal;
use bt_hci::{ControllerExports, EventRegistry};
use bt_packets::hci::EventChild::{DisconnectionComplete, NumberOfCompletedPackets};
use bt_packets::hci::{AclPacket, ErrorCode, EventCode};
use bytes::Bytes;
use futures::stream::{SelectAll, StreamExt};
use gddi::{module, provides, Stoppable};
use log::info;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};

module! {
    core_module,
    providers {
        AclDispatch => provide_acl_dispatch,
    },
}

/// A basic ACL connection
#[derive(Debug)]
pub struct Connection {
    rx: Receiver<Bytes>,
    tx: Sender<Bytes>,
    handle: u16,
    requests: Sender<Request>,
    evt_rx: Receiver<Event>,
}

struct ConnectionInternal {
    reassembler: Reassembler,
    bt: Bluetooth,
    close_tx: oneshot::Sender<()>,
    evt_tx: Sender<Event>,
}

impl Connection {
    /// Close this connection. Consumes self.
    #[allow(dead_code)]
    pub async fn close(self) {
        let (tx, rx) = oneshot::channel();
        self.requests.send(Request::Unregister { handle: self.handle, fut: tx }).await.unwrap();
        rx.await.unwrap()
    }
}

/// Events that can be generated by the underlying layer
#[derive(Debug)]
pub enum Event {
    /// Underlying connection was closed. Reports reason why.
    Closed(ErrorCode),
}

/// Manages rx and tx for open ACL connections
#[derive(Clone, Stoppable)]
pub struct AclDispatch {
    requests: Sender<Request>,
}

impl AclDispatch {
    /// Register the provided connection with the ACL dispatch
    #[allow(dead_code)]
    pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
        let (tx, rx) = oneshot::channel();
        self.requests.send(Request::Register { handle, bt, fut: tx }).await.unwrap();
        rx.await.unwrap()
    }
}

#[derive(Debug)]
enum Request {
    Register { handle: u16, bt: Bluetooth, fut: oneshot::Sender<Connection> },
    Unregister { handle: u16, fut: oneshot::Sender<()> },
}

const QCOM_DEBUG_HANDLE: u16 = 0xedc;

#[provides]
async fn provide_acl_dispatch(
    acl: AclHal,
    controller: Arc<ControllerExports>,
    mut events: EventRegistry,
    rt: Arc<Runtime>,
) -> AclDispatch {
    let (req_tx, mut req_rx) = channel::<Request>(10);
    let req_tx_clone = req_tx.clone();

    rt.spawn(async move {
        let mut connections: HashMap<u16, ConnectionInternal> = HashMap::new();
        let mut classic_outbound = SelectAll::new();
        let mut classic_credits = controller.acl_buffers;
        let mut le_outbound = SelectAll::new();
        let mut le_credits: u16 = controller.le_buffers.into();

        let (evt_tx, mut evt_rx) = channel(3);
        events.register(EventCode::NumberOfCompletedPackets, evt_tx.clone()).await;
        events.register(EventCode::DisconnectionComplete, evt_tx).await;

        loop {
            select! {
                Some(req) = req_rx.recv() => {
                    match req {
                        Request::Register { handle, bt, fut } => {
                            let (out_tx, out_rx) = channel(10);
                            let (in_tx, in_rx) = channel(10);
                            let (evt_tx, evt_rx) = channel(3);
                            let (close_tx, close_rx) = oneshot::channel();

                            assert!(connections.insert(
                                handle,
                                ConnectionInternal {
                                    reassembler: Reassembler::new(out_tx),
                                    bt,
                                    close_tx,
                                    evt_tx,
                                }).is_none());

                            match bt {
                                Classic => {
                                    classic_outbound.push(fragmenting_stream(
                                        in_rx, controller.acl_buffer_length.into(), handle, bt, close_rx));
                                },
                                Le => {
                                    le_outbound.push(fragmenting_stream(
                                        in_rx, controller.le_buffer_length.into(), handle, bt, close_rx));
                                },
                            }

                            fut.send(Connection {
                                rx: out_rx,
                                tx: in_tx,
                                handle,
                                requests: req_tx_clone.clone(),
                                evt_rx
                            }).unwrap();
                        },
                        Request::Unregister { handle, fut } => {
                            if let Some(connection) = connections.remove(&handle) {
                                connection.close_tx.send(()).unwrap();
                            }
                            fut.send(()).unwrap();
                        }
                    }
                },
                Some(packet) = consume(&acl.rx) => {
                    match connections.get_mut(&packet.get_handle()) {
                        Some(connection) => connection.reassembler.on_packet(packet).await,
                        None if packet.get_handle() == QCOM_DEBUG_HANDLE => {},
                        None => info!("no acl for {}", packet.get_handle()),
                    }
                },
                Some(packet) = classic_outbound.next(), if classic_credits > 0 => {
                    acl.tx.send(packet).await.unwrap();
                    classic_credits -= 1;
                },
                Some(packet) = le_outbound.next(), if le_credits > 0 => {
                    acl.tx.send(packet).await.unwrap();
                    le_credits -= 1;
                },
                Some(evt) = evt_rx.recv() => {
                    match evt.specialize() {
                        NumberOfCompletedPackets(evt) => {
                            for info in evt.get_completed_packets() {
                                match connections.get(&info.connection_handle) {
                                    Some(connection) => {
                                        let credits = info.host_num_of_completed_packets;
                                        match connection.bt {
                                            Classic => {
                                                classic_credits += credits;
                                                assert!(classic_credits <= controller.acl_buffers);
                                            },
                                            Le => {
                                                le_credits += credits;
                                                assert!(le_credits <= controller.le_buffers.into());
                                            },
                                        }
                                    },
                                    None => info!("dropping credits for unknown connection {}", info.connection_handle),
                                }
                            }
                        },
                        DisconnectionComplete(evt) => {
                            if let Some(connection) = connections.remove(&evt.get_connection_handle()) {
                                connection.close_tx.send(()).unwrap();
                                connection.evt_tx.send(Event::Closed(evt.get_reason())).await.unwrap();
                            }
                        },
                        _ => unimplemented!(),
                    }
                },
            }
        }
    });

    AclDispatch { requests: req_tx }
}

async fn consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
    rx.lock().await.recv().await
}
+3 −199
Original line number Diff line number Diff line
@@ -2,211 +2,15 @@

/// Exposes classic ACL functionality
pub mod classic;
mod core;
mod fragment;

use bt_common::Bluetooth::{self, Classic, Le};
use bt_hal::AclHal;
use bt_hci::{ControllerExports, EventRegistry};
use bt_packets::hci::EventChild::{DisconnectionComplete, NumberOfCompletedPackets};
use bt_packets::hci::{AclPacket, ErrorCode, EventCode};
use bytes::Bytes;
use fragment::{fragmenting_stream, Reassembler};
use futures::stream::{SelectAll, StreamExt};
use gddi::{module, provides, Stoppable};
use log::info;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};
use gddi::module;

module! {
    acl_module,
    submodules {
        classic::classic_acl_module,
        core::core_module,
    },
    providers {
        AclDispatch => provide_acl_dispatch,
    },
}

/// A basic ACL connection
#[derive(Debug)]
pub struct Connection {
    rx: Receiver<Bytes>,
    tx: Sender<Bytes>,
    handle: u16,
    requests: Sender<Request>,
    evt_rx: Receiver<Event>,
}

struct ConnectionInternal {
    reassembler: Reassembler,
    bt: Bluetooth,
    close_tx: oneshot::Sender<()>,
    evt_tx: Sender<Event>,
}

impl Connection {
    /// Close this connection. Consumes self.
    pub async fn close(self) {
        let (tx, rx) = oneshot::channel();
        self.requests.send(Request::Unregister { handle: self.handle, fut: tx }).await.unwrap();
        rx.await.unwrap()
    }
}

/// Events that can be generated by the underlying layer
#[derive(Debug)]
pub enum Event {
    /// Underlying connection was closed. Reports reason why.
    Closed(ErrorCode),
}

/// Manages rx and tx for open ACL connections
#[derive(Clone, Stoppable)]
pub struct AclDispatch {
    requests: Sender<Request>,
}

impl AclDispatch {
    /// Register the provided connection with the ACL dispatch
    pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
        let (tx, rx) = oneshot::channel();
        self.requests.send(Request::Register { handle, bt, fut: tx }).await.unwrap();
        rx.await.unwrap()
    }
}

#[derive(Debug)]
enum Request {
    Register { handle: u16, bt: Bluetooth, fut: oneshot::Sender<Connection> },
    Unregister { handle: u16, fut: oneshot::Sender<()> },
}

const QCOM_DEBUG_HANDLE: u16 = 0xedc;

#[provides]
async fn provide_acl_dispatch(
    acl: AclHal,
    controller: Arc<ControllerExports>,
    mut events: EventRegistry,
    rt: Arc<Runtime>,
) -> AclDispatch {
    let (req_tx, mut req_rx) = channel::<Request>(10);
    let req_tx_clone = req_tx.clone();

    rt.spawn(async move {
        let mut connections: HashMap<u16, ConnectionInternal> = HashMap::new();
        let mut classic_outbound = SelectAll::new();
        let mut classic_credits = controller.acl_buffers;
        let mut le_outbound = SelectAll::new();
        let mut le_credits: u16 = controller.le_buffers.into();

        let (evt_tx, mut evt_rx) = channel(3);
        events.register(EventCode::NumberOfCompletedPackets, evt_tx.clone()).await;
        events.register(EventCode::DisconnectionComplete, evt_tx).await;

        loop {
            select! {
                Some(req) = req_rx.recv() => {
                    match req {
                        Request::Register { handle, bt, fut } => {
                            let (out_tx, out_rx) = channel(10);
                            let (in_tx, in_rx) = channel(10);
                            let (evt_tx, evt_rx) = channel(3);
                            let (close_tx, close_rx) = oneshot::channel();

                            assert!(connections.insert(
                                handle,
                                ConnectionInternal {
                                    reassembler: Reassembler::new(out_tx),
                                    bt,
                                    close_tx,
                                    evt_tx,
                                }).is_none());

                            match bt {
                                Classic => {
                                    classic_outbound.push(fragmenting_stream(
                                        in_rx, controller.acl_buffer_length.into(), handle, bt, close_rx));
                                },
                                Le => {
                                    le_outbound.push(fragmenting_stream(
                                        in_rx, controller.le_buffer_length.into(), handle, bt, close_rx));
                                },
                            }

                            fut.send(Connection {
                                rx: out_rx,
                                tx: in_tx,
                                handle,
                                requests: req_tx_clone.clone(),
                                evt_rx
                            }).unwrap();
                        },
                        Request::Unregister { handle, fut } => {
                            if let Some(connection) = connections.remove(&handle) {
                                connection.close_tx.send(()).unwrap();
                            }
                            fut.send(()).unwrap();
                        }
                    }
                },
                Some(packet) = consume(&acl.rx) => {
                    match connections.get_mut(&packet.get_handle()) {
                        Some(connection) => connection.reassembler.on_packet(packet).await,
                        None if packet.get_handle() == QCOM_DEBUG_HANDLE => {},
                        None => info!("no acl for {}", packet.get_handle()),
                    }
                },
                Some(packet) = classic_outbound.next(), if classic_credits > 0 => {
                    acl.tx.send(packet).await.unwrap();
                    classic_credits -= 1;
                },
                Some(packet) = le_outbound.next(), if le_credits > 0 => {
                    acl.tx.send(packet).await.unwrap();
                    le_credits -= 1;
                },
                Some(evt) = evt_rx.recv() => {
                    match evt.specialize() {
                        NumberOfCompletedPackets(evt) => {
                            for info in evt.get_completed_packets() {
                                match connections.get(&info.connection_handle) {
                                    Some(connection) => {
                                        let credits = info.host_num_of_completed_packets;
                                        match connection.bt {
                                            Classic => {
                                                classic_credits += credits;
                                                assert!(classic_credits <= controller.acl_buffers);
                                            },
                                            Le => {
                                                le_credits += credits;
                                                assert!(le_credits <= controller.le_buffers.into());
                                            },
                                        }
                                    },
                                    None => info!("dropping credits for unknown connection {}", info.connection_handle),
                                }
                            }
                        },
                        DisconnectionComplete(evt) => {
                            if let Some(connection) = connections.remove(&evt.get_connection_handle()) {
                                connection.close_tx.send(()).unwrap();
                                connection.evt_tx.send(Event::Closed(evt.get_reason())).await.unwrap();
                            }
                        },
                        _ => unimplemented!(),
                    }
                },
            }
        }
    });

    AclDispatch { requests: req_tx }
}

async fn consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
    rx.lock().await.recv().await
}