Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ba462786 authored by Rahul Arya's avatar Rahul Arya
Browse files

[Connection Manager] Implement direct + background connections

The architecture is as follows. The LeAcceptlistManager is
responsible for taking a TargetState and driving le_impl to this
target state. The ConnectionAttempts is responsible for
aggregating all the requests from clients, and managing basic
business logic (resolving requests once complete, rejecting
duplicate requests, cancellation etc.). determine_target_state()
converts the aggregated set of requests into a single
TargetState.

The ConnectionManager coordinates the LeAcceptlistManager and
ConnectionAttempts and sends events to them as appropriate, while
offering a top-level API.

Bug: 272572974
Test: unit + manual direct/bg connection from nRF connect

Change-Id: Ibe56093bdccb16246337ce1162ebb43594290309
parent b53199c9
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ rust_defaults {
        "libbt_common",
        "libcxx",
        "liblog_rust",
        "libscopeguard",

        // needed to work around duplicate symbols
        // caused by bug in Soong
+1 −0
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ paste = "*"
async-trait = "*"
tokio-test = "0.4.2"
tokio = { version = "1.23.0", features = ["macros"] }
scopeguard = "1.1.0"

[lib]
crate-type = ["rlib"]
+318 −46
Original line number Diff line number Diff line
@@ -3,27 +3,38 @@
//! avoids duplicate connections to the same devices (even with different RPAs),
//! and retries failed connections

use std::{fmt::Debug, hash::Hash, ops::Deref};
use std::{
    cell::RefCell, collections::HashSet, fmt::Debug, future::Future, hash::Hash, ops::Deref,
    time::Duration,
};

use crate::{
    core::{
        address::AddressWithType,
        shared_box::{SharedBox, WeakBox},
        shared_box::{SharedBox, WeakBox, WeakBoxRef},
    },
    gatt::ids::ServerId,
};

use self::le_manager::{
    ErrorCode, InactiveLeAclManager, LeAclManager, LeAclManagerConnectionCallbacks,
use self::{
    acceptlist_manager::{determine_target_state, LeAcceptlistManager},
    attempt_manager::{ConnectionAttempts, ConnectionMode},
    le_manager::{ErrorCode, InactiveLeAclManager, LeAclManagerConnectionCallbacks},
};

mod acceptlist_manager;
mod attempt_manager;
mod ffi;
pub mod le_manager;
mod mocks;

pub use ffi::{register_callbacks, LeAclManagerImpl, LeAclManagerShim};
use log::info;
use scopeguard::ScopeGuard;
use tokio::{task::spawn_local, time::timeout};

/// Possible errors returned when making a connection attempt
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum CreateConnectionFailure {
    /// This client is already making a connection of the same type
    /// to the same address.
@@ -31,14 +42,16 @@ pub enum CreateConnectionFailure {
}

/// Errors returned if a connection successfully starts but fails afterwards.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum ConnectionFailure {
    /// The connection attempt was cancelled
    Cancelled,
    /// The connection completed but with an HCI error code
    Error(ErrorCode),
}

/// Errors returned if the client fails to cancel their connection attempt
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum CancelConnectFailure {
    /// The connection attempt does not exist
    ConnectionNotPending,
@@ -54,9 +67,11 @@ pub enum ConnectionManagerClient {
}

/// An active connection
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct LeConnection {
    /// The address of the peer device, as reported in the connection complete event
    /// This is guaranteed to be unique across active connections, so we can implement
    /// PartialEq/Eq on this.
    pub remote_address: AddressWithType,
}

@@ -64,18 +79,29 @@ pub struct LeConnection {
/// devices on the filter accept list
#[derive(Debug)]
pub struct ConnectionManager {
    _le_manager: Box<dyn LeAclManager>,
    state: RefCell<ConnectionManagerState>,
}

#[derive(Debug)]
struct ConnectionManagerState {
    /// All pending connection attempts (unresolved direct + all background)
    attempts: ConnectionAttempts,
    /// The addresses we are currently connected to
    current_connections: HashSet<AddressWithType>,
    /// Tracks the state of the LE connect list, and updates it to drive to a
    /// specified target state
    acceptlist_manager: LeAcceptlistManager,
}

struct ConnectionManagerCallbackHandler(WeakBox<ConnectionManager>);

impl LeAclManagerConnectionCallbacks for ConnectionManagerCallbackHandler {
    fn on_le_connect_success(&self, conn: LeConnection) {
        self.with_manager(|manager| manager.on_le_connect_success(conn))
    }
const DIRECT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(
    29, /* ugly hack to avoid fighting with le_impl timeout, until I remove that timeout */
);

    fn on_le_connect_fail(&self, address: AddressWithType, status: ErrorCode) {
        self.with_manager(|manager| manager.on_le_connect_fail(address, status))
impl LeAclManagerConnectionCallbacks for ConnectionManagerCallbackHandler {
    fn on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>) {
        self.with_manager(|manager| manager.on_le_connect(address, result))
    }

    fn on_disconnect(&self, address: AddressWithType) {
@@ -93,67 +119,313 @@ impl ConnectionManager {
    /// Constructor
    pub fn new(le_manager: impl InactiveLeAclManager) -> SharedBox<Self> {
        SharedBox::new_cyclic(|weak| Self {
            _le_manager: Box::new(
            state: RefCell::new(ConnectionManagerState {
                attempts: ConnectionAttempts::new(),
                current_connections: HashSet::new(),
                acceptlist_manager: LeAcceptlistManager::new(
                    le_manager.register_callbacks(ConnectionManagerCallbackHandler(weak)),
                ),
            }),
        })
    }
}

    /// Start a direct connection to a peer device from a specified client.
/// Make the state of the LeAcceptlistManager consistent with the attempts tracked in ConnectionAttempts
fn reconcile_state(state: &mut ConnectionManagerState) {
    state
        .acceptlist_manager
        .drive_to_state(determine_target_state(&state.attempts.active_attempts()));
}

impl WeakBoxRef<'_, ConnectionManager> {
    /// Start a direct connection to a peer device from a specified client. If the peer
    /// is connected, immediately resolve the attempt.
    pub fn start_direct_connection(
        &self,
        _client: ConnectionManagerClient,
        _address: AddressWithType,
        client: ConnectionManagerClient,
        address: AddressWithType,
    ) -> Result<(), CreateConnectionFailure> {
        todo!()
        spawn_local(timeout(DIRECT_CONNECTION_TIMEOUT, self.direct_connection(client, address)?));
        Ok(())
    }

    /// Cancel direct connection attempts from this client to the specified address.
    pub fn cancel_direct_connection(
    /// Start a direct connection to a peer device from a specified client.
    ///
    /// # Cancellation Safety
    /// If this future is dropped, the connection attempt will be cancelled. It can also be cancelled
    /// from the separate API ConnectionManager#cancel_connection.
    fn direct_connection(
        &self,
        _client: ConnectionManagerClient,
        _address: AddressWithType,
    ) -> Result<(), CancelConnectFailure> {
        todo!()
        client: ConnectionManagerClient,
        address: AddressWithType,
    ) -> Result<
        impl Future<Output = Result<LeConnection, ConnectionFailure>>,
        CreateConnectionFailure,
    > {
        let mut state = self.state.borrow_mut();

        // if connected, this is a no-op
        let attempt_and_guard = if state.current_connections.contains(&address) {
            None
        } else {
            let pending_attempt = state.attempts.register_direct_connection(client, address)?;
            let attempt_id = pending_attempt.id;
            reconcile_state(&mut state);
            Some((
                pending_attempt,
                scopeguard::guard(self.downgrade(), move |this| {
                    // remove the attempt after we are cancelled
                    this.with(|this| {
                        this.map(|this| {
                            info!("Cancelling attempt {attempt_id:?}");
                            let mut state = this.state.borrow_mut();
                            state.attempts.cancel_attempt_with_id(attempt_id);
                            reconcile_state(&mut state);
                        })
                    });
                }),
            ))
        };

        Ok(async move {
            let Some((attempt, guard)) = attempt_and_guard else {
                // if we did not make an attempt, the connection must be ready
                return Ok(LeConnection { remote_address: address })
            };
            // otherwise, wait until the attempt resolves
            let ret = attempt.await;
            // defuse scopeguard (no need to cancel now)
            ScopeGuard::into_inner(guard);
            ret
        })
    }
}

impl ConnectionManager {
    /// Start a background connection to a peer device with given parameters from a specified client.
    pub fn add_background_connection(
        &self,
        _client: ConnectionManagerClient,
        _address: AddressWithType,
        client: ConnectionManagerClient,
        address: AddressWithType,
    ) -> Result<(), CreateConnectionFailure> {
        todo!()
        let mut state = self.state.borrow_mut();
        state.attempts.register_background_connection(client, address)?;
        reconcile_state(&mut state);
        Ok(())
    }

    /// Cancel background connection attempts from this client to the specified address.
    pub fn remove_background_connection(
    /// Cancel connection attempt from this client to the specified address with the specified mode.
    pub fn cancel_connection(
        &self,
        _client: ConnectionManagerClient,
        _address: AddressWithType,
        client: ConnectionManagerClient,
        address: AddressWithType,
        mode: ConnectionMode,
    ) -> Result<(), CancelConnectFailure> {
        todo!()
        let mut state = self.state.borrow_mut();
        state.attempts.cancel_attempt(client, address, mode)?;
        reconcile_state(&mut state);
        Ok(())
    }

    /// Cancel all connection attempts to this address
    pub fn cancel_unconditionally(&self, _address: AddressWithType) {
        todo!()
    pub fn cancel_unconditionally(&self, address: AddressWithType) {
        let mut state = self.state.borrow_mut();
        state.attempts.remove_unconditionally(address);
        reconcile_state(&mut state);
    }

    /// Cancel all connection attempts from this client
    pub fn remove_client(&self, _client: ConnectionManagerClient) {
        todo!()
    pub fn remove_client(&self, client: ConnectionManagerClient) {
        let mut state = self.state.borrow_mut();
        state.attempts.remove_client(client);
        reconcile_state(&mut state);
    }

    fn on_le_connect_success(&self, _conn: LeConnection) {
        // TODO(aryarahul)
    fn on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>) {
        let mut state = self.state.borrow_mut();
        // record this connection while it exists
        state.current_connections.insert(address);
        // all completed connections remove the address from the direct list
        state.acceptlist_manager.on_connect_complete(address);
        // invoke any pending callbacks, update set of attempts
        state.attempts.process_connection(address, result);
        // update the acceptlist
        reconcile_state(&mut state);
    }

    fn on_le_connect_fail(&self, _address: AddressWithType, _status: ErrorCode) {
        // TODO(aryarahul)
    fn on_disconnect(&self, address: AddressWithType) {
        let mut state = self.state.borrow_mut();
        state.current_connections.remove(&address);
        reconcile_state(&mut state);
    }
}

#[cfg(test)]
mod test {
    use crate::{core::address::AddressType, utils::task::block_on_locally};

    use super::{mocks::mock_le_manager::MockLeAclManager, *};

    const CLIENT_1: ConnectionManagerClient = ConnectionManagerClient::GattClient(1);
    const CLIENT_2: ConnectionManagerClient = ConnectionManagerClient::GattClient(2);

    const ADDRESS_1: AddressWithType =
        AddressWithType { address: [1, 2, 3, 4, 5, 6], address_type: AddressType::Public };

    const ERROR: ErrorCode = ErrorCode(1);

    #[test]
    fn test_single_direct_connection() {
        block_on_locally(async {
            // arrange
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());

            // act: initiate a direct connection
            connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();

            // assert: the direct connection is pending
            assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Direct));
            assert_eq!(mock_le_manager.current_acceptlist().len(), 1);
            assert!(mock_le_manager.current_acceptlist().contains(&ADDRESS_1));
        });
    }

    #[test]
    fn test_failed_direct_connection() {
        block_on_locally(async {
            // arrange: one pending direct connection
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());
            connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();

            // act: the connection attempt fails
            mock_le_manager.on_le_connect(ADDRESS_1, ERROR);

            // assert: the direct connection has stopped
            assert_eq!(mock_le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_single_background_connection() {
        block_on_locally(async {
            // arrange
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());

            // act: initiate a background connection
            connection_manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();

            // assert: the background connection is pending
            assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Background));
            assert_eq!(mock_le_manager.current_acceptlist().len(), 1);
            assert!(mock_le_manager.current_acceptlist().contains(&ADDRESS_1));
        });
    }

    #[test]
    fn test_resolved_connection() {
        block_on_locally(async {
            // arrange
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());

            // act: initiate a direct connection, that succeeds
            connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
            mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);

            // assert: no connection is pending
            assert_eq!(mock_le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_resolved_background_connection() {
        block_on_locally(async {
            // arrange
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());

            // act: initiate a background connection, that succeeds
            connection_manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();
            mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);

            // assert: no connection is pending
            assert_eq!(mock_le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_resolved_direct_connection_after_disconnect() {
        block_on_locally(async {
            // arrange
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());

            // act: initiate a direct connection, that succeeds, then disconnects
            connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
            mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
            mock_le_manager.on_le_disconnect(ADDRESS_1);

            // assert: no connection is pending
            assert_eq!(mock_le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_resolved_background_connection_after_disconnect() {
        block_on_locally(async {
            // arrange
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());

            // act: initiate a background connection, that succeeds, then disconnects
            connection_manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();
            mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
            mock_le_manager.on_le_disconnect(ADDRESS_1);

            // assert: the background connection has resumed
            assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Background));
        });
    }

    #[test]
    fn test_direct_connection_timeout() {
        block_on_locally(async {
            // arrange: a pending direct connection
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());
            connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();

            // act: let it timeout
            tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT).await;
            // go forward one tick to ensure all timers are fired
            // (since we are using fake time, this is not a race condition)
            tokio::time::sleep(Duration::from_millis(1)).await;

            // assert: it is cancelled and we are idle again
            assert_eq!(mock_le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_stacked_direct_connections_timeout() {
        block_on_locally(async {
            // arrange
            let mock_le_manager = MockLeAclManager::new();
            let connection_manager = ConnectionManager::new(mock_le_manager.clone());

            // act: start a direct connection
            connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
            tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT * 3 / 4).await;
            // act: after some time, start a second one
            connection_manager.as_ref().start_direct_connection(CLIENT_2, ADDRESS_1).unwrap();
            // act: wait for the first one (but not the second) to time out
            tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT * 3 / 4).await;

    fn on_disconnect(&self, _address: AddressWithType) {
        // TODO(aryarahul)
            // assert: we are still doing a direct connection
            assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Direct));
        });
    }
}
+366 −0

File added.

Preview size limit exceeded, changes collapsed.

+501 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading