Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6c48322b authored by Rahul Arya's avatar Rahul Arya
Browse files

[Private GATT] Simplify datastore interface

Focus only on reads/writes, and not on connection lifecycle. Instead,
use timeouts to handle cleanup of stale callbacks.

Bug: 255880936
Test: unit
Change-Id: I7afb42ed2a2a3d98fa6e83e1ff426855d2bc3667
parent 189169e5
Loading
Loading
Loading
Loading
+0 −7
Original line number Diff line number Diff line
@@ -59,13 +59,6 @@ pub trait GattCallbacks {
/// into the GattModule
#[async_trait(?Send)]
pub trait GattDatastore {
    /// Invoked to indicate when a new connection should be tracked
    fn add_connection(&self, conn_id: ConnectionId);

    /// Invoked to indicate that a connection has closed and all
    /// pending transactions can be dropped.
    fn remove_connection(&self, conn_id: ConnectionId);

    /// Read a characteristic from the specified connection at the given handle.
    async fn read(
        &self,
+80 −64
Original line number Diff line number Diff line
use std::{cell::RefCell, collections::HashMap, rc::Rc};
use std::{cell::RefCell, collections::HashMap, rc::Rc, time::Duration};

use async_trait::async_trait;
use log::{trace, warn};
use tokio::sync::oneshot;
use log::{error, trace, warn};
use tokio::{sync::oneshot, time::timeout};

use crate::{
    gatt::{
@@ -18,6 +18,40 @@ struct PendingTransaction {
    response: oneshot::Sender<Result<AttAttributeDataChild, AttErrorCode>>,
}

#[derive(Debug)]
struct PendingTransactionWatcher {
    conn_id: ConnectionId,
    trans_id: TransactionId,
    rx: oneshot::Receiver<Result<AttAttributeDataChild, AttErrorCode>>,
}

enum PendingTransactionError {
    SenderDropped,
    Timeout,
}

impl PendingTransactionWatcher {
    /// Wait for the transaction to resolve, or to hit the timeout. If the
    /// timeout is reached, clean up state related to transaction watching.
    async fn wait(
        self,
        manager: &CallbackTransactionManager,
    ) -> Result<Result<AttAttributeDataChild, AttErrorCode>, PendingTransactionError> {
        match timeout(TIMEOUT, self.rx).await {
            Ok(Ok(result)) => Ok(result),
            Ok(Err(_)) => Err(PendingTransactionError::SenderDropped),
            Err(_) => {
                manager
                    .pending_transactions
                    .borrow_mut()
                    .pending_transactions
                    .remove(&(self.conn_id, self.trans_id));
                Err(PendingTransactionError::Timeout)
            }
        }
    }
}

/// This struct converts the asynchronus read/write operations of GattDatastore
/// into the callback-based interface expected by JNI
pub struct CallbackTransactionManager {
@@ -26,15 +60,18 @@ pub struct CallbackTransactionManager {
}

struct PendingTransactionsState {
    pending_transactions: HashMap<ConnectionId, HashMap<TransactionId, PendingTransaction>>,
    pending_transactions: HashMap<(ConnectionId, TransactionId), PendingTransaction>,
    next_transaction_id: u32,
}

/// We expect all responses to be provided within this timeout
/// It should be less than 30s, as that is the ATT timeout that causes
/// the client to disconnect.
const TIMEOUT: Duration = Duration::from_secs(15);

/// The cause of a failure to dispatch a call to send_response()
#[derive(Debug, PartialEq, Eq)]
pub enum CallbackResponseError {
    /// The ConnectionId supplied was invalid
    NonExistentConnection(ConnectionId),
    /// The TransactionId supplied was invalid
    NonExistentTransaction(TransactionId),
    /// The TransactionId was valid but has since terminated
@@ -62,8 +99,7 @@ impl CallbackTransactionManager {
        value: Result<AttAttributeDataChild, AttErrorCode>,
    ) -> Result<(), CallbackResponseError> {
        let mut pending = self.pending_transactions.borrow_mut();
        if let Some(pending_transactions) = pending.pending_transactions.get_mut(&conn_id) {
            if let Some(transaction) = pending_transactions.remove(&trans_id) {
        if let Some(transaction) = pending.pending_transactions.remove(&(conn_id, trans_id)) {
            if transaction.response.send(value).is_err() {
                Err(CallbackResponseError::ListenerHungUp(trans_id))
            } else {
@@ -73,72 +109,46 @@ impl CallbackTransactionManager {
        } else {
            Err(CallbackResponseError::NonExistentTransaction(trans_id))
        }
        } else {
            Err(CallbackResponseError::NonExistentConnection(conn_id))
        }
    }
}

impl PendingTransactionsState {
    fn start_new_transaction(
        &mut self,
        conn_id: ConnectionId,
    ) -> Result<
        (TransactionId, oneshot::Receiver<Result<AttAttributeDataChild, AttErrorCode>>),
        AttErrorCode,
    > {
    fn start_new_transaction(&mut self, conn_id: ConnectionId) -> PendingTransactionWatcher {
        let trans_id = TransactionId(self.next_transaction_id);
        self.next_transaction_id += 1;
        self.next_transaction_id = self.next_transaction_id.wrapping_add(1);

        let (tx, rx) = oneshot::channel();
        let pending_transactions = self.pending_transactions.get_mut(&conn_id);

        if let Some(pending_transactions) = pending_transactions {
            trace!("starting transaction {trans_id:?}");
            pending_transactions.insert(trans_id, PendingTransaction { response: tx });
        } else {
            warn!("dropping read request attempt for transaction {trans_id:?} since connection is down - this error code should not be sent to the peer");
            return Err(AttErrorCode::UNLIKELY_ERROR);
        }

        Ok((trans_id, rx))
        self.pending_transactions.insert((conn_id, trans_id), PendingTransaction { response: tx });
        PendingTransactionWatcher { conn_id, trans_id, rx }
    }
}

#[async_trait(?Send)]
impl GattDatastore for CallbackTransactionManager {
    fn add_connection(&self, conn_id: ConnectionId) {
        let old_conn = self
            .pending_transactions
            .borrow_mut()
            .pending_transactions
            .insert(conn_id, HashMap::new());
        assert!(old_conn.is_none(), "Connection ID reuse, something has gone wrong")
    }

    fn remove_connection(&self, conn_id: ConnectionId) {
        let old_conn = self.pending_transactions.borrow_mut().pending_transactions.remove(&conn_id);
        assert!(old_conn.is_some(), "Received unexpected connection ID, something has gone wrong")
    }

    async fn read(
        &self,
        conn_id: ConnectionId,
        handle: AttHandle,
        attr_type: AttributeBackingType,
    ) -> Result<AttAttributeDataChild, AttErrorCode> {
        let (trans_id, rx) =
            self.pending_transactions.borrow_mut().start_new_transaction(conn_id)?;
        let pending_transaction =
            self.pending_transactions.borrow_mut().start_new_transaction(conn_id);
        let trans_id = pending_transaction.trans_id;

        self.callbacks.on_server_read(conn_id, trans_id, handle, attr_type, 0, false);

        if let Ok(value) = rx.await {
            value
        } else {
            warn!("sender side of {trans_id:?} dropped while handling request - most likely this response will not be sent over the air");
        match pending_transaction.wait(self).await {
            Ok(value) => value,
            Err(PendingTransactionError::SenderDropped) => {
                warn!("sender side of {trans_id:?} dropped / timed out while handling request - most likely this response will not be sent over the air");
                Err(AttErrorCode::UNLIKELY_ERROR)
            }
            Err(PendingTransactionError::Timeout) => {
                warn!("no response received from Java after timeout - returning UNLIKELY_ERROR");
                Err(AttErrorCode::UNLIKELY_ERROR)
            }
        }
    }

    async fn write(
        &self,
@@ -147,17 +157,23 @@ impl GattDatastore for CallbackTransactionManager {
        attr_type: AttributeBackingType,
        data: AttAttributeDataView<'_>,
    ) -> Result<(), AttErrorCode> {
        let (trans_id, rx) =
            self.pending_transactions.borrow_mut().start_new_transaction(conn_id)?;
        let pending_transaction =
            self.pending_transactions.borrow_mut().start_new_transaction(conn_id);
        let trans_id = pending_transaction.trans_id;

        self.callbacks.on_server_write(conn_id, trans_id, handle, attr_type, 0, true, false, data);

        if let Ok(value) = rx.await {
            value.map(|_| ()) // the data passed back is irrelevant for write
        match pending_transaction.wait(self).await {
            Ok(value) => value.map(|_| ()), // the data passed back is irrelevant for write
            // requests
        } else {
            warn!("sender side of {trans_id:?} dropped while handling request - most likely this response will not be sent over the air");
            Err(PendingTransactionError::SenderDropped) => {
                error!("the CallbackTransactionManager dropped the sender TX without sending it");
                Err(AttErrorCode::UNLIKELY_ERROR)
            }
            Err(PendingTransactionError::Timeout) => {
                warn!("no response received from Java after timeout - returning UNLIKELY_ERROR");
                Err(AttErrorCode::UNLIKELY_ERROR)
            }
        }
    }
}
+0 −12
Original line number Diff line number Diff line
@@ -32,10 +32,6 @@ impl MockDatastore {
/// Events representing calls to GattDatastore
#[derive(Debug)]
pub enum MockDatastoreEvents {
    /// A new connection was created
    AddConnection(ConnectionId),
    /// A connection was removed
    RemoveConnection(ConnectionId),
    /// A characteristic was read on a given handle. The oneshot is used to
    /// return the value read.
    Read(
@@ -57,14 +53,6 @@ pub enum MockDatastoreEvents {

#[async_trait(?Send)]
impl GattDatastore for MockDatastore {
    fn add_connection(&self, conn_id: ConnectionId) {
        self.0.send(MockDatastoreEvents::AddConnection(conn_id)).unwrap();
    }

    fn remove_connection(&self, conn_id: ConnectionId) {
        self.0.send(MockDatastoreEvents::RemoveConnection(conn_id)).unwrap();
    }

    async fn read(
        &self,
        conn_id: ConnectionId,
+0 −2
Original line number Diff line number Diff line
@@ -55,7 +55,6 @@ impl GattModule {
                conn_id.get_server_id(),
            );
        };
        self.datastore.add_connection(conn_id);
        let transport = self.transport.clone();
        self.connection_bearers.insert(
            conn_id,
@@ -71,7 +70,6 @@ impl GattModule {
    pub fn on_le_disconnect(&mut self, conn_id: ConnectionId) {
        info!("disconnected conn_id {conn_id:?}");
        self.connection_bearers.remove(&conn_id);
        self.datastore.remove_connection(conn_id);
    }

    /// Register a new GATT service on a given server
+0 −3
Original line number Diff line number Diff line
@@ -218,7 +218,6 @@ mod test {
    use crate::{
        core::{shared_box::SharedBox, uuid::Uuid},
        gatt::{
            callbacks::GattDatastore,
            ffi::AttributeBackingType,
            ids::ConnectionId,
            mocks::mock_datastore::{MockDatastore, MockDatastoreEvents},
@@ -320,8 +319,6 @@ mod test {
        // two characteristics in the database
        let (datastore, mut data_rx) = MockDatastore::new();
        let datastore = Rc::new(datastore);
        datastore.add_connection(CONN_ID);
        data_rx.blocking_recv().unwrap(); // ignore AddConnection() event
        let db = SharedBox::new(GattDatabase::new(datastore));
        db.add_service_with_handles(GattServiceWithHandle {
            handle: AttHandle(1),
Loading