Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 57611b29 authored by Aritra Sen's avatar Aritra Sen Committed by Automerger Merge Worker
Browse files

Merge "Add support for reading events through callback to the HFP service." am: 0e708434

parents 03914ebd 0e708434
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -107,6 +107,7 @@ service HfpService {
  rpc ConnectAudio(ConnectAudioRequest) returns (google.protobuf.Empty) {}
  rpc DisconnectAudio(DisconnectAudioRequest) returns (google.protobuf.Empty) {}
  rpc SetVolume(SetVolumeRequest) returns (google.protobuf.Empty) {}
  rpc FetchEvents(FetchEventsRequest) returns (stream FetchEventsResponse) {}
}

enum EventType {
@@ -114,6 +115,7 @@ enum EventType {
  SSP_REQUEST = 1;
  LE_RAND = 2;
  GENERATE_LOCAL_OOB_DATA = 3;
  HFP_CONNECTION_STATE = 4;
}
message FetchEventsRequest {}

+65 −9
Original line number Diff line number Diff line
//! HFP service facade

use bt_topshim::btif::{BluetoothInterface, RawAddress};
use bt_topshim::profiles::hfp::{Hfp, HfpCallbacksDispatcher};
use bt_topshim::profiles::hfp::{Hfp, HfpCallbacks, HfpCallbacksDispatcher};
use bt_topshim_facade_protobuf::empty::Empty;
use bt_topshim_facade_protobuf::facade::{
    ConnectAudioRequest, DisconnectAudioRequest, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
    ConnectAudioRequest, DisconnectAudioRequest, EventType, FetchEventsRequest,
    FetchEventsResponse, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
};
use bt_topshim_facade_protobuf::facade_grpc::{create_hfp_service, HfpService};

use futures::sink::SinkExt;
use grpcio::*;

use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

fn get_hfp_dispatcher() -> HfpCallbacksDispatcher {
    HfpCallbacksDispatcher { dispatch: Box::new(move |_cb| {}) }
fn get_hfp_dispatcher(
    _hfp: Arc<Mutex<Hfp>>,
    tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
) -> HfpCallbacksDispatcher {
    HfpCallbacksDispatcher {
        dispatch: Box::new(move |cb: HfpCallbacks| {
            if let HfpCallbacks::ConnectionState(state, address) = &cb {
                println!("Hfp Connection state changed to {:?} for address {:?}", state, address);
            }
            let guard_tx = tx.lock().unwrap();
            if let Some(event_tx) = guard_tx.as_ref() {
                let txclone = event_tx.clone();
                tokio::spawn(async move {
                    let _ = txclone.send(cb).await;
                });
            }
        }),
    }
}

/// Main object for Hfp facade service
@@ -24,15 +42,17 @@ pub struct HfpServiceImpl {
    #[allow(dead_code)]
    rt: Arc<Runtime>,
    pub btif_hfp: Arc<Mutex<Hfp>>,
    #[allow(dead_code)]
    event_tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
}

impl HfpServiceImpl {
    /// Create a new instance of the root facade service
    pub fn create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service {
        let mut btif_hfp = Hfp::new(&btif_intf.lock().unwrap());
        btif_hfp.initialize(get_hfp_dispatcher());

        create_hfp_service(Self { rt, btif_hfp: Arc::new(Mutex::new(btif_hfp)) })
        let btif_hfp = Arc::new(Mutex::new(Hfp::new(&btif_intf.lock().unwrap())));
        let event_tx = Arc::new(Mutex::new(None));
        btif_hfp.lock().unwrap().initialize(get_hfp_dispatcher(btif_hfp.clone(), event_tx.clone()));
        create_hfp_service(Self { rt, btif_hfp, event_tx })
    }
}

@@ -139,4 +159,40 @@ impl HfpService for HfpServiceImpl {
            }
        })
    }

    fn fetch_events(
        &mut self,
        ctx: RpcContext<'_>,
        _req: FetchEventsRequest,
        mut sink: ServerStreamingSink<FetchEventsResponse>,
    ) {
        let (tx, mut rx) = mpsc::channel(10);
        {
            let mut guard = self.event_tx.lock().unwrap();
            if guard.is_some() {
                ctx.spawn(async move {
                    sink.fail(RpcStatus::with_message(
                        RpcStatusCode::UNAVAILABLE,
                        String::from("Profile is currently already connected and streaming"),
                    ))
                    .await
                    .unwrap();
                });
                return;
            } else {
                *guard = Some(tx);
            }
        }

        ctx.spawn(async move {
            while let Some(event) = rx.recv().await {
                if let HfpCallbacks::ConnectionState(state, address) = event {
                    let mut rsp = FetchEventsResponse::new();
                    rsp.event_type = EventType::HFP_CONNECTION_STATE;
                    rsp.data = format!("{:?}, {:?}", state, address);
                    sink.send((rsp, WriteFlags::default())).await.unwrap();
                }
            }
        })
    }
}
+2 −2
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ impl From<u32> for BthfConnectionState {
    }
}

#[derive(Debug, FromPrimitive, PartialEq, PartialOrd)]
#[derive(Debug, FromPrimitive, PartialEq, PartialOrd, Clone)]
#[repr(u32)]
pub enum BthfAudioState {
    Disconnected = 0,
@@ -99,7 +99,7 @@ pub mod ffi {
    }
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum HfpCallbacks {
    ConnectionState(BthfConnectionState, RawAddress),
    AudioState(BthfAudioState, RawAddress),