Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 02909efb authored by Sonny Sasaka's avatar Sonny Sasaka
Browse files

Convert some async to sync in btmanagerd

In preparation to use dbus_projection with btmanagerd, some interfaces
that interact with D-Bus needs to be sync instead of async due to the
nature of the floss API design where all methods should be sync (return
immediately) and async events are via callbacks instead:

* StateMachineProxy needs to be sync since this interfaces directly with
  D-Bus handlers.
* DbusCallbackUtil needs to be sync, but this is only transitional state
  because DbusCallbackUtil will be removed in the next patch when we
  integrate with dbus_projection.

All other internal code can stay using async interfaces.

This patch also fixes some compiler warnings.

Bug: 189501475
Tag: #floss
Test: Build floss on Linux

Change-Id: Id08792b2348944bd57b9ff7fbe97c43a2cb8ae8b
parent 974c72cc
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ bt_common = { path = "../../common" }
dbus = "0.9.2"
dbus-tokio = "0.7.3"
dbus-crossroads = "0.4.0"
futures = "0.3.13"
inotify = "*"
log = "0.4.14"
nix = "*"
+4 −3
Original line number Diff line number Diff line
use dbus::nonblock::{Proxy, SyncConnection};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::Mutex;

#[derive(Clone)]
pub struct DbusCallbackUtil {
@@ -28,7 +28,8 @@ impl DbusCallbackUtil {
        hci_device: i32,
        present: bool,
    ) -> Result<(), Box<dyn std::error::Error>> {
        for path in &*self.hci_device_change_observer.lock().await {
        let paths = self.hci_device_change_observer.lock().unwrap().clone();
        for path in paths.iter() {
            let proxy = Proxy::new(
                "org.chromium.bluetooth.Manager",
                path,
@@ -51,7 +52,7 @@ impl DbusCallbackUtil {
        hci_device: i32,
        state: i32,
    ) -> Result<(), Box<dyn std::error::Error>> {
        for path in &*self.state_change_observers.lock().await {
        for path in &*self.state_change_observers.lock().unwrap() {
            let proxy = Proxy::new(
                "org.chromium.bluetooth.Manager",
                path,
+97 −139
Original line number Diff line number Diff line
@@ -7,12 +7,11 @@ use dbus::message::MatchRule;
use dbus::nonblock::SyncConnection;
use dbus_crossroads::Crossroads;
use dbus_tokio::connection;
use log::{error, info, warn};
use log::{Level, LevelFilter, Metadata, Record, SetLoggerError};
use log::error;
use log::{Level, Metadata, Record};
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::sync::{Arc, Mutex};

struct SimpleLogger;

@@ -106,114 +105,79 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    cr.set_object_manager_support(Some(conn.clone()));
    cr.insert("/", &[cr.object_manager()], {});

    let iface_token = cr.register("org.chromium.bluetooth.Manager", |b| {
        b.method_with_cr_async(
    let iface_token = cr.register(
        "org.chromium.bluetooth.Manager",
        |b: &mut dbus_crossroads::IfaceBuilder<ManagerContext>| {
            b.method(
                "Start",
                ("hci_interface",),
                (),
            |mut ctx, cr, (hci_interface,): (i32,)| {
                |_ctx, manager_context, (hci_interface,): (i32,)| {
                    if !config_util::modify_hci_n_enabled(hci_interface, true) {
                        error!("Config is not successfully modified");
                    }
                let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();
                async move {
                    let result = proxy.start_bluetooth(hci_interface).await;
                    match result {
                        Ok(()) => ctx.reply(Ok(())),
                        Err(_) => ctx.reply(Err(dbus_crossroads::MethodErr::failed(
                            "cannot start Bluetooth",
                        ))),
                    }
                }
                    manager_context.proxy.start_bluetooth(hci_interface);
                    Ok(())
                },
            );
        b.method_with_cr_async(
            b.method(
                "Stop",
                ("hci_interface",),
                (),
            |mut ctx, cr, (hci_interface,): (i32,)| {
                let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();
                |_ctx, manager_context, (hci_interface,): (i32,)| {
                    if !config_util::modify_hci_n_enabled(hci_interface, false) {
                        error!("Config is not successfully modified");
                    }
                async move {
                    let result = proxy.stop_bluetooth(hci_interface).await;
                    match result {
                        Ok(()) => ctx.reply(Ok(())),
                        Err(_) => ctx.reply(Err(dbus_crossroads::MethodErr::failed(
                            "cannot stop Bluetooth",
                        ))),
                    }
                }
                    manager_context.proxy.stop_bluetooth(hci_interface);
                    Ok(())
                },
            );
        b.method_with_cr_async("GetState", (), ("result",), |mut ctx, cr, ()| {
            let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();
            async move {
                let state = proxy.get_state().await;
            b.method("GetState", (), ("result",), |_ctx, manager_context, ()| {
                let proxy = manager_context.proxy.clone();
                let state = proxy.get_state();
                let result = state_machine::state_to_i32(state);
                ctx.reply(Ok((result,)))
            }
                Ok((result,))
            });
            // Register AdapterStateChangeCallback(int hci_device, int state) on specified object_path
        b.method_with_cr_async(
            b.method(
                "RegisterStateChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    manager_context.state_change_observer.lock().await.push(object_path.clone());
                    ctx.reply(Ok(()))
                }
                |_ctx, manager_context, (object_path,): (String,)| {
                    manager_context.state_change_observer.lock().unwrap().push(object_path.clone());
                    Ok(())
                },
            );
        b.method_with_cr_async(
            b.method(
                "UnregisterStateChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    let mut observers = manager_context.state_change_observer.lock().await;
                |_ctx, manager_context, (object_path,): (String,)| {
                    let mut observers = manager_context.state_change_observer.lock().unwrap();
                    match observers.iter().position(|x| *x == object_path) {
                        Some(index) => {
                            observers.remove(index);
                            ctx.reply(Ok(()))
                            Ok(())
                        }
                        _ => ctx.reply(Err(dbus_crossroads::MethodErr::failed(&format!(
                        _ => Err(dbus_crossroads::MethodErr::failed(&format!(
                            "cannot unregister {}",
                            object_path
                        )))),
                    }
                        ))),
                    }
                },
            );
        b.method_with_cr_async("GetFlossEnabled", (), ("result",), |mut ctx, cr, ()| {
            let enabled = cr
                .data_mut::<ManagerContext>(ctx.path())
                .unwrap()
                .clone()
                .floss_enabled
                .load(Ordering::Relaxed);
            b.method("GetFlossEnabled", (), ("result",), |_ctx, manager_context, ()| {
                let enabled = manager_context.floss_enabled.load(Ordering::Relaxed);

            async move { ctx.reply(Ok((enabled,))) }
                Ok((enabled,))
            });
        b.method_with_cr_async(
            b.method(
                "SetFlossEnabled",
                ("enabled",),
                (),
            |mut ctx, cr, (enabled,): (bool,)| {
                let prev = cr
                    .data_mut::<ManagerContext>(ctx.path())
                    .unwrap()
                    .clone()
                    .floss_enabled
                    .swap(enabled, Ordering::Relaxed);
                |_ctx, manager_context, (enabled,): (bool,)| {
                    let prev = manager_context.floss_enabled.swap(enabled, Ordering::Relaxed);
                    config_util::write_floss_enabled(enabled);
                let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();

                async move {
                    if prev != enabled && enabled {
                        Command::new("initctl")
                            .args(&["stop", BLUEZ_INIT_TARGET])
@@ -222,60 +186,54 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
                        // TODO: Implement multi-hci case
                        let default_device = config_util::list_hci_devices()[0];
                        if config_util::is_hci_n_enabled(default_device) {
                            let _ = proxy.start_bluetooth(default_device).await;
                            let _ = manager_context.proxy.start_bluetooth(default_device);
                        }
                    } else if prev != enabled {
                        // TODO: Implement multi-hci case
                        let default_device = config_util::list_hci_devices()[0];
                        let _ = proxy.stop_bluetooth(default_device).await;
                        manager_context.proxy.stop_bluetooth(default_device);
                        Command::new("initctl")
                            .args(&["start", BLUEZ_INIT_TARGET])
                            .output()
                            .expect("failed to start bluetoothd");
                    }
                    ctx.reply(Ok(()))
                }
                    Ok(())
                },
            );
        b.method_with_cr_async("ListHciDevices", (), ("devices",), |mut ctx, _cr, ()| {
            b.method("ListHciDevices", (), ("devices",), |_ctx, _manager_context, ()| {
                let devices = config_util::list_hci_devices();
            async move { ctx.reply(Ok((devices,))) }
                Ok((devices,))
            });
            // Register AdapterStateChangeCallback(int hci_device, int state) on specified object_path
        b.method_with_cr_async(
            b.method(
                "RegisterHciDeviceChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    manager_context.hci_device_change_observer.lock().await.push(object_path);
                    ctx.reply(Ok(()))
                }
                |_ctx, manager_context, (object_path,): (String,)| {
                    manager_context.hci_device_change_observer.lock().unwrap().push(object_path);
                    Ok(())
                },
            );
        b.method_with_cr_async(
            b.method(
                "UnregisterHciDeviceChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    let mut observers = manager_context.hci_device_change_observer.lock().await;
                |_ctx, manager_context, (object_path,): (String,)| {
                    let mut observers = manager_context.hci_device_change_observer.lock().unwrap();
                    match observers.iter().position(|x| *x == object_path) {
                        Some(index) => {
                            observers.remove(index);
                            ctx.reply(Ok(()))
                            Ok(())
                        }
                        _ => ctx.reply(Err(dbus_crossroads::MethodErr::failed(&format!(
                        _ => Err(dbus_crossroads::MethodErr::failed(&format!(
                            "cannot unregister {}",
                            object_path
                        )))),
                    }
                        ))),
                    }
                },
            );
    });
        },
    );

    // Let's add the "/org/chromium/bluetooth/Manager" path, which implements
    // the org.chromium.bluetooth.Manager interface, to the crossroads instance.
+22 −31
Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::{mpsc, Mutex};

// Directory for Bluetooth pid file
@@ -74,32 +73,24 @@ const TX_SEND_TIMEOUT_DURATION: Duration = Duration::from_secs(3);
const COMMAND_TIMEOUT_DURATION: Duration = Duration::from_secs(3);

impl StateMachineProxy {
    pub async fn start_bluetooth(
        &self,
        hci_interface: i32,
    ) -> Result<(), SendTimeoutError<StateMachineActions>> {
        self.tx
            .send_timeout(
                StateMachineActions::StartBluetooth(hci_interface),
                TX_SEND_TIMEOUT_DURATION,
            )
            .await
    pub fn start_bluetooth(&self, hci_interface: i32) {
        let tx = self.tx.clone();
        tokio::spawn(async move {
            let _ = tx.send(StateMachineActions::StartBluetooth(hci_interface)).await;
        });
    }

    pub async fn stop_bluetooth(
        &self,
        hci_interface: i32,
    ) -> Result<(), SendTimeoutError<StateMachineActions>> {
        self.tx
            .send_timeout(
                StateMachineActions::StopBluetooth(hci_interface),
                TX_SEND_TIMEOUT_DURATION,
            )
            .await
    pub fn stop_bluetooth(&self, hci_interface: i32) {
        let tx = self.tx.clone();
        tokio::spawn(async move {
            let _ = tx.send(StateMachineActions::StopBluetooth(hci_interface)).await;
        });
    }

    pub async fn get_state(&self) -> State {
        *self.state.lock().await
    pub fn get_state(&self) -> State {
        // This assumes that self.state is never locked for a long period, i.e. never lock() and
        // await for something else without unlocking. Otherwise this function will block.
        return *futures::executor::block_on(self.state.lock());
    }
}

@@ -181,7 +172,7 @@ pub async fn mainloop<PM>(
                },
              }
            },
            expired = command_timeout.expired() => {
            _expired = command_timeout.expired() => {
                info!("expired {:?}", *context.state_machine.state.lock().await);
                let timeout_action = context.state_machine.action_on_command_timeout().await;
                match timeout_action {
@@ -210,7 +201,7 @@ pub async fn mainloop<PM>(
                                },
                                (inotify::EventMask::DELETE, Some(oss)) => {
                                    let file_name = oss.to_str().unwrap_or("invalid file");
                                    if let Some(hci) = get_hci_interface_from_pid_file_name(file_name) {
                                    if let Some(_hci) = get_hci_interface_from_pid_file_name(file_name) {
                                        context.tx.send_timeout(StateMachineActions::BluetoothStopped(), TX_SEND_TIMEOUT_DURATION).await.unwrap();
                                    }
                                },
@@ -233,7 +224,7 @@ pub async fn mainloop<PM>(
                                (inotify::EventMask::CREATE, Some(oss)) => {
                                    match get_hci_interface_from_device(oss.to_str().unwrap_or("invalid hci device")) {
                                        Some(hci) => {
                                            dbus_callback_util.send_hci_device_change_callback(hci, true).await;
                                            let _ = dbus_callback_util.send_hci_device_change_callback(hci, true).await;
                                        },
                                        _ => (),
                                    }
@@ -241,7 +232,7 @@ pub async fn mainloop<PM>(
                                (inotify::EventMask::DELETE, Some(oss)) => {
                                    match get_hci_interface_from_device(oss.to_str().unwrap_or("invalid hci device")) {
                                        Some(hci) => {
                                            dbus_callback_util.send_hci_device_change_callback(hci, false).await;
                                            let _ = dbus_callback_util.send_hci_device_change_callback(hci, false).await;
                                        },
                                        _ => (),
                                    }