Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5fdd6760 authored by Hsin-chen Chuang's avatar Hsin-chen Chuang
Browse files

floss: Guarantee the execution order of all BTIF callbacks

This applies the GATT client fix to all BTIF callbacks.

Bug: 353643607
Tag: #floss
Test: mmm packages/modules/Bluetooth
Test: manual test Sphero Mini and Sphero SPRK+
Flag: EXEMPT, Floss-only changes
Change-Id: I241d0623d47756918e6ed17190a1caa01956d351
parent 98239581
Loading
Loading
Loading
Loading
+4 −23
Original line number Original line Diff line number Diff line
@@ -52,7 +52,7 @@ use crate::bluetooth_media::{BluetoothMedia, IBluetoothMedia, MediaActions, LEA_
use crate::callbacks::Callbacks;
use crate::callbacks::Callbacks;
use crate::socket_manager::SocketActions;
use crate::socket_manager::SocketActions;
use crate::uuid::{Profile, UuidHelper};
use crate::uuid::{Profile, UuidHelper};
use crate::{APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode};
use crate::{make_message_dispatcher, APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode};


pub(crate) const FLOSS_VER: u16 = 0x0001;
pub(crate) const FLOSS_VER: u16 = 0x0001;
const DEFAULT_DISCOVERY_TIMEOUT_MS: u64 = 12800;
const DEFAULT_DISCOVERY_TIMEOUT_MS: u64 = 12800;
@@ -858,26 +858,14 @@ impl Bluetooth {
    pub fn init_profiles(&mut self) {
    pub fn init_profiles(&mut self) {
        self.bluetooth_gatt.lock().unwrap().enable(true);
        self.bluetooth_gatt.lock().unwrap().enable(true);


        let sdptx = self.tx.clone();
        self.sdp = Some(Sdp::new(&self.intf.lock().unwrap()));
        self.sdp = Some(Sdp::new(&self.intf.lock().unwrap()));
        self.sdp.as_mut().unwrap().initialize(SdpCallbacksDispatcher {
        self.sdp.as_mut().unwrap().initialize(SdpCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(self.tx.clone(), Message::Sdp),
                let txl = sdptx.clone();
                topstack::get_runtime().spawn(async move {
                    let _ = txl.send(Message::Sdp(cb)).await;
                });
            }),
        });
        });


        let hhtx = self.tx.clone();
        self.hh = Some(HidHost::new(&self.intf.lock().unwrap()));
        self.hh = Some(HidHost::new(&self.intf.lock().unwrap()));
        self.hh.as_mut().unwrap().initialize(HHCallbacksDispatcher {
        self.hh.as_mut().unwrap().initialize(HHCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(self.tx.clone(), Message::HidHost),
                let txl = hhtx.clone();
                topstack::get_runtime().spawn(async move {
                    let _ = txl.send(Message::HidHost(cb)).await;
                });
            }),
        });
        });


        let allowed_profiles = self.bluetooth_admin.lock().unwrap().get_allowed_services();
        let allowed_profiles = self.bluetooth_admin.lock().unwrap().get_allowed_services();
@@ -1533,14 +1521,7 @@ pub(crate) trait BtifSdpCallbacks {
}
}


pub fn get_bt_dispatcher(tx: Sender<Message>) -> BaseCallbacksDispatcher {
pub fn get_bt_dispatcher(tx: Sender<Message>) -> BaseCallbacksDispatcher {
    BaseCallbacksDispatcher {
    BaseCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Base) }
        dispatch: Box::new(move |cb| {
            let txl = tx.clone();
            topstack::get_runtime().spawn(async move {
                let _ = txl.send(Message::Base(cb)).await;
            });
        }),
    }
}
}


impl BtifBluetoothCallbacks for Bluetooth {
impl BtifBluetoothCallbacks for Bluetooth {
+8 −62
Original line number Original line Diff line number Diff line
@@ -14,7 +14,6 @@ use bt_topshim::profiles::gatt::{
    GattStatus, LePhy, MsftAdvMonitor, MsftAdvMonitorAddress, MsftAdvMonitorPattern,
    GattStatus, LePhy, MsftAdvMonitor, MsftAdvMonitorAddress, MsftAdvMonitorPattern,
};
};
use bt_topshim::sysprop;
use bt_topshim::sysprop;
use bt_topshim::topstack;
use bt_utils::adv_parser;
use bt_utils::adv_parser;
use bt_utils::array_utils;
use bt_utils::array_utils;


@@ -25,13 +24,13 @@ use crate::bluetooth_adv::{
    BtifGattAdvCallbacks, IAdvertisingSetCallback, PeriodicAdvertisingParameters,
    BtifGattAdvCallbacks, IAdvertisingSetCallback, PeriodicAdvertisingParameters,
};
};
use crate::callbacks::Callbacks;
use crate::callbacks::Callbacks;
use crate::{APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode};
use crate::{make_message_dispatcher, APIMessage, BluetoothAPI, Message, RPCProxy, SuspendMode};
use log::{info, warn};
use log::{info, warn};
use num_derive::{FromPrimitive, ToPrimitive};
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::cast::{FromPrimitive, ToPrimitive};
use num_traits::cast::{FromPrimitive, ToPrimitive};
use rand::rngs::SmallRng;
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
use rand::{RngCore, SeedableRng};
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::convert::{TryFrom, TryInto};
use std::convert::{TryFrom, TryInto};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::Sender;
@@ -1490,76 +1489,23 @@ impl BluetoothGatt {
    pub fn init_profiles(&mut self, tx: Sender<Message>, api_tx: Sender<APIMessage>) {
    pub fn init_profiles(&mut self, tx: Sender<Message>, api_tx: Sender<APIMessage>) {
        self.gatt = Gatt::new(&self.intf.lock().unwrap()).map(|gatt| Arc::new(Mutex::new(gatt)));
        self.gatt = Gatt::new(&self.intf.lock().unwrap()).map(|gatt| Arc::new(Mutex::new(gatt)));


        // TODO(b/353643607): Make this dispatch_queue design more general for all profiles.
        let tx_clone = tx.clone();
        let async_mutex = Arc::new(tokio::sync::Mutex::new(()));
        let dispatch_queue = Arc::new(Mutex::new(VecDeque::new()));
        let gatt_client_callbacks_dispatcher = GattClientCallbacksDispatcher {
        let gatt_client_callbacks_dispatcher = GattClientCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(tx.clone(), Message::GattClient),
                let tx_clone = tx_clone.clone();
                let async_mutex = async_mutex.clone();
                let dispatch_queue = dispatch_queue.clone();
                // Enqueue the callbacks at the synchronized block to ensure the order.
                dispatch_queue.lock().unwrap().push_back(cb);
                topstack::get_runtime().spawn(async move {
                    // Acquire the lock first to ensure |pop_front| and |tx_clone.send| not
                    // interrupted by the other async threads.
                    let _guard = async_mutex.lock().await;
                    // Consume exactly one callback.
                    let cb = dispatch_queue.lock().unwrap().pop_front().unwrap();
                    let _ = tx_clone.send(Message::GattClient(cb)).await;
                });
            }),
        };
        };

        let tx_clone = tx.clone();
        let gatt_server_callbacks_dispatcher = GattServerCallbacksDispatcher {
        let gatt_server_callbacks_dispatcher = GattServerCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(tx.clone(), Message::GattServer),
                let tx_clone = tx_clone.clone();
                topstack::get_runtime().spawn(async move {
                    let _ = tx_clone.send(Message::GattServer(cb)).await;
                });
            }),
        };
        };

        let tx_clone = tx.clone();
        let gatt_scanner_callbacks_dispatcher = GattScannerCallbacksDispatcher {
        let gatt_scanner_callbacks_dispatcher = GattScannerCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(tx.clone(), Message::LeScanner),
                let tx_clone = tx_clone.clone();
                topstack::get_runtime().spawn(async move {
                    let _ = tx_clone.send(Message::LeScanner(cb)).await;
                });
            }),
        };
        };

        let tx_clone = tx.clone();
        let gatt_scanner_inband_callbacks_dispatcher = GattScannerInbandCallbacksDispatcher {
        let gatt_scanner_inband_callbacks_dispatcher = GattScannerInbandCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(tx.clone(), Message::LeScannerInband),
                let tx_clone = tx_clone.clone();
                topstack::get_runtime().spawn(async move {
                    let _ = tx_clone.send(Message::LeScannerInband(cb)).await;
                });
            }),
        };
        };

        let tx_clone = tx.clone();
        let gatt_adv_inband_callbacks_dispatcher = GattAdvInbandCallbacksDispatcher {
        let gatt_adv_inband_callbacks_dispatcher = GattAdvInbandCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(tx.clone(), Message::LeAdvInband),
                let tx_clone = tx_clone.clone();
                topstack::get_runtime().spawn(async move {
                    let _ = tx_clone.send(Message::LeAdvInband(cb)).await;
                });
            }),
        };
        };

        let tx_clone = tx.clone();
        let gatt_adv_callbacks_dispatcher = GattAdvCallbacksDispatcher {
        let gatt_adv_callbacks_dispatcher = GattAdvCallbacksDispatcher {
            dispatch: Box::new(move |cb| {
            dispatch: make_message_dispatcher(tx.clone(), Message::LeAdv),
                let tx_clone = tx_clone.clone();
                topstack::get_runtime().spawn(async move {
                    let _ = tx_clone.send(Message::LeAdv(cb)).await;
                });
            }),
        };
        };


        self.gatt.as_ref().unwrap().lock().unwrap().initialize(
        self.gatt.as_ref().unwrap().lock().unwrap().initialize(
+7 −45
Original line number Original line Diff line number Diff line
@@ -63,7 +63,7 @@ use crate::bluetooth::{Bluetooth, BluetoothDevice, IBluetooth};
use crate::callbacks::Callbacks;
use crate::callbacks::Callbacks;
use crate::uuid;
use crate::uuid;
use crate::uuid::{Profile, UuidHelper};
use crate::uuid::{Profile, UuidHelper};
use crate::{Message, RPCProxy};
use crate::{make_message_dispatcher, Message, RPCProxy};


use num_derive::FromPrimitive;
use num_derive::FromPrimitive;


@@ -3110,69 +3110,31 @@ impl BluetoothMedia {
}
}


fn get_a2dp_dispatcher(tx: Sender<Message>) -> A2dpCallbacksDispatcher {
fn get_a2dp_dispatcher(tx: Sender<Message>) -> A2dpCallbacksDispatcher {
    A2dpCallbacksDispatcher {
    A2dpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::A2dp) }
        dispatch: Box::new(move |cb| {
            let txl = tx.clone();
            topstack::get_runtime().spawn(async move {
                let _ = txl.send(Message::A2dp(cb)).await;
            });
        }),
    }
}
}


fn get_avrcp_dispatcher(tx: Sender<Message>) -> AvrcpCallbacksDispatcher {
fn get_avrcp_dispatcher(tx: Sender<Message>) -> AvrcpCallbacksDispatcher {
    AvrcpCallbacksDispatcher {
    AvrcpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Avrcp) }
        dispatch: Box::new(move |cb| {
            let txl = tx.clone();
            topstack::get_runtime().spawn(async move {
                let _ = txl.send(Message::Avrcp(cb)).await;
            });
        }),
    }
}
}


fn get_hfp_dispatcher(tx: Sender<Message>) -> HfpCallbacksDispatcher {
fn get_hfp_dispatcher(tx: Sender<Message>) -> HfpCallbacksDispatcher {
    HfpCallbacksDispatcher {
    HfpCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::Hfp) }
        dispatch: Box::new(move |cb| {
            let txl = tx.clone();
            topstack::get_runtime().spawn(async move {
                let _ = txl.send(Message::Hfp(cb)).await;
            });
        }),
    }
}
}


fn get_le_audio_dispatcher(tx: Sender<Message>) -> LeAudioClientCallbacksDispatcher {
fn get_le_audio_dispatcher(tx: Sender<Message>) -> LeAudioClientCallbacksDispatcher {
    LeAudioClientCallbacksDispatcher {
    LeAudioClientCallbacksDispatcher {
        dispatch: Box::new(move |cb| {
        dispatch: make_message_dispatcher(tx, Message::LeAudioClient),
            let txl = tx.clone();
            topstack::get_runtime().spawn(async move {
                let _ = txl.send(Message::LeAudioClient(cb)).await;
            });
        }),
    }
    }
}
}


fn get_vc_dispatcher(tx: Sender<Message>) -> VolumeControlCallbacksDispatcher {
fn get_vc_dispatcher(tx: Sender<Message>) -> VolumeControlCallbacksDispatcher {
    VolumeControlCallbacksDispatcher {
    VolumeControlCallbacksDispatcher {
        dispatch: Box::new(move |cb| {
        dispatch: make_message_dispatcher(tx, Message::VolumeControl),
            let txl = tx.clone();
            topstack::get_runtime().spawn(async move {
                let _ = txl.send(Message::VolumeControl(cb)).await;
            });
        }),
    }
    }
}
}


fn get_csis_dispatcher(tx: Sender<Message>) -> CsisClientCallbacksDispatcher {
fn get_csis_dispatcher(tx: Sender<Message>) -> CsisClientCallbacksDispatcher {
    CsisClientCallbacksDispatcher {
    CsisClientCallbacksDispatcher { dispatch: make_message_dispatcher(tx, Message::CsisClient) }
        dispatch: Box::new(move |cb| {
            let txl = tx.clone();
            topstack::get_runtime().spawn(async move {
                let _ = txl.send(Message::CsisClient(cb)).await;
            });
        }),
    }
}
}


impl IBluetoothMedia for BluetoothMedia {
impl IBluetoothMedia for BluetoothMedia {
+43 −0
Original line number Original line Diff line number Diff line
@@ -175,6 +175,49 @@ pub enum Message {
    GattClientDisconnected(RawAddress),
    GattClientDisconnected(RawAddress),
}
}


/// Returns a callable object that dispatches a BTIF callback to Message
///
/// The returned object would make sure the order of how the callbacks arrive the same as how they
/// goes to Message.
///
/// Example
/// ```ignore
/// // Create a dispatcher in btstack
/// let gatt_client_callbacks_dispatcher = topshim::gatt::GattClientCallbacksDispatcher {
///     dispatch: make_message_dispatcher(tx.clone(), Message::GattClient),
/// };
///
/// // Register the dispatcher to topshim
/// bt_topshim::topstack::get_dispatchers()
///     .lock()
///     .unwrap()
///     .set::<topshim::gatt::GattClientCb>(Arc::new(Mutex::new(gatt_client_callbacks_dispatcher)))
/// ```
pub(crate) fn make_message_dispatcher<F, Cb>(tx: Sender<Message>, f: F) -> Box<dyn Fn(Cb) + Send>
where
    Cb: Send + 'static,
    F: Fn(Cb) -> Message + Send + Copy + 'static,
{
    let async_mutex = Arc::new(tokio::sync::Mutex::new(()));
    let dispatch_queue = Arc::new(Mutex::new(std::collections::VecDeque::new()));

    Box::new(move |cb| {
        let tx = tx.clone();
        let async_mutex = async_mutex.clone();
        let dispatch_queue = dispatch_queue.clone();
        // Enqueue the callbacks at the synchronized block to ensure the order.
        dispatch_queue.lock().unwrap().push_back(cb);
        bt_topshim::topstack::get_runtime().spawn(async move {
            // Acquire the lock first to ensure |pop_front| and |tx.send| not
            // interrupted by the other async threads.
            let _guard = async_mutex.lock().await;
            // Consume exactly one callback.
            let cb = dispatch_queue.lock().unwrap().pop_front().unwrap();
            let _ = tx.send(f(cb)).await;
        });
    })
}

pub enum BluetoothAPI {
pub enum BluetoothAPI {
    Adapter,
    Adapter,
    Battery,
    Battery,