Loading system/gd/rust/facade/src/lib.rs +1 −1 Original line number Diff line number Diff line Loading @@ -152,7 +152,7 @@ impl FacadeServiceManager { let mut services = Vec::new(); match req.get_module_under_test() { BluetoothModule::HAL => { services.push(HciHalFacadeService::create(Arc::clone(&rt))); services.push(HciHalFacadeService::create(hal_exports, Arc::clone(&rt))); } BluetoothModule::HCI => { let hci_exports = Hci::start(hal_exports, Arc::clone(&rt)); Loading system/gd/rust/hal/src/facade.rs +48 −10 Original line number Diff line number Diff line Loading @@ -5,31 +5,50 @@ use bt_hal_proto::facade::*; use bt_hal_proto::facade_grpc::{create_hci_hal_facade, HciHalFacade}; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::sync::Mutex; use futures::sink::SinkExt; use grpcio::*; use std::sync::Arc; use crate::HalExports; use bt_packet::{HciCommand, HciEvent, RawPacket}; /// HCI HAL facade service #[derive(Clone)] pub struct HciHalFacadeService { rt: Arc<Runtime>, cmd_tx: mpsc::UnboundedSender<HciCommand>, evt_rx: Arc<Mutex<mpsc::UnboundedReceiver<HciEvent>>>, acl_tx: mpsc::UnboundedSender<RawPacket>, acl_rx: Arc<Mutex<mpsc::UnboundedReceiver<HciEvent>>>, } impl HciHalFacadeService { /// Create a new instance of HCI HAL facade service pub fn create(rt: Arc<Runtime>) -> grpcio::Service { create_hci_hal_facade(Self { rt }) pub fn create(hal_exports: HalExports, rt: Arc<Runtime>) -> grpcio::Service { create_hci_hal_facade(Self { rt, cmd_tx: hal_exports.cmd_tx, evt_rx: Arc::new(Mutex::new(hal_exports.evt_rx)), acl_tx: hal_exports.acl_tx, acl_rx: Arc::new(Mutex::new(hal_exports.acl_rx)), }) } } impl HciHalFacade for HciHalFacadeService { fn send_command(&mut self, _ctx: RpcContext<'_>, _cmd: Command, _sink: UnarySink<Empty>) { unimplemented!() fn send_command(&mut self, _ctx: RpcContext<'_>, mut cmd: Command, sink: UnarySink<Empty>) { self.cmd_tx.send(cmd.take_payload().into()).unwrap(); sink.success(Empty::default()); } fn send_acl(&mut self, _ctx: RpcContext<'_>, _acl: AclPacket, _sink: UnarySink<Empty>) { unimplemented!() fn send_acl(&mut self, _ctx: RpcContext<'_>, mut acl: AclPacket, sink: UnarySink<Empty>) { self.acl_tx.send(acl.take_payload().into()).unwrap(); sink.success(Empty::default()); } fn send_sco(&mut self, _ctx: RpcContext<'_>, _sco: ScoPacket, _sink: UnarySink<Empty>) { Loading @@ -40,17 +59,36 @@ impl HciHalFacade for HciHalFacadeService { unimplemented!() } fn stream_events(&mut self, _ctx: RpcContext<'_>, _: Empty, _sink: ServerStreamingSink<Event>) { unimplemented!() fn stream_events( &mut self, _ctx: RpcContext<'_>, _: Empty, mut sink: ServerStreamingSink<Event>, ) { let evt_rx = self.evt_rx.clone(); self.rt.spawn(async move { while let Some(event) = evt_rx.lock().await.recv().await { let mut output = Event::default(); output.set_payload(event.to_vec()); sink.send((output, WriteFlags::default())).await.unwrap(); } }); } fn stream_acl( &mut self, _ctx: RpcContext<'_>, _: Empty, _sink: ServerStreamingSink<AclPacket>, mut sink: ServerStreamingSink<AclPacket>, ) { unimplemented!() let acl_rx = self.acl_rx.clone(); self.rt.spawn(async move { while let Some(acl) = acl_rx.lock().await.recv().await { let mut output = AclPacket::default(); output.set_payload(acl.to_vec()); sink.send((output, WriteFlags::default())).await.unwrap(); } }); } fn stream_sco( Loading system/gd/rust/hal/src/lib.rs +26 −3 Original line number Diff line number Diff line //! HCI Hardware Abstraction Layer //! Supports sending HCI commands to the HAL and receving //! HCI events from the HAL pub mod rootcanal_hal; pub mod facade; pub mod rootcanal_hal; use thiserror::Error; use tokio::sync::mpsc; use bt_packet::{HciCommand, HciEvent}; use bt_packet::{HciCommand, HciEvent, RawPacket}; /// H4 packet header size const H4_HEADER_SIZE: usize = 1; Loading @@ -20,6 +20,10 @@ pub struct HalExports { pub cmd_tx: mpsc::UnboundedSender<HciCommand>, /// Receive end of a channel used to receive HCI events pub evt_rx: mpsc::UnboundedReceiver<HciEvent>, /// Transmit end of a channel used to send ACL data pub acl_tx: mpsc::UnboundedSender<RawPacket>, /// Receive end of a channel used to receive ACL data pub acl_rx: mpsc::UnboundedReceiver<RawPacket>, } /// HCI HAL Loading @@ -29,6 +33,10 @@ pub struct Hal { pub cmd_rx: mpsc::UnboundedReceiver<HciCommand>, /// Transmit end of a channel used to send HCI events pub evt_tx: mpsc::UnboundedSender<HciEvent>, /// Receive end of a channel used to send ACL data pub acl_rx: mpsc::UnboundedReceiver<RawPacket>, /// Transmit end of a channel used to receive ACL data pub acl_tx: mpsc::UnboundedSender<RawPacket>, } impl Hal { Loading @@ -36,7 +44,22 @@ impl Hal { pub fn new() -> (HalExports, Self) { let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let (evt_tx, evt_rx) = mpsc::unbounded_channel(); (HalExports { cmd_tx, evt_rx }, Self { cmd_rx, evt_tx }) let (acl_down_tx, acl_down_rx) = mpsc::unbounded_channel(); let (acl_up_tx, acl_up_rx) = mpsc::unbounded_channel(); ( HalExports { cmd_tx, evt_rx, acl_tx: acl_down_tx, acl_rx: acl_up_rx, }, Self { cmd_rx, evt_tx, acl_rx: acl_down_rx, acl_tx: acl_up_tx, }, ) } } Loading system/gd/rust/hal/src/rootcanal_hal.rs +42 −12 Original line number Diff line number Diff line Loading @@ -2,19 +2,19 @@ //! This connects to "rootcanal" which provides a simulated //! Bluetooth chip as well as a simulated environment. use bytes::{BufMut, BytesMut}; use bytes::{BufMut, Bytes, BytesMut}; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; use tokio::select; use tokio::runtime::Runtime; use futures::stream::StreamExt; use tokio::sync::mpsc; use bt_packet::{HciCommand, HciEvent, HciPacketHeaderSize, HciPacketType}; use bt_packet::{HciCommand, HciEvent, HciPacketHeaderSize, HciPacketType, RawPacket}; use std::sync::Arc; Loading @@ -30,7 +30,10 @@ pub struct RootcanalConfig { impl RootcanalConfig { /// Create a rootcanal config pub fn new(port: u16, server_address: &str) -> Self { Self { port, server_address: String::from(server_address) } Self { port, server_address: String::from(server_address), } } } Loading @@ -40,7 +43,11 @@ pub struct RootcanalHal; impl RootcanalHal { /// Send HCI events received from the HAL to the HCI layer async fn dispatch_incoming<R>(evt_tx: mpsc::UnboundedSender<HciEvent>, reader: R) -> Result<()> async fn dispatch_incoming<R>( evt_tx: mpsc::UnboundedSender<HciEvent>, acl_tx: mpsc::UnboundedSender<RawPacket>, reader: R, ) -> Result<()> where R: AsyncReadExt + Unpin, { Loading @@ -58,6 +65,8 @@ impl RootcanalHal { header.unsplit(payload); if h4_type[0] == HciPacketType::Event as u8 { evt_tx.send(header.freeze()).unwrap(); } else if h4_type[0] == HciPacketType::Acl as u8 { acl_tx.send(header.freeze()).unwrap(); } } } Loading @@ -65,17 +74,38 @@ impl RootcanalHal { /// Send commands received from the HCI later to rootcanal async fn dispatch_outgoing<W>( mut cmd_rx: mpsc::UnboundedReceiver<HciCommand>, mut acl_rx: mpsc::UnboundedReceiver<RawPacket>, mut writer: W, ) -> Result<()> where W: AsyncWriteExt + Unpin, { while let Some(next_cmd) = cmd_rx.next().await { let mut command = BytesMut::with_capacity(next_cmd.len() + 1); command.put_u8(HciPacketType::Command as u8); command.extend(next_cmd); writer.write_all(&command[..]).await?; loop { select! { Some(cmd) = cmd_rx.recv() => { Self::write_with_type(&mut writer, HciPacketType::Command, cmd).await?; } Some(acl) = acl_rx.recv() => { Self::write_with_type(&mut writer, HciPacketType::Acl, acl).await?; } else => { break; } } } Ok(()) } async fn write_with_type<W>(writer: &mut W, t: HciPacketType, b: Bytes) -> Result<()> where W: AsyncWriteExt + Unpin, { let mut data = BytesMut::with_capacity(b.len() + 1); data.put_u8(t as u8); data.extend(b); writer.write_all(&data[..]).await?; Ok(()) } Loading @@ -87,8 +117,8 @@ impl RootcanalHal { let stream = TcpStream::connect(&socket_addr).await?; let (reader, writer) = stream.into_split(); rt.spawn(Self::dispatch_incoming(hal.evt_tx, reader)); rt.spawn(Self::dispatch_outgoing(hal.cmd_rx, writer)); rt.spawn(Self::dispatch_incoming(hal.evt_tx, hal.acl_tx, reader)); rt.spawn(Self::dispatch_outgoing(hal.cmd_rx, hal.acl_rx, writer)); Ok(hal_exports) } } Loading
system/gd/rust/facade/src/lib.rs +1 −1 Original line number Diff line number Diff line Loading @@ -152,7 +152,7 @@ impl FacadeServiceManager { let mut services = Vec::new(); match req.get_module_under_test() { BluetoothModule::HAL => { services.push(HciHalFacadeService::create(Arc::clone(&rt))); services.push(HciHalFacadeService::create(hal_exports, Arc::clone(&rt))); } BluetoothModule::HCI => { let hci_exports = Hci::start(hal_exports, Arc::clone(&rt)); Loading
system/gd/rust/hal/src/facade.rs +48 −10 Original line number Diff line number Diff line Loading @@ -5,31 +5,50 @@ use bt_hal_proto::facade::*; use bt_hal_proto::facade_grpc::{create_hci_hal_facade, HciHalFacade}; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::sync::Mutex; use futures::sink::SinkExt; use grpcio::*; use std::sync::Arc; use crate::HalExports; use bt_packet::{HciCommand, HciEvent, RawPacket}; /// HCI HAL facade service #[derive(Clone)] pub struct HciHalFacadeService { rt: Arc<Runtime>, cmd_tx: mpsc::UnboundedSender<HciCommand>, evt_rx: Arc<Mutex<mpsc::UnboundedReceiver<HciEvent>>>, acl_tx: mpsc::UnboundedSender<RawPacket>, acl_rx: Arc<Mutex<mpsc::UnboundedReceiver<HciEvent>>>, } impl HciHalFacadeService { /// Create a new instance of HCI HAL facade service pub fn create(rt: Arc<Runtime>) -> grpcio::Service { create_hci_hal_facade(Self { rt }) pub fn create(hal_exports: HalExports, rt: Arc<Runtime>) -> grpcio::Service { create_hci_hal_facade(Self { rt, cmd_tx: hal_exports.cmd_tx, evt_rx: Arc::new(Mutex::new(hal_exports.evt_rx)), acl_tx: hal_exports.acl_tx, acl_rx: Arc::new(Mutex::new(hal_exports.acl_rx)), }) } } impl HciHalFacade for HciHalFacadeService { fn send_command(&mut self, _ctx: RpcContext<'_>, _cmd: Command, _sink: UnarySink<Empty>) { unimplemented!() fn send_command(&mut self, _ctx: RpcContext<'_>, mut cmd: Command, sink: UnarySink<Empty>) { self.cmd_tx.send(cmd.take_payload().into()).unwrap(); sink.success(Empty::default()); } fn send_acl(&mut self, _ctx: RpcContext<'_>, _acl: AclPacket, _sink: UnarySink<Empty>) { unimplemented!() fn send_acl(&mut self, _ctx: RpcContext<'_>, mut acl: AclPacket, sink: UnarySink<Empty>) { self.acl_tx.send(acl.take_payload().into()).unwrap(); sink.success(Empty::default()); } fn send_sco(&mut self, _ctx: RpcContext<'_>, _sco: ScoPacket, _sink: UnarySink<Empty>) { Loading @@ -40,17 +59,36 @@ impl HciHalFacade for HciHalFacadeService { unimplemented!() } fn stream_events(&mut self, _ctx: RpcContext<'_>, _: Empty, _sink: ServerStreamingSink<Event>) { unimplemented!() fn stream_events( &mut self, _ctx: RpcContext<'_>, _: Empty, mut sink: ServerStreamingSink<Event>, ) { let evt_rx = self.evt_rx.clone(); self.rt.spawn(async move { while let Some(event) = evt_rx.lock().await.recv().await { let mut output = Event::default(); output.set_payload(event.to_vec()); sink.send((output, WriteFlags::default())).await.unwrap(); } }); } fn stream_acl( &mut self, _ctx: RpcContext<'_>, _: Empty, _sink: ServerStreamingSink<AclPacket>, mut sink: ServerStreamingSink<AclPacket>, ) { unimplemented!() let acl_rx = self.acl_rx.clone(); self.rt.spawn(async move { while let Some(acl) = acl_rx.lock().await.recv().await { let mut output = AclPacket::default(); output.set_payload(acl.to_vec()); sink.send((output, WriteFlags::default())).await.unwrap(); } }); } fn stream_sco( Loading
system/gd/rust/hal/src/lib.rs +26 −3 Original line number Diff line number Diff line //! HCI Hardware Abstraction Layer //! Supports sending HCI commands to the HAL and receving //! HCI events from the HAL pub mod rootcanal_hal; pub mod facade; pub mod rootcanal_hal; use thiserror::Error; use tokio::sync::mpsc; use bt_packet::{HciCommand, HciEvent}; use bt_packet::{HciCommand, HciEvent, RawPacket}; /// H4 packet header size const H4_HEADER_SIZE: usize = 1; Loading @@ -20,6 +20,10 @@ pub struct HalExports { pub cmd_tx: mpsc::UnboundedSender<HciCommand>, /// Receive end of a channel used to receive HCI events pub evt_rx: mpsc::UnboundedReceiver<HciEvent>, /// Transmit end of a channel used to send ACL data pub acl_tx: mpsc::UnboundedSender<RawPacket>, /// Receive end of a channel used to receive ACL data pub acl_rx: mpsc::UnboundedReceiver<RawPacket>, } /// HCI HAL Loading @@ -29,6 +33,10 @@ pub struct Hal { pub cmd_rx: mpsc::UnboundedReceiver<HciCommand>, /// Transmit end of a channel used to send HCI events pub evt_tx: mpsc::UnboundedSender<HciEvent>, /// Receive end of a channel used to send ACL data pub acl_rx: mpsc::UnboundedReceiver<RawPacket>, /// Transmit end of a channel used to receive ACL data pub acl_tx: mpsc::UnboundedSender<RawPacket>, } impl Hal { Loading @@ -36,7 +44,22 @@ impl Hal { pub fn new() -> (HalExports, Self) { let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let (evt_tx, evt_rx) = mpsc::unbounded_channel(); (HalExports { cmd_tx, evt_rx }, Self { cmd_rx, evt_tx }) let (acl_down_tx, acl_down_rx) = mpsc::unbounded_channel(); let (acl_up_tx, acl_up_rx) = mpsc::unbounded_channel(); ( HalExports { cmd_tx, evt_rx, acl_tx: acl_down_tx, acl_rx: acl_up_rx, }, Self { cmd_rx, evt_tx, acl_rx: acl_down_rx, acl_tx: acl_up_tx, }, ) } } Loading
system/gd/rust/hal/src/rootcanal_hal.rs +42 −12 Original line number Diff line number Diff line Loading @@ -2,19 +2,19 @@ //! This connects to "rootcanal" which provides a simulated //! Bluetooth chip as well as a simulated environment. use bytes::{BufMut, BytesMut}; use bytes::{BufMut, Bytes, BytesMut}; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; use tokio::select; use tokio::runtime::Runtime; use futures::stream::StreamExt; use tokio::sync::mpsc; use bt_packet::{HciCommand, HciEvent, HciPacketHeaderSize, HciPacketType}; use bt_packet::{HciCommand, HciEvent, HciPacketHeaderSize, HciPacketType, RawPacket}; use std::sync::Arc; Loading @@ -30,7 +30,10 @@ pub struct RootcanalConfig { impl RootcanalConfig { /// Create a rootcanal config pub fn new(port: u16, server_address: &str) -> Self { Self { port, server_address: String::from(server_address) } Self { port, server_address: String::from(server_address), } } } Loading @@ -40,7 +43,11 @@ pub struct RootcanalHal; impl RootcanalHal { /// Send HCI events received from the HAL to the HCI layer async fn dispatch_incoming<R>(evt_tx: mpsc::UnboundedSender<HciEvent>, reader: R) -> Result<()> async fn dispatch_incoming<R>( evt_tx: mpsc::UnboundedSender<HciEvent>, acl_tx: mpsc::UnboundedSender<RawPacket>, reader: R, ) -> Result<()> where R: AsyncReadExt + Unpin, { Loading @@ -58,6 +65,8 @@ impl RootcanalHal { header.unsplit(payload); if h4_type[0] == HciPacketType::Event as u8 { evt_tx.send(header.freeze()).unwrap(); } else if h4_type[0] == HciPacketType::Acl as u8 { acl_tx.send(header.freeze()).unwrap(); } } } Loading @@ -65,17 +74,38 @@ impl RootcanalHal { /// Send commands received from the HCI later to rootcanal async fn dispatch_outgoing<W>( mut cmd_rx: mpsc::UnboundedReceiver<HciCommand>, mut acl_rx: mpsc::UnboundedReceiver<RawPacket>, mut writer: W, ) -> Result<()> where W: AsyncWriteExt + Unpin, { while let Some(next_cmd) = cmd_rx.next().await { let mut command = BytesMut::with_capacity(next_cmd.len() + 1); command.put_u8(HciPacketType::Command as u8); command.extend(next_cmd); writer.write_all(&command[..]).await?; loop { select! { Some(cmd) = cmd_rx.recv() => { Self::write_with_type(&mut writer, HciPacketType::Command, cmd).await?; } Some(acl) = acl_rx.recv() => { Self::write_with_type(&mut writer, HciPacketType::Acl, acl).await?; } else => { break; } } } Ok(()) } async fn write_with_type<W>(writer: &mut W, t: HciPacketType, b: Bytes) -> Result<()> where W: AsyncWriteExt + Unpin, { let mut data = BytesMut::with_capacity(b.len() + 1); data.put_u8(t as u8); data.extend(b); writer.write_all(&data[..]).await?; Ok(()) } Loading @@ -87,8 +117,8 @@ impl RootcanalHal { let stream = TcpStream::connect(&socket_addr).await?; let (reader, writer) = stream.into_split(); rt.spawn(Self::dispatch_incoming(hal.evt_tx, reader)); rt.spawn(Self::dispatch_outgoing(hal.cmd_rx, writer)); rt.spawn(Self::dispatch_incoming(hal.evt_tx, hal.acl_tx, reader)); rt.spawn(Self::dispatch_outgoing(hal.cmd_rx, hal.acl_rx, writer)); Ok(hal_exports) } }