Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 26c1bc3f authored by Rahul Arya's avatar Rahul Arya
Browse files

[Private GATT] Add support for handle value indications

Bug: 255880936
Test: unit
Change-Id: Iafec2663928df99f2d6b5ac4efad7a9f13113871
parent a01c6738
Loading
Loading
Loading
Loading
+7 −2
Original line number Diff line number Diff line
@@ -2147,8 +2147,13 @@ static void gattServerSendIndicationNative(JNIEnv* env, jobject object,
  jbyte* array = env->GetByteArrayElements(val, 0);
  int val_len = env->GetArrayLength(val);

  if (bluetooth::gatt::is_connection_isolated(conn_id)) {
    auto data = ::rust::Slice<const uint8_t>((uint8_t*)array, val_len);
    bluetooth::gatt::send_indication(server_if, attr_handle, conn_id, data);
  } else {
    sGattIf->server->send_indication(server_if, attr_handle, conn_id,
                                     /*confirm*/ 1, (uint8_t*)array, val_len);
  }

  env->ReleaseByteArrayElements(val, array, JNI_ABORT);
}
+1 −0
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@

mod ffi;
pub mod shared_box;
pub mod shared_mutex;
pub mod uuid;

use std::{rc::Rc, thread};
+65 −0
Original line number Diff line number Diff line
//! The motivation for SharedMutex is to guard a resource without having to
//! extend its lifetime using an Rc<> (and potentially create reference cycles)

use std::{future::Future, rc::Rc, sync::Arc};

use tokio::sync::{Mutex, OwnedMutexGuard, Semaphore, TryLockError};

/// A mutex wrapping some contents of type T. When the mutex is dropped,
/// T will be dropped.
///
/// The lifetime of T will be extended only if a client currently holds
/// exclusive access to it, in which case once that client drops its
/// MutexGuard, then T will be dropped.
pub struct SharedMutex<T> {
    lock: Arc<Mutex<T>>,
    on_death: Rc<Semaphore>,
}

impl<T> SharedMutex<T> {
    /// Constructor
    pub fn new(t: T) -> Self {
        Self { lock: Arc::new(Mutex::new(t)), on_death: Rc::new(Semaphore::new(0)) }
    }

    /// Acquire exclusive access to T, or None if SharedMutex<T> is dropped
    /// while waiting to acquire. Unlike Mutex::lock, this method produces a
    /// future with 'static lifetime, so it can be awaited even if the
    /// SharedMutex<> itself is dropped.
    ///
    /// NOTE: if the lifetime of T is extended by the holder of the lock when
    /// the SharedMutex<> itself is dropped, all waiters will still
    /// instantly return None (rather than waiting for the lock to be
    /// released).
    pub fn lock(&self) -> impl Future<Output = Option<OwnedMutexGuard<T>>> {
        let mutex = self.lock.clone();
        let on_death = self.on_death.clone();

        async move {
            tokio::select! {
            biased;
              permit = on_death.acquire() => {
                drop(permit);
                None
              },
              acquired = mutex.lock_owned() => {
                Some(acquired)
              },
            }
        }
    }

    /// Synchronously acquire the lock. This similarly exhibits the
    /// lifetime-extension behavior of Self#lock().
    pub fn try_lock(&self) -> Result<OwnedMutexGuard<T>, TryLockError> {
        self.lock.clone().try_lock_owned()
    }
}

impl<T> Drop for SharedMutex<T> {
    fn drop(&mut self) {
        // mark dead, so all waiters instantly return
        // no one can start to wait after drop
        self.on_death.add_permits(Arc::strong_count(&self.lock) + 1);
    }
}
+12 −1
Original line number Diff line number Diff line
@@ -10,7 +10,10 @@ use async_trait::async_trait;

use crate::packets::{AttAttributeDataChild, AttAttributeDataView, AttErrorCode};

use super::ids::{AttHandle, ConnectionId, TransactionId};
use super::{
    ids::{AttHandle, ConnectionId, TransactionId},
    server::IndicationError,
};

/// These callbacks are expected to be made available to the GattModule from
/// JNI.
@@ -39,6 +42,14 @@ pub trait GattCallbacks {
        is_prepare: bool,
        value: AttAttributeDataView,
    );

    /// Invoked when a handle value indication transaction completes
    /// (either due to an error, link loss, or the peer confirming it)
    fn on_indication_sent_confirmation(
        &self,
        conn_id: ConnectionId,
        result: Result<(), IndicationError>,
    );
}

/// This interface is an "async" version of the above, and is passed directly
+52 −2
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ use bt_common::init_flags::{
use cxx::UniquePtr;
pub use inner::*;
use log::{error, info, warn};
use tokio::task::spawn_local;

use crate::{
    do_in_rust_thread,
@@ -21,7 +22,11 @@ use super::{
    arbiter::{self, with_arbiter},
    channel::AttTransport,
    ids::{AdvertiserId, AttHandle, ConnectionId, ServerId, TransactionId, TransportIndex},
    server::gatt_database::{AttPermissions, GattCharacteristicWithHandle, GattServiceWithHandle},
    server::{
        att_server_bearer::AttServerBearer,
        gatt_database::{AttPermissions, GattCharacteristicWithHandle, GattServiceWithHandle},
        IndicationError,
    },
    GattCallbacks,
};

@@ -71,6 +76,11 @@ mod inner {
            is_prepare: bool,
            value: &[u8],
        );

        /// This callback is invoked when an indication has been sent and the
        /// peer device has confirmed it, or if some error occurred.
        #[cxx_name = "OnIndicationSentConfirmation"]
        fn on_indication_sent_confirmation(&self, conn_id: u16, status: i32);
    }

    /// What action the arbiter should take in response to an incoming packet
@@ -132,7 +142,10 @@ mod inner {
        fn close_server(server_id: u8);
        fn add_service(server_id: u8, service_records: Vec<GattRecord>);
        fn remove_service(server_id: u8, service_handle: u16);

        // att operations
        fn send_response(server_id: u8, conn_id: u16, trans_id: u32, status: u8, value: &[u8]);
        fn send_indication(_server_id: u8, handle: u16, conn_id: u16, value: &[u8]);

        // connection
        fn is_connection_isolated(conn_id: u16) -> bool;
@@ -181,6 +194,20 @@ impl GattCallbacks for GattCallbacksImpl {
            &value.get_raw_payload().collect::<Vec<_>>(),
        );
    }

    fn on_indication_sent_confirmation(
        &self,
        conn_id: ConnectionId,
        result: Result<(), IndicationError>,
    ) {
        self.0.as_ref().unwrap().on_indication_sent_confirmation(
            conn_id.0,
            match result {
                Ok(()) => 0, // GATT_SUCCESS
                _ => 133,    // GATT_ERROR
            },
        )
    }
}

/// Implementation of AttTransport wrapping the corresponding C++ method
@@ -335,7 +362,7 @@ fn send_response(_server_id: u8, conn_id: u16, trans_id: u32, status: u8, value:
        Err(AttErrorCode::try_from(status).unwrap_or(AttErrorCode::UNLIKELY_ERROR))
    };
    do_in_rust_thread(move |modules| {
        match modules.gatt_callbacks.send_response(
        match modules.gatt_incoming_callbacks.send_response(
            ConnectionId(conn_id),
            TransactionId(trans_id),
            value,
@@ -346,6 +373,29 @@ fn send_response(_server_id: u8, conn_id: u16, trans_id: u32, status: u8, value:
    })
}

fn send_indication(_server_id: u8, handle: u16, conn_id: u16, value: &[u8]) {
    if !rust_event_loop_is_enabled() {
        return;
    }

    let handle = AttHandle(handle);
    let conn_id = ConnectionId(conn_id);
    let value = AttAttributeDataChild::RawData(value.into());

    do_in_rust_thread(move |modules| {
        let Some(bearer) = modules.gatt_module.get_bearer(conn_id) else {
            error!("connection {conn_id:?} does not exist");
            return;
        };
        let pending_indication = bearer.send_indication(handle, value);
        let gatt_outgoing_callbacks = modules.gatt_outgoing_callbacks.clone();
        spawn_local(async move {
            gatt_outgoing_callbacks
                .on_indication_sent_confirmation(conn_id, pending_indication.await);
        });
    })
}

fn associate_server_with_advertiser(server_id: u8, advertiser_id: u8) {
    if !rust_event_loop_is_enabled() {
        return;
Loading