Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0cf0528e authored by Zach Johnson's avatar Zach Johnson Committed by Gerrit Code Review
Browse files

Merge changes I448e8ee9,I3b20f04b,I038364bc

* changes:
  rusty-gd: forward ACL channels in HciExports, expose over facades
  rusty-gd: simplify hci facade implementation
  rusty-gd: reorg hci internals
parents e41aa7de 5ead08af
Loading
Loading
Loading
Loading
+36 −48
Original line number Diff line number Diff line
@@ -5,13 +5,14 @@ use bt_common::GrpcFacade;
use bt_hci_proto::empty::Empty;
use bt_hci_proto::facade::*;
use bt_hci_proto::facade_grpc::{create_hci_layer_facade, HciLayerFacade};
use futures::prelude::*;
use bt_packet::HciEvent;
use futures::sink::SinkExt;
use gddi::{module, provides};
use grpcio::*;
use log::error;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;

module! {
    facade_module,
@@ -22,7 +23,13 @@ module! {

#[provides]
async fn provide_facade(hci_exports: HciExports, rt: Arc<Runtime>) -> HciLayerFacadeService {
    HciLayerFacadeService { hci_exports, rt }
    let (from_hci_evt_tx, to_grpc_evt_rx) = channel::<HciEvent>(10);
    HciLayerFacadeService {
        hci_exports,
        rt,
        from_hci_evt_tx,
        to_grpc_evt_rx: Arc::new(Mutex::new(to_grpc_evt_rx)),
    }
}

/// HCI layer facade service
@@ -30,6 +37,8 @@ async fn provide_facade(hci_exports: HciExports, rt: Arc<Runtime>) -> HciLayerFa
pub struct HciLayerFacadeService {
    hci_exports: HciExports,
    rt: Arc<Runtime>,
    from_hci_evt_tx: Sender<HciEvent>,
    to_grpc_evt_rx: Arc<Mutex<Receiver<HciEvent>>>,
}

impl GrpcFacade for HciLayerFacadeService {
@@ -41,7 +50,7 @@ impl GrpcFacade for HciLayerFacadeService {
impl HciLayerFacade for HciLayerFacadeService {
    fn send_command_with_complete(
        &mut self,
        ctx: RpcContext<'_>,
        _ctx: RpcContext<'_>,
        mut cmd: Command,
        sink: UnarySink<Empty>,
    ) {
@@ -49,23 +58,12 @@ impl HciLayerFacade for HciLayerFacadeService {
            self.hci_exports
                .enqueue_command_with_complete(cmd.take_payload().into()),
        );

        let f = sink
            .success(Empty::default())
            .map_err(|e: grpcio::Error| {
                error!(
                    "failed to handle enqueue_command_with_complete request: {:?}",
                    e
                )
            })
            .map(|_| ());

        ctx.spawn(f);
        sink.success(Empty::default());
    }

    fn send_command_with_status(
        &mut self,
        ctx: RpcContext<'_>,
        _ctx: RpcContext<'_>,
        mut cmd: Command,
        sink: UnarySink<Empty>,
    ) {
@@ -73,37 +71,15 @@ impl HciLayerFacade for HciLayerFacadeService {
            self.hci_exports
                .enqueue_command_with_complete(cmd.take_payload().into()),
        );

        let f = sink
            .success(Empty::default())
            .map_err(|e: grpcio::Error| {
                error!(
                    "failed to handle enqueue_command_with_status request: {:?}",
                    e
                )
            })
            .map(|_| ());

        ctx.spawn(f);
        sink.success(Empty::default());
    }

    fn request_event(&mut self, ctx: RpcContext<'_>, code: EventRequest, sink: UnarySink<Empty>) {
    fn request_event(&mut self, _ctx: RpcContext<'_>, code: EventRequest, sink: UnarySink<Empty>) {
        self.rt.block_on(
            self.hci_exports
                .register_event_handler(code.get_code() as u8),
                .register_event_handler(code.get_code() as u8, self.from_hci_evt_tx.clone()),
        );

        let f = sink
            .success(Empty::default())
            .map_err(|e: grpcio::Error| {
                error!(
                    "failed to handle enqueue_command_with_status request: {:?}",
                    e
                )
            })
            .map(|_| ());

        ctx.spawn(f);
        sink.success(Empty::default());
    }

    fn request_le_subevent(
@@ -115,8 +91,12 @@ impl HciLayerFacade for HciLayerFacadeService {
        unimplemented!()
    }

    fn send_acl(&mut self, _ctx: RpcContext<'_>, _data: AclPacket, _sink: UnarySink<Empty>) {
        unimplemented!()
    fn send_acl(&mut self, _ctx: RpcContext<'_>, mut packet: AclPacket, sink: UnarySink<Empty>) {
        self.hci_exports
            .acl_tx
            .send(packet.take_data().into())
            .unwrap();
        sink.success(Empty::default());
    }

    fn stream_events(
@@ -125,7 +105,7 @@ impl HciLayerFacade for HciLayerFacadeService {
        _req: Empty,
        mut resp: ServerStreamingSink<Event>,
    ) {
        let evt_rx = self.hci_exports.evt_rx.clone();
        let evt_rx = self.to_grpc_evt_rx.clone();

        self.rt.spawn(async move {
            while let Some(event) = evt_rx.lock().await.recv().await {
@@ -149,8 +129,16 @@ impl HciLayerFacade for HciLayerFacadeService {
        &mut self,
        _ctx: RpcContext<'_>,
        _req: Empty,
        mut _resp: ServerStreamingSink<AclPacket>,
        mut resp: ServerStreamingSink<AclPacket>,
    ) {
        unimplemented!()
        let acl_rx = self.hci_exports.acl_rx.clone();

        self.rt.spawn(async move {
            while let Some(data) = acl_rx.lock().await.recv().await {
                let mut packet = AclPacket::default();
                packet.set_data(data.to_vec());
                resp.send((packet, WriteFlags::default())).await.unwrap();
            }
        });
    }
}
+57 −77
Original line number Diff line number Diff line
@@ -7,15 +7,16 @@ pub mod error;
pub mod facade;

use bt_hal::HalExports;
use bt_packet::{HciCommand, HciEvent};
use bt_packet::{HciCommand, HciEvent, RawPacket};
use error::Result;
use facade::facade_module;
use gddi::{module, provides};
use std::collections::HashSet;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio::sync::{oneshot, Mutex};

module! {
    hci_module,
@@ -29,25 +30,21 @@ module! {

#[provides]
async fn provide_hci(hal_exports: HalExports, rt: Arc<Runtime>) -> HciExports {
    let (evt_tx, evt_rx) = channel::<HciEvent>(10);
    let (cmd_tx, cmd_rx) = mpsc::channel::<HciCommandEntry>(10);
    let hashset = Arc::new(Mutex::new(HashSet::new()));
    let pending_cmds = Arc::new(Mutex::new(Vec::new()));

    rt.spawn(on_event(
        pending_cmds.clone(),
        evt_tx.clone(),
    let (cmd_tx, cmd_rx) = channel::<Command>(10);
    let evt_handlers = Arc::new(Mutex::new(HashMap::new()));

    rt.spawn(dispatch(
        evt_handlers.clone(),
        hal_exports.evt_rx,
        hal_exports.cmd_tx,
        cmd_rx,
    ));
    rt.spawn(on_command(pending_cmds, hal_exports.cmd_tx, cmd_rx));

    let evt_rx = Arc::new(Mutex::new(evt_rx));

    HciExports {
        cmd_tx,
        registered_events: Arc::clone(&hashset),
        evt_tx,
        evt_rx,
        evt_handlers,
        acl_tx: hal_exports.acl_tx,
        acl_rx: hal_exports.acl_rx,
    }
}

@@ -55,15 +52,13 @@ async fn provide_hci(hal_exports: HalExports, rt: Arc<Runtime>) -> HciExports {
/// Uses a oneshot channel to wait until the event corresponding
/// to the command is received
#[derive(Debug)]
pub struct HciCommandEntry {
    /// The HCI command to send
struct Command {
    cmd: HciCommand,
    /// Transmit half of the oneshot
    fut: oneshot::Sender<HciCommand>,
}

#[derive(Debug)]
struct HciCommandEntryInner {
struct PendingCommand {
    opcode: u16,
    fut: oneshot::Sender<HciCommand>,
}
@@ -71,89 +66,74 @@ struct HciCommandEntryInner {
/// HCI interface
#[derive(Clone)]
pub struct HciExports {
    /// Transmit end of a channel used to send HCI commands
    cmd_tx: Sender<HciCommandEntry>,
    registered_events: Arc<Mutex<HashSet<u8>>>,
    evt_tx: Sender<HciEvent>,
    /// Receive channel half used to receive HCI events from the HAL
    pub evt_rx: Arc<Mutex<Receiver<HciEvent>>>,
    cmd_tx: Sender<Command>,
    evt_handlers: Arc<Mutex<HashMap<u8, Sender<HciEvent>>>>,
    /// Transmit end of a channel used to send ACL data
    pub acl_tx: UnboundedSender<RawPacket>,
    /// Receive end of a channel used to receive ACL data
    pub acl_rx: Arc<Mutex<UnboundedReceiver<RawPacket>>>,
}

impl HciExports {
    /// Send the HCI command
    async fn send(&mut self, cmd: HciCommand) -> Result<HciEvent> {
        let (tx, rx) = oneshot::channel::<HciEvent>();
        self.cmd_tx.send(HciCommandEntry { cmd, fut: tx }).await?;
        self.cmd_tx.send(Command { cmd, fut: tx }).await?;
        let event = rx.await?;
        Ok(event)
    }

    /// Send the HCI event
    async fn dispatch_event(&mut self, event: HciEvent) -> Result<()> {
        let evt_code = bt_packet::get_evt_code(&event);
        if let Some(evt_code) = evt_code {
            let registered_events = self.registered_events.lock().await;
            if registered_events.contains(&evt_code) {
                self.evt_tx.send(event).await?;
            }
        }
        Ok(())
    }

    /// Enqueue an HCI command expecting a command complete
    /// response from the controller
    pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) {
        let event = self.send(cmd).await.unwrap();
        self.dispatch_event(event).await.unwrap();
    pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) -> HciEvent {
        self.send(cmd).await.unwrap()
    }

    /// Enqueue an HCI command expecting a status response
    /// from the controller
    pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) {
        let event = self.send(cmd).await.unwrap();
        self.dispatch_event(event).await.unwrap();
    pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) -> HciEvent {
        self.send(cmd).await.unwrap()
    }

    /// Indicate interest in specific HCI events
    // TODO(qasimj): Add Sender<HciEvent> as an argument so that the calling
    // code can register its own event handler
    pub async fn register_event_handler(&mut self, evt_code: u8) {
        let mut registered_events = self.registered_events.lock().await;
        registered_events.insert(evt_code);
    pub async fn register_event_handler(&mut self, evt_code: u8, sender: Sender<HciEvent>) {
        self.evt_handlers.lock().await.insert(evt_code, sender);
    }
}

async fn on_event(
    pending_cmds: Arc<Mutex<Vec<HciCommandEntryInner>>>,
    evt_tx: Sender<HciEvent>,
    evt_rx: Arc<Mutex<mpsc::UnboundedReceiver<HciEvent>>>,
async fn dispatch(
    evt_handlers: Arc<Mutex<HashMap<u8, Sender<HciEvent>>>>,
    evt_rx: Arc<Mutex<UnboundedReceiver<HciEvent>>>,
    cmd_tx: UnboundedSender<HciCommand>,
    mut cmd_rx: Receiver<Command>,
) {
    while let Some(evt) = evt_rx.lock().await.recv().await {
    let mut pending_cmds: Vec<PendingCommand> = Vec::new();
    loop {
        select! {
            Some(evt) = consume(&evt_rx) => {
                let opcode = bt_packet::get_evt_opcode(&evt).unwrap();
        let mut pending_cmds = pending_cmds.lock().await;
                let evt_code = bt_packet::get_evt_code(&evt).unwrap();
                if let Some(pending_cmd) = remove_first(&mut pending_cmds, |entry| entry.opcode == opcode) {
                    pending_cmd.fut.send(evt).unwrap();
        } else {
            evt_tx.send(evt).await.unwrap();
        }
                } else if let Some(sender) = evt_handlers.lock().await.get(&evt_code) {
                    sender.send(evt).await.unwrap();
                }
}

async fn on_command(
    pending_cmds: Arc<Mutex<Vec<HciCommandEntryInner>>>,
    cmd_tx: mpsc::UnboundedSender<HciCommand>,
    mut cmd_rx: mpsc::Receiver<HciCommandEntry>,
) {
    while let Some(cmd) = cmd_rx.recv().await {
        let mut pending_cmds = pending_cmds.lock().await;
        pending_cmds.push(HciCommandEntryInner {
            },
            Some(cmd) = cmd_rx.recv() => {
                pending_cmds.push(PendingCommand {
                    opcode: bt_packet::get_cmd_opcode(&cmd.cmd).unwrap(),
                    fut: cmd.fut,
                });
                cmd_tx.send(cmd.cmd).unwrap();
            },
            else => break,
        }
    }
}

async fn consume(evt_rx: &Arc<Mutex<UnboundedReceiver<HciEvent>>>) -> Option<HciEvent> {
    evt_rx.lock().await.recv().await
}

fn remove_first<T, P>(vec: &mut Vec<T>, predicate: P) -> Option<T>
where
    P: FnMut(&T) -> bool,