Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 39d47fab authored by Sonny Sasaka's avatar Sonny Sasaka Committed by Gerrit Code Review
Browse files

Merge "Convert some async to sync in btmanagerd"

parents afd604fb 02909efb
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ bt_common = { path = "../../common" }
dbus = "0.9.2"
dbus-tokio = "0.7.3"
dbus-crossroads = "0.4.0"
futures = "0.3.13"
inotify = "*"
log = "0.4.14"
nix = "*"
+4 −3
Original line number Diff line number Diff line
use dbus::nonblock::{Proxy, SyncConnection};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::Mutex;

#[derive(Clone)]
pub struct DbusCallbackUtil {
@@ -28,7 +28,8 @@ impl DbusCallbackUtil {
        hci_device: i32,
        present: bool,
    ) -> Result<(), Box<dyn std::error::Error>> {
        for path in &*self.hci_device_change_observer.lock().await {
        let paths = self.hci_device_change_observer.lock().unwrap().clone();
        for path in paths.iter() {
            let proxy = Proxy::new(
                "org.chromium.bluetooth.Manager",
                path,
@@ -51,7 +52,7 @@ impl DbusCallbackUtil {
        hci_device: i32,
        state: i32,
    ) -> Result<(), Box<dyn std::error::Error>> {
        for path in &*self.state_change_observers.lock().await {
        for path in &*self.state_change_observers.lock().unwrap() {
            let proxy = Proxy::new(
                "org.chromium.bluetooth.Manager",
                path,
+97 −139
Original line number Diff line number Diff line
@@ -7,12 +7,11 @@ use dbus::message::MatchRule;
use dbus::nonblock::SyncConnection;
use dbus_crossroads::Crossroads;
use dbus_tokio::connection;
use log::{error, info, warn};
use log::{Level, LevelFilter, Metadata, Record, SetLoggerError};
use log::error;
use log::{Level, Metadata, Record};
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::sync::{Arc, Mutex};

struct SimpleLogger;

@@ -106,114 +105,79 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    cr.set_object_manager_support(Some(conn.clone()));
    cr.insert("/", &[cr.object_manager()], {});

    let iface_token = cr.register("org.chromium.bluetooth.Manager", |b| {
        b.method_with_cr_async(
    let iface_token = cr.register(
        "org.chromium.bluetooth.Manager",
        |b: &mut dbus_crossroads::IfaceBuilder<ManagerContext>| {
            b.method(
                "Start",
                ("hci_interface",),
                (),
            |mut ctx, cr, (hci_interface,): (i32,)| {
                |_ctx, manager_context, (hci_interface,): (i32,)| {
                    if !config_util::modify_hci_n_enabled(hci_interface, true) {
                        error!("Config is not successfully modified");
                    }
                let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();
                async move {
                    let result = proxy.start_bluetooth(hci_interface).await;
                    match result {
                        Ok(()) => ctx.reply(Ok(())),
                        Err(_) => ctx.reply(Err(dbus_crossroads::MethodErr::failed(
                            "cannot start Bluetooth",
                        ))),
                    }
                }
                    manager_context.proxy.start_bluetooth(hci_interface);
                    Ok(())
                },
            );
        b.method_with_cr_async(
            b.method(
                "Stop",
                ("hci_interface",),
                (),
            |mut ctx, cr, (hci_interface,): (i32,)| {
                let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();
                |_ctx, manager_context, (hci_interface,): (i32,)| {
                    if !config_util::modify_hci_n_enabled(hci_interface, false) {
                        error!("Config is not successfully modified");
                    }
                async move {
                    let result = proxy.stop_bluetooth(hci_interface).await;
                    match result {
                        Ok(()) => ctx.reply(Ok(())),
                        Err(_) => ctx.reply(Err(dbus_crossroads::MethodErr::failed(
                            "cannot stop Bluetooth",
                        ))),
                    }
                }
                    manager_context.proxy.stop_bluetooth(hci_interface);
                    Ok(())
                },
            );
        b.method_with_cr_async("GetState", (), ("result",), |mut ctx, cr, ()| {
            let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();
            async move {
                let state = proxy.get_state().await;
            b.method("GetState", (), ("result",), |_ctx, manager_context, ()| {
                let proxy = manager_context.proxy.clone();
                let state = proxy.get_state();
                let result = state_machine::state_to_i32(state);
                ctx.reply(Ok((result,)))
            }
                Ok((result,))
            });
            // Register AdapterStateChangeCallback(int hci_device, int state) on specified object_path
        b.method_with_cr_async(
            b.method(
                "RegisterStateChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    manager_context.state_change_observer.lock().await.push(object_path.clone());
                    ctx.reply(Ok(()))
                }
                |_ctx, manager_context, (object_path,): (String,)| {
                    manager_context.state_change_observer.lock().unwrap().push(object_path.clone());
                    Ok(())
                },
            );
        b.method_with_cr_async(
            b.method(
                "UnregisterStateChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    let mut observers = manager_context.state_change_observer.lock().await;
                |_ctx, manager_context, (object_path,): (String,)| {
                    let mut observers = manager_context.state_change_observer.lock().unwrap();
                    match observers.iter().position(|x| *x == object_path) {
                        Some(index) => {
                            observers.remove(index);
                            ctx.reply(Ok(()))
                            Ok(())
                        }
                        _ => ctx.reply(Err(dbus_crossroads::MethodErr::failed(&format!(
                        _ => Err(dbus_crossroads::MethodErr::failed(&format!(
                            "cannot unregister {}",
                            object_path
                        )))),
                    }
                        ))),
                    }
                },
            );
        b.method_with_cr_async("GetFlossEnabled", (), ("result",), |mut ctx, cr, ()| {
            let enabled = cr
                .data_mut::<ManagerContext>(ctx.path())
                .unwrap()
                .clone()
                .floss_enabled
                .load(Ordering::Relaxed);
            b.method("GetFlossEnabled", (), ("result",), |_ctx, manager_context, ()| {
                let enabled = manager_context.floss_enabled.load(Ordering::Relaxed);

            async move { ctx.reply(Ok((enabled,))) }
                Ok((enabled,))
            });
        b.method_with_cr_async(
            b.method(
                "SetFlossEnabled",
                ("enabled",),
                (),
            |mut ctx, cr, (enabled,): (bool,)| {
                let prev = cr
                    .data_mut::<ManagerContext>(ctx.path())
                    .unwrap()
                    .clone()
                    .floss_enabled
                    .swap(enabled, Ordering::Relaxed);
                |_ctx, manager_context, (enabled,): (bool,)| {
                    let prev = manager_context.floss_enabled.swap(enabled, Ordering::Relaxed);
                    config_util::write_floss_enabled(enabled);
                let proxy = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().proxy.clone();

                async move {
                    if prev != enabled && enabled {
                        Command::new("initctl")
                            .args(&["stop", BLUEZ_INIT_TARGET])
@@ -222,60 +186,54 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
                        // TODO: Implement multi-hci case
                        let default_device = config_util::list_hci_devices()[0];
                        if config_util::is_hci_n_enabled(default_device) {
                            let _ = proxy.start_bluetooth(default_device).await;
                            let _ = manager_context.proxy.start_bluetooth(default_device);
                        }
                    } else if prev != enabled {
                        // TODO: Implement multi-hci case
                        let default_device = config_util::list_hci_devices()[0];
                        let _ = proxy.stop_bluetooth(default_device).await;
                        manager_context.proxy.stop_bluetooth(default_device);
                        Command::new("initctl")
                            .args(&["start", BLUEZ_INIT_TARGET])
                            .output()
                            .expect("failed to start bluetoothd");
                    }
                    ctx.reply(Ok(()))
                }
                    Ok(())
                },
            );
        b.method_with_cr_async("ListHciDevices", (), ("devices",), |mut ctx, _cr, ()| {
            b.method("ListHciDevices", (), ("devices",), |_ctx, _manager_context, ()| {
                let devices = config_util::list_hci_devices();
            async move { ctx.reply(Ok((devices,))) }
                Ok((devices,))
            });
            // Register AdapterStateChangeCallback(int hci_device, int state) on specified object_path
        b.method_with_cr_async(
            b.method(
                "RegisterHciDeviceChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    manager_context.hci_device_change_observer.lock().await.push(object_path);
                    ctx.reply(Ok(()))
                }
                |_ctx, manager_context, (object_path,): (String,)| {
                    manager_context.hci_device_change_observer.lock().unwrap().push(object_path);
                    Ok(())
                },
            );
        b.method_with_cr_async(
            b.method(
                "UnregisterHciDeviceChangeObserver",
                ("object_path",),
                (),
            |mut ctx, cr, (object_path,): (String,)| {
                let manager_context = cr.data_mut::<ManagerContext>(ctx.path()).unwrap().clone();
                async move {
                    let mut observers = manager_context.hci_device_change_observer.lock().await;
                |_ctx, manager_context, (object_path,): (String,)| {
                    let mut observers = manager_context.hci_device_change_observer.lock().unwrap();
                    match observers.iter().position(|x| *x == object_path) {
                        Some(index) => {
                            observers.remove(index);
                            ctx.reply(Ok(()))
                            Ok(())
                        }
                        _ => ctx.reply(Err(dbus_crossroads::MethodErr::failed(&format!(
                        _ => Err(dbus_crossroads::MethodErr::failed(&format!(
                            "cannot unregister {}",
                            object_path
                        )))),
                    }
                        ))),
                    }
                },
            );
    });
        },
    );

    // Let's add the "/org/chromium/bluetooth/Manager" path, which implements
    // the org.chromium.bluetooth.Manager interface, to the crossroads instance.
+22 −31
Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::{mpsc, Mutex};

// Directory for Bluetooth pid file
@@ -74,32 +73,24 @@ const TX_SEND_TIMEOUT_DURATION: Duration = Duration::from_secs(3);
const COMMAND_TIMEOUT_DURATION: Duration = Duration::from_secs(3);

impl StateMachineProxy {
    pub async fn start_bluetooth(
        &self,
        hci_interface: i32,
    ) -> Result<(), SendTimeoutError<StateMachineActions>> {
        self.tx
            .send_timeout(
                StateMachineActions::StartBluetooth(hci_interface),
                TX_SEND_TIMEOUT_DURATION,
            )
            .await
    pub fn start_bluetooth(&self, hci_interface: i32) {
        let tx = self.tx.clone();
        tokio::spawn(async move {
            let _ = tx.send(StateMachineActions::StartBluetooth(hci_interface)).await;
        });
    }

    pub async fn stop_bluetooth(
        &self,
        hci_interface: i32,
    ) -> Result<(), SendTimeoutError<StateMachineActions>> {
        self.tx
            .send_timeout(
                StateMachineActions::StopBluetooth(hci_interface),
                TX_SEND_TIMEOUT_DURATION,
            )
            .await
    pub fn stop_bluetooth(&self, hci_interface: i32) {
        let tx = self.tx.clone();
        tokio::spawn(async move {
            let _ = tx.send(StateMachineActions::StopBluetooth(hci_interface)).await;
        });
    }

    pub async fn get_state(&self) -> State {
        *self.state.lock().await
    pub fn get_state(&self) -> State {
        // This assumes that self.state is never locked for a long period, i.e. never lock() and
        // await for something else without unlocking. Otherwise this function will block.
        return *futures::executor::block_on(self.state.lock());
    }
}

@@ -181,7 +172,7 @@ pub async fn mainloop<PM>(
                },
              }
            },
            expired = command_timeout.expired() => {
            _expired = command_timeout.expired() => {
                info!("expired {:?}", *context.state_machine.state.lock().await);
                let timeout_action = context.state_machine.action_on_command_timeout().await;
                match timeout_action {
@@ -210,7 +201,7 @@ pub async fn mainloop<PM>(
                                },
                                (inotify::EventMask::DELETE, Some(oss)) => {
                                    let file_name = oss.to_str().unwrap_or("invalid file");
                                    if let Some(hci) = get_hci_interface_from_pid_file_name(file_name) {
                                    if let Some(_hci) = get_hci_interface_from_pid_file_name(file_name) {
                                        context.tx.send_timeout(StateMachineActions::BluetoothStopped(), TX_SEND_TIMEOUT_DURATION).await.unwrap();
                                    }
                                },
@@ -233,7 +224,7 @@ pub async fn mainloop<PM>(
                                (inotify::EventMask::CREATE, Some(oss)) => {
                                    match get_hci_interface_from_device(oss.to_str().unwrap_or("invalid hci device")) {
                                        Some(hci) => {
                                            dbus_callback_util.send_hci_device_change_callback(hci, true).await;
                                            let _ = dbus_callback_util.send_hci_device_change_callback(hci, true).await;
                                        },
                                        _ => (),
                                    }
@@ -241,7 +232,7 @@ pub async fn mainloop<PM>(
                                (inotify::EventMask::DELETE, Some(oss)) => {
                                    match get_hci_interface_from_device(oss.to_str().unwrap_or("invalid hci device")) {
                                        Some(hci) => {
                                            dbus_callback_util.send_hci_device_change_callback(hci, false).await;
                                            let _ = dbus_callback_util.send_hci_device_change_callback(hci, false).await;
                                        },
                                        _ => (),
                                    }