Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0e708434 authored by Aritra Sen's avatar Aritra Sen Committed by Gerrit Code Review
Browse files

Merge "Add support for reading events through callback to the HFP service."

parents f6d79936 bc75fd0d
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -107,6 +107,7 @@ service HfpService {
  rpc ConnectAudio(ConnectAudioRequest) returns (google.protobuf.Empty) {}
  rpc DisconnectAudio(DisconnectAudioRequest) returns (google.protobuf.Empty) {}
  rpc SetVolume(SetVolumeRequest) returns (google.protobuf.Empty) {}
  rpc FetchEvents(FetchEventsRequest) returns (stream FetchEventsResponse) {}
}

enum EventType {
@@ -114,6 +115,7 @@ enum EventType {
  SSP_REQUEST = 1;
  LE_RAND = 2;
  GENERATE_LOCAL_OOB_DATA = 3;
  HFP_CONNECTION_STATE = 4;
}
message FetchEventsRequest {}

+65 −9
Original line number Diff line number Diff line
//! HFP service facade

use bt_topshim::btif::{BluetoothInterface, RawAddress};
use bt_topshim::profiles::hfp::{Hfp, HfpCallbacksDispatcher};
use bt_topshim::profiles::hfp::{Hfp, HfpCallbacks, HfpCallbacksDispatcher};
use bt_topshim_facade_protobuf::empty::Empty;
use bt_topshim_facade_protobuf::facade::{
    ConnectAudioRequest, DisconnectAudioRequest, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
    ConnectAudioRequest, DisconnectAudioRequest, EventType, FetchEventsRequest,
    FetchEventsResponse, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
};
use bt_topshim_facade_protobuf::facade_grpc::{create_hfp_service, HfpService};

use futures::sink::SinkExt;
use grpcio::*;

use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

fn get_hfp_dispatcher() -> HfpCallbacksDispatcher {
    HfpCallbacksDispatcher { dispatch: Box::new(move |_cb| {}) }
fn get_hfp_dispatcher(
    _hfp: Arc<Mutex<Hfp>>,
    tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
) -> HfpCallbacksDispatcher {
    HfpCallbacksDispatcher {
        dispatch: Box::new(move |cb: HfpCallbacks| {
            if let HfpCallbacks::ConnectionState(state, address) = &cb {
                println!("Hfp Connection state changed to {:?} for address {:?}", state, address);
            }
            let guard_tx = tx.lock().unwrap();
            if let Some(event_tx) = guard_tx.as_ref() {
                let txclone = event_tx.clone();
                tokio::spawn(async move {
                    let _ = txclone.send(cb).await;
                });
            }
        }),
    }
}

/// Main object for Hfp facade service
@@ -24,15 +42,17 @@ pub struct HfpServiceImpl {
    #[allow(dead_code)]
    rt: Arc<Runtime>,
    pub btif_hfp: Arc<Mutex<Hfp>>,
    #[allow(dead_code)]
    event_tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
}

impl HfpServiceImpl {
    /// Create a new instance of the root facade service
    pub fn create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service {
        let mut btif_hfp = Hfp::new(&btif_intf.lock().unwrap());
        btif_hfp.initialize(get_hfp_dispatcher());

        create_hfp_service(Self { rt, btif_hfp: Arc::new(Mutex::new(btif_hfp)) })
        let btif_hfp = Arc::new(Mutex::new(Hfp::new(&btif_intf.lock().unwrap())));
        let event_tx = Arc::new(Mutex::new(None));
        btif_hfp.lock().unwrap().initialize(get_hfp_dispatcher(btif_hfp.clone(), event_tx.clone()));
        create_hfp_service(Self { rt, btif_hfp, event_tx })
    }
}

@@ -139,4 +159,40 @@ impl HfpService for HfpServiceImpl {
            }
        })
    }

    fn fetch_events(
        &mut self,
        ctx: RpcContext<'_>,
        _req: FetchEventsRequest,
        mut sink: ServerStreamingSink<FetchEventsResponse>,
    ) {
        let (tx, mut rx) = mpsc::channel(10);
        {
            let mut guard = self.event_tx.lock().unwrap();
            if guard.is_some() {
                ctx.spawn(async move {
                    sink.fail(RpcStatus::with_message(
                        RpcStatusCode::UNAVAILABLE,
                        String::from("Profile is currently already connected and streaming"),
                    ))
                    .await
                    .unwrap();
                });
                return;
            } else {
                *guard = Some(tx);
            }
        }

        ctx.spawn(async move {
            while let Some(event) = rx.recv().await {
                if let HfpCallbacks::ConnectionState(state, address) = event {
                    let mut rsp = FetchEventsResponse::new();
                    rsp.event_type = EventType::HFP_CONNECTION_STATE;
                    rsp.data = format!("{:?}, {:?}", state, address);
                    sink.send((rsp, WriteFlags::default())).await.unwrap();
                }
            }
        })
    }
}
+2 −2
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ impl From<u32> for BthfConnectionState {
    }
}

#[derive(Debug, FromPrimitive, PartialEq, PartialOrd)]
#[derive(Debug, FromPrimitive, PartialEq, PartialOrd, Clone)]
#[repr(u32)]
pub enum BthfAudioState {
    Disconnected = 0,
@@ -99,7 +99,7 @@ pub mod ffi {
    }
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum HfpCallbacks {
    ConnectionState(BthfConnectionState, RawAddress),
    AudioState(BthfAudioState, RawAddress),