Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit bc75fd0d authored by Aritra Sen's avatar Aritra Sen
Browse files

Add support for reading events through callback to the HFP service.

Bug: 261221648
Test: mma -j $(nproc)
Test: ./build.py
Test: system/gd/cert/run --clean --topshim
Tag: #floss
Change-Id: Ia05520ab0f078a62644f083c914985e4264dcbd7
parent 628be691
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -107,6 +107,7 @@ service HfpService {
  rpc ConnectAudio(ConnectAudioRequest) returns (google.protobuf.Empty) {}
  rpc DisconnectAudio(DisconnectAudioRequest) returns (google.protobuf.Empty) {}
  rpc SetVolume(SetVolumeRequest) returns (google.protobuf.Empty) {}
  rpc FetchEvents(FetchEventsRequest) returns (stream FetchEventsResponse) {}
}

enum EventType {
@@ -114,6 +115,7 @@ enum EventType {
  SSP_REQUEST = 1;
  LE_RAND = 2;
  GENERATE_LOCAL_OOB_DATA = 3;
  HFP_CONNECTION_STATE = 4;
}
message FetchEventsRequest {}

+65 −9
Original line number Diff line number Diff line
//! HFP service facade

use bt_topshim::btif::{BluetoothInterface, RawAddress};
use bt_topshim::profiles::hfp::{Hfp, HfpCallbacksDispatcher};
use bt_topshim::profiles::hfp::{Hfp, HfpCallbacks, HfpCallbacksDispatcher};
use bt_topshim_facade_protobuf::empty::Empty;
use bt_topshim_facade_protobuf::facade::{
    ConnectAudioRequest, DisconnectAudioRequest, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
    ConnectAudioRequest, DisconnectAudioRequest, EventType, FetchEventsRequest,
    FetchEventsResponse, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
};
use bt_topshim_facade_protobuf::facade_grpc::{create_hfp_service, HfpService};

use futures::sink::SinkExt;
use grpcio::*;

use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

fn get_hfp_dispatcher() -> HfpCallbacksDispatcher {
    HfpCallbacksDispatcher { dispatch: Box::new(move |_cb| {}) }
fn get_hfp_dispatcher(
    _hfp: Arc<Mutex<Hfp>>,
    tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
) -> HfpCallbacksDispatcher {
    HfpCallbacksDispatcher {
        dispatch: Box::new(move |cb: HfpCallbacks| {
            if let HfpCallbacks::ConnectionState(state, address) = &cb {
                println!("Hfp Connection state changed to {:?} for address {:?}", state, address);
            }
            let guard_tx = tx.lock().unwrap();
            if let Some(event_tx) = guard_tx.as_ref() {
                let txclone = event_tx.clone();
                tokio::spawn(async move {
                    let _ = txclone.send(cb).await;
                });
            }
        }),
    }
}

/// Main object for Hfp facade service
@@ -24,15 +42,17 @@ pub struct HfpServiceImpl {
    #[allow(dead_code)]
    rt: Arc<Runtime>,
    pub btif_hfp: Arc<Mutex<Hfp>>,
    #[allow(dead_code)]
    event_tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
}

impl HfpServiceImpl {
    /// Create a new instance of the root facade service
    pub fn create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service {
        let mut btif_hfp = Hfp::new(&btif_intf.lock().unwrap());
        btif_hfp.initialize(get_hfp_dispatcher());

        create_hfp_service(Self { rt, btif_hfp: Arc::new(Mutex::new(btif_hfp)) })
        let btif_hfp = Arc::new(Mutex::new(Hfp::new(&btif_intf.lock().unwrap())));
        let event_tx = Arc::new(Mutex::new(None));
        btif_hfp.lock().unwrap().initialize(get_hfp_dispatcher(btif_hfp.clone(), event_tx.clone()));
        create_hfp_service(Self { rt, btif_hfp, event_tx })
    }
}

@@ -139,4 +159,40 @@ impl HfpService for HfpServiceImpl {
            }
        })
    }

    fn fetch_events(
        &mut self,
        ctx: RpcContext<'_>,
        _req: FetchEventsRequest,
        mut sink: ServerStreamingSink<FetchEventsResponse>,
    ) {
        let (tx, mut rx) = mpsc::channel(10);
        {
            let mut guard = self.event_tx.lock().unwrap();
            if guard.is_some() {
                ctx.spawn(async move {
                    sink.fail(RpcStatus::with_message(
                        RpcStatusCode::UNAVAILABLE,
                        String::from("Profile is currently already connected and streaming"),
                    ))
                    .await
                    .unwrap();
                });
                return;
            } else {
                *guard = Some(tx);
            }
        }

        ctx.spawn(async move {
            while let Some(event) = rx.recv().await {
                if let HfpCallbacks::ConnectionState(state, address) = event {
                    let mut rsp = FetchEventsResponse::new();
                    rsp.event_type = EventType::HFP_CONNECTION_STATE;
                    rsp.data = format!("{:?}, {:?}", state, address);
                    sink.send((rsp, WriteFlags::default())).await.unwrap();
                }
            }
        })
    }
}
+2 −2
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ impl From<u32> for BthfConnectionState {
    }
}

#[derive(Debug, FromPrimitive, PartialEq, PartialOrd)]
#[derive(Debug, FromPrimitive, PartialEq, PartialOrd, Clone)]
#[repr(u32)]
pub enum BthfAudioState {
    Disconnected = 0,
@@ -99,7 +99,7 @@ pub mod ffi {
    }
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum HfpCallbacks {
    ConnectionState(BthfConnectionState, RawAddress),
    AudioState(BthfAudioState, RawAddress),