Loading system/gd/rust/linux/stack/src/bluetooth.rs +4 −23 Original line number Diff line number Diff line Loading @@ -52,7 +52,7 @@ use crate::bluetooth_media::{BluetoothMedia, IBluetoothMedia, MediaActions, LEA_ use crate::callbacks::Callbacks; use crate::socket_manager::SocketActions; use crate::uuid::{Profile, UuidHelper}; use crate::{APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; use crate::{make_message_dispatcher, APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; pub(crate) const FLOSS_VER: u16 = 0x0001; const DEFAULT_DISCOVERY_TIMEOUT_MS: u64 = 12800; Loading Loading @@ -858,26 +858,14 @@ impl Bluetooth { pub fn init_profiles(&mut self) { self.bluetooth_gatt.lock().unwrap().enable(true); let sdptx = self.tx.clone(); self.sdp = Some(Sdp::new(&self.intf.lock().unwrap())); self.sdp.as_mut().unwrap().initialize(SdpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = sdptx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Sdp(cb)).await; }); }), dispatch: make_message_dispatcher(self.tx.clone(), Message::Sdp), }); let hhtx = self.tx.clone(); self.hh = Some(HidHost::new(&self.intf.lock().unwrap())); self.hh.as_mut().unwrap().initialize(HHCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = hhtx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::HidHost(cb)).await; }); }), dispatch: make_message_dispatcher(self.tx.clone(), Message::HidHost), }); let allowed_profiles = self.bluetooth_admin.lock().unwrap().get_allowed_services(); Loading Loading @@ -1533,14 +1521,7 @@ pub(crate) trait BtifSdpCallbacks { } pub fn get_bt_dispatcher(tx: Sender<Message>) -> BaseCallbacksDispatcher { BaseCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Base(cb)).await; }); }), } BaseCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Base) } } impl BtifBluetoothCallbacks for Bluetooth { Loading system/gd/rust/linux/stack/src/bluetooth_gatt.rs +8 −62 Original line number Diff line number Diff line Loading @@ -14,7 +14,6 @@ use bt_topshim::profiles::gatt::{ GattStatus, LePhy, MsftAdvMonitor, MsftAdvMonitorAddress, MsftAdvMonitorPattern, }; use bt_topshim::sysprop; use bt_topshim::topstack; use bt_utils::adv_parser; use bt_utils::array_utils; Loading @@ -25,13 +24,13 @@ use crate::bluetooth_adv::{ BtifGattAdvCallbacks, IAdvertisingSetCallback, PeriodicAdvertisingParameters, }; use crate::callbacks::Callbacks; use crate::{APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; use crate::{make_message_dispatcher, APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; use log::{info, warn}; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::cast::{FromPrimitive, ToPrimitive}; use rand::rngs::SmallRng; use rand::{RngCore, SeedableRng}; use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet}; use std::convert::{TryFrom, TryInto}; use std::sync::{Arc, Mutex, MutexGuard}; use tokio::sync::mpsc::Sender; Loading Loading @@ -1490,76 +1489,23 @@ impl BluetoothGatt { pub fn init_profiles(&mut self, tx: Sender<Message>, api_tx: Sender<APIMessage>) { self.gatt = Gatt::new(&self.intf.lock().unwrap()).map(|gatt| Arc::new(Mutex::new(gatt))); // TODO(b/353643607): Make this dispatch_queue design more general for all profiles. let tx_clone = tx.clone(); let async_mutex = Arc::new(tokio::sync::Mutex::new(())); let dispatch_queue = Arc::new(Mutex::new(VecDeque::new())); let gatt_client_callbacks_dispatcher = GattClientCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); let async_mutex = async_mutex.clone(); let dispatch_queue = dispatch_queue.clone(); // Enqueue the callbacks at the synchronized block to ensure the order. dispatch_queue.lock().unwrap().push_back(cb); topstack::get_runtime().spawn(async move { // Acquire the lock first to ensure |pop_front| and |tx_clone.send| not // interrupted by the other async threads. let _guard = async_mutex.lock().await; // Consume exactly one callback. let cb = dispatch_queue.lock().unwrap().pop_front().unwrap(); let _ = tx_clone.send(Message::GattClient(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::GattClient), }; let tx_clone = tx.clone(); let gatt_server_callbacks_dispatcher = GattServerCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::GattServer(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::GattServer), }; let tx_clone = tx.clone(); let gatt_scanner_callbacks_dispatcher = GattScannerCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeScanner(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeScanner), }; let tx_clone = tx.clone(); let gatt_scanner_inband_callbacks_dispatcher = GattScannerInbandCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeScannerInband(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeScannerInband), }; let tx_clone = tx.clone(); let gatt_adv_inband_callbacks_dispatcher = GattAdvInbandCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeAdvInband(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeAdvInband), }; let tx_clone = tx.clone(); let gatt_adv_callbacks_dispatcher = GattAdvCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeAdv(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeAdv), }; self.gatt.as_ref().unwrap().lock().unwrap().initialize( Loading system/gd/rust/linux/stack/src/bluetooth_media.rs +7 −45 Original line number Diff line number Diff line Loading @@ -63,7 +63,7 @@ use crate::bluetooth::{Bluetooth, BluetoothDevice, IBluetooth}; use crate::callbacks::Callbacks; use crate::uuid; use crate::uuid::{Profile, UuidHelper}; use crate::{Message, RPCProxy}; use crate::{make_message_dispatcher, Message, RPCProxy}; use num_derive::FromPrimitive; Loading Loading @@ -3110,69 +3110,31 @@ impl BluetoothMedia { } fn get_a2dp_dispatcher(tx: Sender<Message>) -> A2dpCallbacksDispatcher { A2dpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::A2dp(cb)).await; }); }), } A2dpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::A2dp) } } fn get_avrcp_dispatcher(tx: Sender<Message>) -> AvrcpCallbacksDispatcher { AvrcpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Avrcp(cb)).await; }); }), } AvrcpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Avrcp) } } fn get_hfp_dispatcher(tx: Sender<Message>) -> HfpCallbacksDispatcher { HfpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Hfp(cb)).await; }); }), } HfpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Hfp) } } fn get_le_audio_dispatcher(tx: Sender<Message>) -> LeAudioClientCallbacksDispatcher { LeAudioClientCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::LeAudioClient(cb)).await; }); }), dispatch: make_message_dispatcher(tx, Message::LeAudioClient), } } fn get_vc_dispatcher(tx: Sender<Message>) -> VolumeControlCallbacksDispatcher { VolumeControlCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::VolumeControl(cb)).await; }); }), dispatch: make_message_dispatcher(tx, Message::VolumeControl), } } fn get_csis_dispatcher(tx: Sender<Message>) -> CsisClientCallbacksDispatcher { CsisClientCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::CsisClient(cb)).await; }); }), } CsisClientCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::CsisClient) } } impl IBluetoothMedia for BluetoothMedia { Loading system/gd/rust/linux/stack/src/lib.rs +43 −0 Original line number Diff line number Diff line Loading @@ -175,6 +175,49 @@ pub enum Message { GattClientDisconnected(RawAddress), } /// Returns a callable object that dispatches a BTIF callback to Message /// /// The returned object would make sure the order of how the callbacks arrive the same as how they /// goes to Message. /// /// Example /// ```ignore /// // Create a dispatcher in btstack /// let gatt_client_callbacks_dispatcher = topshim::gatt::GattClientCallbacksDispatcher { /// dispatch: make_message_dispatcher(tx.clone(), Message::GattClient), /// }; /// /// // Register the dispatcher to topshim /// bt_topshim::topstack::get_dispatchers() /// .lock() /// .unwrap() /// .set::<topshim::gatt::GattClientCb>(Arc::new(Mutex::new(gatt_client_callbacks_dispatcher))) /// ``` pub(crate) fn make_message_dispatcher<F, Cb>(tx: Sender<Message>, f: F) -> Box<dyn Fn(Cb) + Send> where Cb: Send + 'static, F: Fn(Cb) -> Message + Send + Copy + 'static, { let async_mutex = Arc::new(tokio::sync::Mutex::new(())); let dispatch_queue = Arc::new(Mutex::new(std::collections::VecDeque::new())); Box::new(move |cb| { let tx = tx.clone(); let async_mutex = async_mutex.clone(); let dispatch_queue = dispatch_queue.clone(); // Enqueue the callbacks at the synchronized block to ensure the order. dispatch_queue.lock().unwrap().push_back(cb); bt_topshim::topstack::get_runtime().spawn(async move { // Acquire the lock first to ensure |pop_front| and |tx.send| not // interrupted by the other async threads. let _guard = async_mutex.lock().await; // Consume exactly one callback. let cb = dispatch_queue.lock().unwrap().pop_front().unwrap(); let _ = tx.send(f(cb)).await; }); }) } pub enum BluetoothAPI { Adapter, Battery, Loading Loading
system/gd/rust/linux/stack/src/bluetooth.rs +4 −23 Original line number Diff line number Diff line Loading @@ -52,7 +52,7 @@ use crate::bluetooth_media::{BluetoothMedia, IBluetoothMedia, MediaActions, LEA_ use crate::callbacks::Callbacks; use crate::socket_manager::SocketActions; use crate::uuid::{Profile, UuidHelper}; use crate::{APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; use crate::{make_message_dispatcher, APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; pub(crate) const FLOSS_VER: u16 = 0x0001; const DEFAULT_DISCOVERY_TIMEOUT_MS: u64 = 12800; Loading Loading @@ -858,26 +858,14 @@ impl Bluetooth { pub fn init_profiles(&mut self) { self.bluetooth_gatt.lock().unwrap().enable(true); let sdptx = self.tx.clone(); self.sdp = Some(Sdp::new(&self.intf.lock().unwrap())); self.sdp.as_mut().unwrap().initialize(SdpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = sdptx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Sdp(cb)).await; }); }), dispatch: make_message_dispatcher(self.tx.clone(), Message::Sdp), }); let hhtx = self.tx.clone(); self.hh = Some(HidHost::new(&self.intf.lock().unwrap())); self.hh.as_mut().unwrap().initialize(HHCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = hhtx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::HidHost(cb)).await; }); }), dispatch: make_message_dispatcher(self.tx.clone(), Message::HidHost), }); let allowed_profiles = self.bluetooth_admin.lock().unwrap().get_allowed_services(); Loading Loading @@ -1533,14 +1521,7 @@ pub(crate) trait BtifSdpCallbacks { } pub fn get_bt_dispatcher(tx: Sender<Message>) -> BaseCallbacksDispatcher { BaseCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Base(cb)).await; }); }), } BaseCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Base) } } impl BtifBluetoothCallbacks for Bluetooth { Loading
system/gd/rust/linux/stack/src/bluetooth_gatt.rs +8 −62 Original line number Diff line number Diff line Loading @@ -14,7 +14,6 @@ use bt_topshim::profiles::gatt::{ GattStatus, LePhy, MsftAdvMonitor, MsftAdvMonitorAddress, MsftAdvMonitorPattern, }; use bt_topshim::sysprop; use bt_topshim::topstack; use bt_utils::adv_parser; use bt_utils::array_utils; Loading @@ -25,13 +24,13 @@ use crate::bluetooth_adv::{ BtifGattAdvCallbacks, IAdvertisingSetCallback, PeriodicAdvertisingParameters, }; use crate::callbacks::Callbacks; use crate::{APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; use crate::{make_message_dispatcher, APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode}; use log::{info, warn}; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::cast::{FromPrimitive, ToPrimitive}; use rand::rngs::SmallRng; use rand::{RngCore, SeedableRng}; use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet}; use std::convert::{TryFrom, TryInto}; use std::sync::{Arc, Mutex, MutexGuard}; use tokio::sync::mpsc::Sender; Loading Loading @@ -1490,76 +1489,23 @@ impl BluetoothGatt { pub fn init_profiles(&mut self, tx: Sender<Message>, api_tx: Sender<APIMessage>) { self.gatt = Gatt::new(&self.intf.lock().unwrap()).map(|gatt| Arc::new(Mutex::new(gatt))); // TODO(b/353643607): Make this dispatch_queue design more general for all profiles. let tx_clone = tx.clone(); let async_mutex = Arc::new(tokio::sync::Mutex::new(())); let dispatch_queue = Arc::new(Mutex::new(VecDeque::new())); let gatt_client_callbacks_dispatcher = GattClientCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); let async_mutex = async_mutex.clone(); let dispatch_queue = dispatch_queue.clone(); // Enqueue the callbacks at the synchronized block to ensure the order. dispatch_queue.lock().unwrap().push_back(cb); topstack::get_runtime().spawn(async move { // Acquire the lock first to ensure |pop_front| and |tx_clone.send| not // interrupted by the other async threads. let _guard = async_mutex.lock().await; // Consume exactly one callback. let cb = dispatch_queue.lock().unwrap().pop_front().unwrap(); let _ = tx_clone.send(Message::GattClient(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::GattClient), }; let tx_clone = tx.clone(); let gatt_server_callbacks_dispatcher = GattServerCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::GattServer(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::GattServer), }; let tx_clone = tx.clone(); let gatt_scanner_callbacks_dispatcher = GattScannerCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeScanner(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeScanner), }; let tx_clone = tx.clone(); let gatt_scanner_inband_callbacks_dispatcher = GattScannerInbandCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeScannerInband(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeScannerInband), }; let tx_clone = tx.clone(); let gatt_adv_inband_callbacks_dispatcher = GattAdvInbandCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeAdvInband(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeAdvInband), }; let tx_clone = tx.clone(); let gatt_adv_callbacks_dispatcher = GattAdvCallbacksDispatcher { dispatch: Box::new(move |cb| { let tx_clone = tx_clone.clone(); topstack::get_runtime().spawn(async move { let _ = tx_clone.send(Message::LeAdv(cb)).await; }); }), dispatch: make_message_dispatcher(tx.clone(), Message::LeAdv), }; self.gatt.as_ref().unwrap().lock().unwrap().initialize( Loading
system/gd/rust/linux/stack/src/bluetooth_media.rs +7 −45 Original line number Diff line number Diff line Loading @@ -63,7 +63,7 @@ use crate::bluetooth::{Bluetooth, BluetoothDevice, IBluetooth}; use crate::callbacks::Callbacks; use crate::uuid; use crate::uuid::{Profile, UuidHelper}; use crate::{Message, RPCProxy}; use crate::{make_message_dispatcher, Message, RPCProxy}; use num_derive::FromPrimitive; Loading Loading @@ -3110,69 +3110,31 @@ impl BluetoothMedia { } fn get_a2dp_dispatcher(tx: Sender<Message>) -> A2dpCallbacksDispatcher { A2dpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::A2dp(cb)).await; }); }), } A2dpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::A2dp) } } fn get_avrcp_dispatcher(tx: Sender<Message>) -> AvrcpCallbacksDispatcher { AvrcpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Avrcp(cb)).await; }); }), } AvrcpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Avrcp) } } fn get_hfp_dispatcher(tx: Sender<Message>) -> HfpCallbacksDispatcher { HfpCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::Hfp(cb)).await; }); }), } HfpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Hfp) } } fn get_le_audio_dispatcher(tx: Sender<Message>) -> LeAudioClientCallbacksDispatcher { LeAudioClientCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::LeAudioClient(cb)).await; }); }), dispatch: make_message_dispatcher(tx, Message::LeAudioClient), } } fn get_vc_dispatcher(tx: Sender<Message>) -> VolumeControlCallbacksDispatcher { VolumeControlCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::VolumeControl(cb)).await; }); }), dispatch: make_message_dispatcher(tx, Message::VolumeControl), } } fn get_csis_dispatcher(tx: Sender<Message>) -> CsisClientCallbacksDispatcher { CsisClientCallbacksDispatcher { dispatch: Box::new(move |cb| { let txl = tx.clone(); topstack::get_runtime().spawn(async move { let _ = txl.send(Message::CsisClient(cb)).await; }); }), } CsisClientCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::CsisClient) } } impl IBluetoothMedia for BluetoothMedia { Loading
system/gd/rust/linux/stack/src/lib.rs +43 −0 Original line number Diff line number Diff line Loading @@ -175,6 +175,49 @@ pub enum Message { GattClientDisconnected(RawAddress), } /// Returns a callable object that dispatches a BTIF callback to Message /// /// The returned object would make sure the order of how the callbacks arrive the same as how they /// goes to Message. /// /// Example /// ```ignore /// // Create a dispatcher in btstack /// let gatt_client_callbacks_dispatcher = topshim::gatt::GattClientCallbacksDispatcher { /// dispatch: make_message_dispatcher(tx.clone(), Message::GattClient), /// }; /// /// // Register the dispatcher to topshim /// bt_topshim::topstack::get_dispatchers() /// .lock() /// .unwrap() /// .set::<topshim::gatt::GattClientCb>(Arc::new(Mutex::new(gatt_client_callbacks_dispatcher))) /// ``` pub(crate) fn make_message_dispatcher<F, Cb>(tx: Sender<Message>, f: F) -> Box<dyn Fn(Cb) + Send> where Cb: Send + 'static, F: Fn(Cb) -> Message + Send + Copy + 'static, { let async_mutex = Arc::new(tokio::sync::Mutex::new(())); let dispatch_queue = Arc::new(Mutex::new(std::collections::VecDeque::new())); Box::new(move |cb| { let tx = tx.clone(); let async_mutex = async_mutex.clone(); let dispatch_queue = dispatch_queue.clone(); // Enqueue the callbacks at the synchronized block to ensure the order. dispatch_queue.lock().unwrap().push_back(cb); bt_topshim::topstack::get_runtime().spawn(async move { // Acquire the lock first to ensure |pop_front| and |tx.send| not // interrupted by the other async threads. let _guard = async_mutex.lock().await; // Consume exactly one callback. let cb = dispatch_queue.lock().unwrap().pop_front().unwrap(); let _ = tx.send(f(cb)).await; }); }) } pub enum BluetoothAPI { Adapter, Battery, Loading