Loading system/gd/rust/facade/src/lib.rs +39 −34 Original line number Diff line number Diff line Loading @@ -81,6 +81,42 @@ struct FacadeServiceManager { lifecycle_tx: Sender<LifecycleCommand>, } struct FacadeServer { server: Server, stack: Stack, } impl FacadeServer { async fn start(stack: Stack, req: StartStackRequest, grpc_port: u16) -> Self { let mut services = Vec::new(); match req.get_module_under_test() { BluetoothModule::HAL => { services.push(stack.get_grpc::<HciHalFacadeService>().await); } BluetoothModule::HCI => { services.push(stack.get_grpc::<HciLayerFacadeService>().await); } _ => unimplemented!(), } let env = Arc::new(Environment::new(2)); let mut builder = ServerBuilder::new(env).bind("0.0.0.0", grpc_port); for service in services { builder = builder.register_service(service); } let mut server = builder.build().unwrap(); server.start(); Self { server, stack } } async fn stop(&mut self) { self.server.shutdown().await.unwrap(); self.stack.stop().await; } } /// Result type type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; Loading @@ -89,18 +125,18 @@ impl FacadeServiceManager { let (tx, mut rx) = channel::<LifecycleCommand>(1); let local_rt = rt.clone(); rt.spawn(async move { let mut server: Option<Server> = None; let mut server: Option<FacadeServer> = None; while let Some(cmd) = rx.recv().await { match cmd { LifecycleCommand::Start { req, done } => { let stack = Stack::new(local_rt.clone()).await; stack.set_rootcanal_port(rootcanal_port).await; server = Some(Self::start_internal(&stack, req, grpc_port).await); server = Some(FacadeServer::start(stack, req, grpc_port).await); done.send(()).unwrap(); } LifecycleCommand::Stop { done } => { if let Some(s) = &mut server { block_on(s.shutdown()).unwrap(); block_on(s.stop()); server = None; } done.send(()).unwrap(); Loading Loading @@ -129,35 +165,4 @@ impl FacadeServiceManager { rx.await?; Ok(()) } async fn start_internal( stack: &Stack, req: StartStackRequest, grpc_port: u16, ) -> Server { let mut services = Vec::new(); match req.get_module_under_test() { BluetoothModule::HAL => { services.push(stack.get_grpc::<HciHalFacadeService>().await); } BluetoothModule::HCI => { services.push(stack.get_grpc::<HciLayerFacadeService>().await); } _ => unimplemented!(), } FacadeServiceManager::start_server(services, grpc_port) } fn start_server(services: Vec<grpcio::Service>, grpc_port: u16) -> Server { let env = Arc::new(Environment::new(2)); let mut builder = ServerBuilder::new(env).bind("0.0.0.0", grpc_port); for service in services { builder = builder.register_service(service); } let mut server = builder.build().unwrap(); server.start(); server } } system/gd/rust/gddi-macros/src/lib.rs +15 −4 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ use proc_macro::TokenStream; use quote::{format_ident, quote}; use syn::parse::{Parse, ParseStream, Result}; use syn::punctuated::Punctuated; use syn::{braced, parse, parse_macro_input, FnArg, Ident, ItemFn, Token, Type}; use syn::{braced, parse, parse_macro_input, FnArg, Ident, ItemFn, Token, Type, DeriveInput, Path}; /// Defines a provider function, with generated helper that implicitly fetches argument instances from the registry #[proc_macro_attribute] Loading Loading @@ -46,12 +46,12 @@ pub fn provides(_attr: TokenStream, item: TokenStream) -> TokenStream { struct ModuleDef { name: Ident, providers: Punctuated<ProviderDef, Token![,]>, submodules: Punctuated<Ident, Token![,]>, submodules: Punctuated<Path, Token![,]>, } enum ModuleEntry { Providers(Punctuated<ProviderDef, Token![,]>), Submodules(Punctuated<Ident, Token![,]>), Submodules(Punctuated<Path, Token![,]>), } struct ProviderDef { Loading Loading @@ -116,7 +116,7 @@ impl Parse for ModuleEntry { let entries; braced!(entries in input); Ok(ModuleEntry::Submodules( entries.parse_terminated(Ident::parse)?, entries.parse_terminated(Path::parse)?, )) } keyword => { Loading Loading @@ -149,3 +149,14 @@ pub fn module(item: TokenStream) -> TokenStream { }; emitted_code.into() } /// Emits a default implementation for Stoppable that does nothing; #[proc_macro_derive(Stoppable)] pub fn derive_nop_stop(item: TokenStream) -> TokenStream { let input = parse_macro_input!(item as DeriveInput); let ident = input.ident; let emitted_code = quote! { impl gddi::Stoppable for #ident {} }; emitted_code.into() } system/gd/rust/gddi/src/lib.rs +24 −2 Original line number Diff line number Diff line Loading @@ -7,13 +7,19 @@ use std::pin::Pin; use std::sync::Arc; use tokio::sync::Mutex; pub use gddi_macros::{module, provides}; pub use gddi_macros::{module, provides, Stoppable}; type InstanceBox = Box<dyn Any + Send + Sync>; /// A box around a future for a provider that is safe to send between threads pub type ProviderFutureBox = Box<dyn Future<Output = Box<dyn Any>> + Send + Sync>; type ProviderFnBox = Box<dyn Fn(Arc<Registry>) -> Pin<ProviderFutureBox> + Send + Sync>; /// Called to stop an injected object pub trait Stoppable { /// Stop and close all resources fn stop(&self) {} } /// Builder for Registry pub struct RegistryBuilder { providers: HashMap<TypeId, Provider>, Loading @@ -23,6 +29,7 @@ pub struct RegistryBuilder { pub struct Registry { providers: Arc<Mutex<HashMap<TypeId, Provider>>>, instances: Arc<Mutex<HashMap<TypeId, InstanceBox>>>, start_order: Arc<Mutex<Vec<Box<dyn Stoppable + Send + Sync>>>>, } #[derive(Clone)] Loading Loading @@ -65,13 +72,14 @@ impl RegistryBuilder { Registry { providers: Arc::new(Mutex::new(self.providers)), instances: Arc::new(Mutex::new(HashMap::new())), start_order: Arc::new(Mutex::new(Vec::new())), } } } impl Registry { /// Gets an instance of a type, implicitly starting any dependencies if necessary pub async fn get<T: 'static + Clone + Send + Sync>(self: &Arc<Self>) -> T { pub async fn get<T: 'static + Clone + Send + Sync + Stoppable>(self: &Arc<Self>) -> T { let typeid = TypeId::of::<T>(); { let instances = self.instances.lock().await; Loading @@ -92,6 +100,9 @@ impl Registry { let mut instances = self.instances.lock().await; instances.insert(typeid, Box::new(casted.clone())); let mut start_order = self.start_order.lock().await; start_order.push(Box::new(casted.clone())); casted } Loading @@ -100,4 +111,15 @@ impl Registry { let mut instances = self.instances.lock().await; instances.insert(TypeId::of::<T>(), Box::new(obj)); } /// Stop all instances, in reverse order of start. pub async fn stop_all(self: &Arc<Self>) { let mut start_order = self.start_order.lock().await; while let Some(obj) = start_order.pop() { obj.stop(); } self.instances.lock().await.clear(); } } impl<T> Stoppable for std::sync::Arc<T> {} system/gd/rust/hal/src/facade.rs +2 −2 Original line number Diff line number Diff line Loading @@ -7,7 +7,7 @@ use bt_hal_proto::facade::*; use bt_hal_proto::facade_grpc::{create_hci_hal_facade, HciHalFacade}; use bt_packet::{HciCommand, HciEvent, RawPacket}; use futures::sink::SinkExt; use gddi::{module, provides}; use gddi::{module, provides, Stoppable}; use grpcio::*; use std::sync::Arc; use tokio::runtime::Runtime; Loading @@ -32,7 +32,7 @@ async fn provide_facade(hal_exports: HalExports, rt: Arc<Runtime>) -> HciHalFaca } /// HCI HAL facade service #[derive(Clone)] #[derive(Clone, Stoppable)] pub struct HciHalFacadeService { rt: Arc<Runtime>, cmd_tx: mpsc::UnboundedSender<HciCommand>, Loading system/gd/rust/hal/src/lib.rs +6 −11 Original line number Diff line number Diff line Loading @@ -7,17 +7,12 @@ extern crate lazy_static; pub mod facade; pub mod rootcanal_hal; #[cfg(not(target_os = "android"))] use rootcanal_hal::rootcanal_hal_module; #[cfg(target_os = "android")] mod hidl_hal; #[cfg(target_os = "android")] use hidl_hal::hidl_hal_module; use bt_packet::{HciCommand, HciEvent, RawPacket}; use facade::hal_facade_module; use gddi::module; use gddi::{module, Stoppable}; use std::sync::Arc; use thiserror::Error; use tokio::sync::{mpsc, Mutex}; Loading @@ -26,8 +21,8 @@ use tokio::sync::{mpsc, Mutex}; module! { hal_module, submodules { hal_facade_module, hidl_hal_module facade::hal_facade_module, hidl_hal::hidl_hal_module }, } Loading @@ -35,8 +30,8 @@ module! { module! { hal_module, submodules { hal_facade_module, rootcanal_hal_module facade::hal_facade_module, rootcanal_hal::rootcanal_hal_module }, } /// H4 packet header size Loading @@ -45,7 +40,7 @@ const H4_HEADER_SIZE: usize = 1; /// HAL interface /// This is used by the HCI module to send commands to the /// HAL and receive events from the HAL #[derive(Clone)] #[derive(Clone, Stoppable)] pub struct HalExports { /// Transmit end of a channel used to send HCI commands pub cmd_tx: mpsc::UnboundedSender<HciCommand>, Loading Loading
system/gd/rust/facade/src/lib.rs +39 −34 Original line number Diff line number Diff line Loading @@ -81,6 +81,42 @@ struct FacadeServiceManager { lifecycle_tx: Sender<LifecycleCommand>, } struct FacadeServer { server: Server, stack: Stack, } impl FacadeServer { async fn start(stack: Stack, req: StartStackRequest, grpc_port: u16) -> Self { let mut services = Vec::new(); match req.get_module_under_test() { BluetoothModule::HAL => { services.push(stack.get_grpc::<HciHalFacadeService>().await); } BluetoothModule::HCI => { services.push(stack.get_grpc::<HciLayerFacadeService>().await); } _ => unimplemented!(), } let env = Arc::new(Environment::new(2)); let mut builder = ServerBuilder::new(env).bind("0.0.0.0", grpc_port); for service in services { builder = builder.register_service(service); } let mut server = builder.build().unwrap(); server.start(); Self { server, stack } } async fn stop(&mut self) { self.server.shutdown().await.unwrap(); self.stack.stop().await; } } /// Result type type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; Loading @@ -89,18 +125,18 @@ impl FacadeServiceManager { let (tx, mut rx) = channel::<LifecycleCommand>(1); let local_rt = rt.clone(); rt.spawn(async move { let mut server: Option<Server> = None; let mut server: Option<FacadeServer> = None; while let Some(cmd) = rx.recv().await { match cmd { LifecycleCommand::Start { req, done } => { let stack = Stack::new(local_rt.clone()).await; stack.set_rootcanal_port(rootcanal_port).await; server = Some(Self::start_internal(&stack, req, grpc_port).await); server = Some(FacadeServer::start(stack, req, grpc_port).await); done.send(()).unwrap(); } LifecycleCommand::Stop { done } => { if let Some(s) = &mut server { block_on(s.shutdown()).unwrap(); block_on(s.stop()); server = None; } done.send(()).unwrap(); Loading Loading @@ -129,35 +165,4 @@ impl FacadeServiceManager { rx.await?; Ok(()) } async fn start_internal( stack: &Stack, req: StartStackRequest, grpc_port: u16, ) -> Server { let mut services = Vec::new(); match req.get_module_under_test() { BluetoothModule::HAL => { services.push(stack.get_grpc::<HciHalFacadeService>().await); } BluetoothModule::HCI => { services.push(stack.get_grpc::<HciLayerFacadeService>().await); } _ => unimplemented!(), } FacadeServiceManager::start_server(services, grpc_port) } fn start_server(services: Vec<grpcio::Service>, grpc_port: u16) -> Server { let env = Arc::new(Environment::new(2)); let mut builder = ServerBuilder::new(env).bind("0.0.0.0", grpc_port); for service in services { builder = builder.register_service(service); } let mut server = builder.build().unwrap(); server.start(); server } }
system/gd/rust/gddi-macros/src/lib.rs +15 −4 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ use proc_macro::TokenStream; use quote::{format_ident, quote}; use syn::parse::{Parse, ParseStream, Result}; use syn::punctuated::Punctuated; use syn::{braced, parse, parse_macro_input, FnArg, Ident, ItemFn, Token, Type}; use syn::{braced, parse, parse_macro_input, FnArg, Ident, ItemFn, Token, Type, DeriveInput, Path}; /// Defines a provider function, with generated helper that implicitly fetches argument instances from the registry #[proc_macro_attribute] Loading Loading @@ -46,12 +46,12 @@ pub fn provides(_attr: TokenStream, item: TokenStream) -> TokenStream { struct ModuleDef { name: Ident, providers: Punctuated<ProviderDef, Token![,]>, submodules: Punctuated<Ident, Token![,]>, submodules: Punctuated<Path, Token![,]>, } enum ModuleEntry { Providers(Punctuated<ProviderDef, Token![,]>), Submodules(Punctuated<Ident, Token![,]>), Submodules(Punctuated<Path, Token![,]>), } struct ProviderDef { Loading Loading @@ -116,7 +116,7 @@ impl Parse for ModuleEntry { let entries; braced!(entries in input); Ok(ModuleEntry::Submodules( entries.parse_terminated(Ident::parse)?, entries.parse_terminated(Path::parse)?, )) } keyword => { Loading Loading @@ -149,3 +149,14 @@ pub fn module(item: TokenStream) -> TokenStream { }; emitted_code.into() } /// Emits a default implementation for Stoppable that does nothing; #[proc_macro_derive(Stoppable)] pub fn derive_nop_stop(item: TokenStream) -> TokenStream { let input = parse_macro_input!(item as DeriveInput); let ident = input.ident; let emitted_code = quote! { impl gddi::Stoppable for #ident {} }; emitted_code.into() }
system/gd/rust/gddi/src/lib.rs +24 −2 Original line number Diff line number Diff line Loading @@ -7,13 +7,19 @@ use std::pin::Pin; use std::sync::Arc; use tokio::sync::Mutex; pub use gddi_macros::{module, provides}; pub use gddi_macros::{module, provides, Stoppable}; type InstanceBox = Box<dyn Any + Send + Sync>; /// A box around a future for a provider that is safe to send between threads pub type ProviderFutureBox = Box<dyn Future<Output = Box<dyn Any>> + Send + Sync>; type ProviderFnBox = Box<dyn Fn(Arc<Registry>) -> Pin<ProviderFutureBox> + Send + Sync>; /// Called to stop an injected object pub trait Stoppable { /// Stop and close all resources fn stop(&self) {} } /// Builder for Registry pub struct RegistryBuilder { providers: HashMap<TypeId, Provider>, Loading @@ -23,6 +29,7 @@ pub struct RegistryBuilder { pub struct Registry { providers: Arc<Mutex<HashMap<TypeId, Provider>>>, instances: Arc<Mutex<HashMap<TypeId, InstanceBox>>>, start_order: Arc<Mutex<Vec<Box<dyn Stoppable + Send + Sync>>>>, } #[derive(Clone)] Loading Loading @@ -65,13 +72,14 @@ impl RegistryBuilder { Registry { providers: Arc::new(Mutex::new(self.providers)), instances: Arc::new(Mutex::new(HashMap::new())), start_order: Arc::new(Mutex::new(Vec::new())), } } } impl Registry { /// Gets an instance of a type, implicitly starting any dependencies if necessary pub async fn get<T: 'static + Clone + Send + Sync>(self: &Arc<Self>) -> T { pub async fn get<T: 'static + Clone + Send + Sync + Stoppable>(self: &Arc<Self>) -> T { let typeid = TypeId::of::<T>(); { let instances = self.instances.lock().await; Loading @@ -92,6 +100,9 @@ impl Registry { let mut instances = self.instances.lock().await; instances.insert(typeid, Box::new(casted.clone())); let mut start_order = self.start_order.lock().await; start_order.push(Box::new(casted.clone())); casted } Loading @@ -100,4 +111,15 @@ impl Registry { let mut instances = self.instances.lock().await; instances.insert(TypeId::of::<T>(), Box::new(obj)); } /// Stop all instances, in reverse order of start. pub async fn stop_all(self: &Arc<Self>) { let mut start_order = self.start_order.lock().await; while let Some(obj) = start_order.pop() { obj.stop(); } self.instances.lock().await.clear(); } } impl<T> Stoppable for std::sync::Arc<T> {}
system/gd/rust/hal/src/facade.rs +2 −2 Original line number Diff line number Diff line Loading @@ -7,7 +7,7 @@ use bt_hal_proto::facade::*; use bt_hal_proto::facade_grpc::{create_hci_hal_facade, HciHalFacade}; use bt_packet::{HciCommand, HciEvent, RawPacket}; use futures::sink::SinkExt; use gddi::{module, provides}; use gddi::{module, provides, Stoppable}; use grpcio::*; use std::sync::Arc; use tokio::runtime::Runtime; Loading @@ -32,7 +32,7 @@ async fn provide_facade(hal_exports: HalExports, rt: Arc<Runtime>) -> HciHalFaca } /// HCI HAL facade service #[derive(Clone)] #[derive(Clone, Stoppable)] pub struct HciHalFacadeService { rt: Arc<Runtime>, cmd_tx: mpsc::UnboundedSender<HciCommand>, Loading
system/gd/rust/hal/src/lib.rs +6 −11 Original line number Diff line number Diff line Loading @@ -7,17 +7,12 @@ extern crate lazy_static; pub mod facade; pub mod rootcanal_hal; #[cfg(not(target_os = "android"))] use rootcanal_hal::rootcanal_hal_module; #[cfg(target_os = "android")] mod hidl_hal; #[cfg(target_os = "android")] use hidl_hal::hidl_hal_module; use bt_packet::{HciCommand, HciEvent, RawPacket}; use facade::hal_facade_module; use gddi::module; use gddi::{module, Stoppable}; use std::sync::Arc; use thiserror::Error; use tokio::sync::{mpsc, Mutex}; Loading @@ -26,8 +21,8 @@ use tokio::sync::{mpsc, Mutex}; module! { hal_module, submodules { hal_facade_module, hidl_hal_module facade::hal_facade_module, hidl_hal::hidl_hal_module }, } Loading @@ -35,8 +30,8 @@ module! { module! { hal_module, submodules { hal_facade_module, rootcanal_hal_module facade::hal_facade_module, rootcanal_hal::rootcanal_hal_module }, } /// H4 packet header size Loading @@ -45,7 +40,7 @@ const H4_HEADER_SIZE: usize = 1; /// HAL interface /// This is used by the HCI module to send commands to the /// HAL and receive events from the HAL #[derive(Clone)] #[derive(Clone, Stoppable)] pub struct HalExports { /// Transmit end of a channel used to send HCI commands pub cmd_tx: mpsc::UnboundedSender<HciCommand>, Loading