Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7cba3954 authored by Sonny Sasaka's avatar Sonny Sasaka
Browse files

Floss: Integrate powerd Suspend/Resume API

btmanagerd acts as a client of powerd Suspend/Resume API[1], and
connects to btadapterd's Suspend API. This currently only supports a
single adapter.

[1]:
https://chromium.googlesource.com/chromiumos/platform/system_api/+/HEAD/dbus/power_manager/suspend.proto

Bug: 224606285
Tag: #floss
Test: Manual - build.py

Change-Id: I744d2ba536366652e9350098305b01c7cfcf61b9
parent 12bb12c7
Loading
Loading
Loading
Loading
+0 −2
Original line number Diff line number Diff line
@@ -17,8 +17,6 @@ pub struct SuspendDBus {
}

impl SuspendDBus {
    #[allow(dead_code)]
    // TODO: Remove allow dead code when the code is actually used.
    pub(crate) fn new(conn: Arc<SyncConnection>, path: dbus::Path<'static>) -> SuspendDBus {
        SuspendDBus {
            client_proxy: ClientDBusProxy::new(
+23 −6
Original line number Diff line number Diff line
@@ -3,10 +3,12 @@ mod bluetooth_manager_dbus;
mod config_util;
mod dbus_arg;
mod dbus_iface;
mod powerd_suspend_manager;
mod service_watcher;
mod state_machine;

use crate::bluetooth_manager::BluetoothManager;
use crate::powerd_suspend_manager::PowerdSuspendManager;
use dbus::channel::MatchingReceiver;
use dbus::message::MatchRule;
use dbus_crossroads::Crossroads;
@@ -42,6 +44,11 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the D-Bus system bus (this is blocking, unfortunately).
    let (resource, conn) = connection::new_system_sync()?;

    // There are multiple signal handlers. We need to set signal match mode to true to allow signal
    // handlers process the signals independently. Otherwise only the first signal handler will get
    // the chance to handle the same signal in case the match rule overlaps.
    conn.set_signal_match_mode(true);

    // Determine whether to use upstart or systemd
    let args: Vec<String> = std::env::args().collect();
    let invoker = if args.len() > 1 {
@@ -69,14 +76,15 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // Let's request a name on the bus, so that clients can find us.
    conn.request_name("org.chromium.bluetooth.Manager", false, true, false).await?;
    log::debug!("D-Bus name: {}", conn.unique_name());

    // Create a new crossroads instance.
    // The instance is configured so that introspection and properties interfaces
    // are added by default on object path additions.
    let mut cr = Crossroads::new();
    let cr = Arc::new(Mutex::new(Crossroads::new()));

    // Enable async support for the crossroads instance.
    cr.set_async_support(Some((
    cr.lock().unwrap().set_async_support(Some((
        conn.clone(),
        Box::new(|x| {
            tokio::spawn(x);
@@ -86,8 +94,9 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Object manager is necessary for clients (to inform them when Bluetooth is
    // available). Create it at root (/) so subsequent additions generate
    // InterfaceAdded and InterfaceRemoved signals.
    cr.set_object_manager_support(Some(conn.clone()));
    cr.insert("/", &[cr.object_manager()], {});
    cr.lock().unwrap().set_object_manager_support(Some(conn.clone()));
    let om = cr.lock().unwrap().object_manager();
    cr.lock().unwrap().insert("/", &[om], {});

    let bluetooth_manager = Arc::new(Mutex::new(Box::new(BluetoothManager::new(manager_context))));

@@ -100,20 +109,28 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    bluetooth_manager_dbus::export_bluetooth_manager_dbus_obj(
        "/org/chromium/bluetooth/Manager",
        conn.clone(),
        &mut cr,
        &mut cr.lock().unwrap(),
        bluetooth_manager.clone(),
        disconnect_watcher.clone(),
    );

    // We add the Crossroads instance to the connection so that incoming method calls will be handled.
    let cr_clone = cr.clone();
    conn.start_receive(
        MatchRule::new_method_call(),
        Box::new(move |msg, conn| {
            cr.handle_message(msg, conn).unwrap();
            cr_clone.lock().unwrap().handle_message(msg, conn).unwrap();
            true
        }),
    );

    let mut powerd_suspend_manager = PowerdSuspendManager::new(conn.clone(), cr);

    tokio::spawn(async move {
        powerd_suspend_manager.init().await;
        powerd_suspend_manager.mainloop().await;
    });

    tokio::spawn(async move {
        state_machine::mainloop(context, bluetooth_manager).await;
    });
+488 −0
Original line number Diff line number Diff line
use btstack::suspend::{ISuspend, ISuspendCallback, SuspendType};
use btstack::RPCProxy;
use dbus::channel::MatchingReceiver;
use dbus::message::MatchRule;
use dbus::nonblock::SyncConnection;
use dbus_crossroads::Crossroads;
use dbus_projection::DisconnectWatcher;
use manager_service::suspend::{
    RegisterSuspendDelayReply, RegisterSuspendDelayRequest, SuspendDone, SuspendImminent,
    SuspendImminent_Reason, SuspendReadinessInfo,
};
use protobuf::{CodedInputStream, CodedOutputStream, Message};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::dbus_iface::{export_suspend_callback_dbus_obj, SuspendDBus};
use crate::service_watcher::ServiceWatcher;

const POWERD_SERVICE: &str = "org.chromium.PowerManager";
const POWERD_INTERFACE: &str = "org.chromium.PowerManager";
const POWERD_PATH: &str = "/org/chromium/PowerManager";
const ADAPTER_SERVICE: &str = "org.chromium.bluetooth";
const ADAPTER_SUSPEND_INTERFACE: &str = "org.chromium.bluetooth.Suspend";
const SUSPEND_IMMINENT_SIGNAL: &str = "SuspendImminent";
const SUSPEND_DONE_SIGNAL: &str = "SuspendDone";
const BTMANAGERD_NAME: &str = "Bluetooth Manager";
const DBUS_TIMEOUT: Duration = Duration::from_secs(2);

#[derive(Debug)]
enum SuspendManagerMessage {
    PowerdStarted,
    PowerdStopped,
    SuspendImminentReceived(SuspendImminent),
    SuspendDoneReceived(SuspendDone),
    AdapterFound(dbus::Path<'static>),
    AdapterRemoved,
}

struct PowerdSession {
    delay_id: i32,
    powerd_proxy: dbus::nonblock::Proxy<'static, Arc<SyncConnection>>,
}

/// Callback container for suspend interface callbacks.
pub(crate) struct SuspendCallback {
    objpath: String,

    dbus_connection: Arc<SyncConnection>,
    dbus_crossroads: Arc<Mutex<Crossroads>>,

    context: Arc<Mutex<SuspendManagerContext>>,
}

impl SuspendCallback {
    pub(crate) fn new(
        objpath: String,
        dbus_connection: Arc<SyncConnection>,
        dbus_crossroads: Arc<Mutex<Crossroads>>,
        context: Arc<Mutex<SuspendManagerContext>>,
    ) -> Self {
        Self { objpath, dbus_connection, dbus_crossroads, context }
    }
}

fn generate_proto_bytes<T: protobuf::Message>(request: &T) -> Option<Vec<u8>> {
    let mut proto_bytes: Vec<u8> = vec![];
    let mut output_stream = CodedOutputStream::vec(&mut proto_bytes);
    let write_result = request.write_to_with_cached_sizes(&mut output_stream);
    if let Err(e) = write_result {
        log::error!("Error serializing proto to bytes: {}", e);
        return None;
    }
    Some(proto_bytes)
}

// Convenient function to call HandleSuspendReadiness to powerd when we want to tell it that
// Bluetooth is ready to suspend.
fn send_handle_suspend_readiness(
    powerd_proxy: dbus::nonblock::Proxy<'static, Arc<SyncConnection>>,
    delay_id: i32,
    suspend_id: i32,
) {
    let mut suspend_readiness_info = SuspendReadinessInfo::new();
    suspend_readiness_info.set_delay_id(delay_id);
    suspend_readiness_info.set_suspend_id(suspend_id);

    if let Some(suspend_readiness_info_proto) = generate_proto_bytes(&suspend_readiness_info) {
        tokio::spawn(async move {
            log::debug!(
                "Sending HandleSuspendReadiness, delay id = {}, suspend id = {}",
                suspend_readiness_info.get_delay_id(),
                suspend_readiness_info.get_suspend_id()
            );
            let ret: Result<(), dbus::Error> = powerd_proxy
                .method_call(
                    POWERD_INTERFACE,
                    "HandleSuspendReadiness",
                    (suspend_readiness_info_proto,),
                )
                .await;

            log::debug!("HandleSuspendReadiness returns {:?}", ret);
            if let Err(e) = ret {
                log::error!("Error calling HandleSuspendReadiness: {}", e)
            }
        });
    } else {
        log::error!("Error writing SuspendReadinessInfo");
    }
}

impl ISuspendCallback for SuspendCallback {
    fn on_callback_registered(&self, callback_id: u32) {
        log::debug!("Suspend callback registered, callback_id = {}", callback_id);
    }

    fn on_suspend_ready(&self, suspend_id: u32) {
        // Received when adapter is ready to suspend. Tell powerd that suspend is ready.
        log::debug!("Suspend ready, adapter suspend_id = {}", suspend_id);

        if let Some(pending_suspend_imminent) =
            &self.context.lock().unwrap().pending_suspend_imminent
        {
            if let Some(powerd_session) = &self.context.lock().unwrap().powerd_session {
                send_handle_suspend_readiness(
                    powerd_session.powerd_proxy.clone(),
                    powerd_session.delay_id,
                    pending_suspend_imminent.get_suspend_id(),
                );
            } else {
                log::warn!("Suspend is ready but there is no powerd session to report to");
            }
        } else {
            log::warn!("Suspend ready but no SuspendImminent signal");
        }
    }

    fn on_resumed(&self, suspend_id: u32) {
        // Received when adapter is ready to suspend. This is just for our information and powerd
        // doesn't need to know about this.
        log::debug!("Suspend resumed, adapter suspend_id = {}", suspend_id);
    }
}

impl RPCProxy for SuspendCallback {
    fn register_disconnect(&mut self, _f: Box<dyn Fn(u32) + Send>) -> u32 {
        0
    }

    fn get_object_id(&self) -> String {
        self.objpath.clone()
    }

    fn unregister(&mut self, _id: u32) -> bool {
        false
    }

    fn export_for_rpc(self: Box<Self>) {
        let cr = self.dbus_crossroads.clone();
        export_suspend_callback_dbus_obj(
            self.get_object_id(),
            self.dbus_connection.clone(),
            &mut cr.lock().unwrap(),
            Arc::new(Mutex::new(self)),
            Arc::new(Mutex::new(DisconnectWatcher::new())),
        );
    }
}

/// Holds the necessary information to coordinate suspend between powerd and btadapterd.
pub struct SuspendManagerContext {
    dbus_crossroads: Arc<Mutex<Crossroads>>,
    powerd_session: Option<PowerdSession>,
    adapter_suspend_dbus: Option<SuspendDBus>,
    pending_suspend_imminent: Option<SuspendImminent>,
}

/// Coordinates suspend events of Chromium OS's powerd with btadapter Suspend API.
pub struct PowerdSuspendManager {
    context: Arc<Mutex<SuspendManagerContext>>,
    conn: Arc<SyncConnection>,
    tx: tokio::sync::mpsc::Sender<SuspendManagerMessage>,
    rx: tokio::sync::mpsc::Receiver<SuspendManagerMessage>,
}

impl PowerdSuspendManager {
    /// Instantiates the suspend manager.
    ///
    /// `conn` and `dbus_crossroads` are D-Bus objects from `dbus` crate, to be used for both
    /// communication with powerd and btadapterd.
    pub fn new(conn: Arc<SyncConnection>, dbus_crossroads: Arc<Mutex<Crossroads>>) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel::<SuspendManagerMessage>(10);
        Self {
            context: Arc::new(Mutex::new(SuspendManagerContext {
                dbus_crossroads,
                powerd_session: None,
                adapter_suspend_dbus: None,
                pending_suspend_imminent: None,
            })),
            conn,
            tx,
            rx,
        }
    }

    /// Sets up all required D-Bus listeners.
    pub async fn init(&mut self) {
        // Watch events of powerd appearing or disappearing.
        let powerd_watcher = ServiceWatcher::new(self.conn.clone(), String::from(POWERD_SERVICE));
        let tx1 = self.tx.clone();
        let tx2 = self.tx.clone();
        powerd_watcher
            .start_watch(
                Box::new(move || {
                    let tx_clone = tx1.clone();
                    tokio::spawn(async move {
                        let _ = tx_clone.send(SuspendManagerMessage::PowerdStarted).await;
                    });
                }),
                Box::new(move || {
                    let tx_clone = tx2.clone();
                    tokio::spawn(async move {
                        let _ = tx_clone.send(SuspendManagerMessage::PowerdStopped).await;
                    });
                }),
            )
            .await;

        // Watch events of btadapterd appearing or disappearing.
        let adapter_watcher = ServiceWatcher::new(self.conn.clone(), String::from(ADAPTER_SERVICE));
        let tx1 = self.tx.clone();
        let tx2 = self.tx.clone();
        adapter_watcher
            .start_watch_interface(
                String::from(ADAPTER_SUSPEND_INTERFACE),
                Box::new(move |path| {
                    let tx_clone = tx1.clone();
                    tokio::spawn(async move {
                        let _ = tx_clone.send(SuspendManagerMessage::AdapterFound(path)).await;
                    });
                }),
                Box::new(move || {
                    let tx_clone = tx2.clone();
                    tokio::spawn(async move {
                        let _ = tx_clone.send(SuspendManagerMessage::AdapterRemoved).await;
                    });
                }),
            )
            .await;

        // Watch for SuspendImminent signal from powerd.
        let mr = MatchRule::new_signal(POWERD_INTERFACE, SUSPEND_IMMINENT_SIGNAL)
            .with_sender(POWERD_SERVICE)
            .with_path(POWERD_PATH);
        self.conn.add_match_no_cb(&mr.match_str()).await.unwrap();

        let tx = self.tx.clone();
        self.conn.start_receive(
            mr,
            Box::new(move |msg, _conn| {
                if let Some(bytes) = msg.get1::<Vec<u8>>() {
                    let mut suspend_imminent = SuspendImminent::new();
                    let mut input_stream = CodedInputStream::from_bytes(&bytes[..]);
                    let decode_result = suspend_imminent.merge_from(&mut input_stream);
                    if let Err(e) = decode_result {
                        log::error!("Error decoding SuspendImminent signal: {}", e);
                    } else {
                        let tx_clone = tx.clone();
                        tokio::spawn(async move {
                            let _ = tx_clone
                                .send(SuspendManagerMessage::SuspendImminentReceived(
                                    suspend_imminent,
                                ))
                                .await;
                        });
                    }
                } else {
                    log::warn!("received empty SuspendImminent signal");
                }

                true
            }),
        );

        // Watch for SuspendDone signal from powerd.
        let mr = MatchRule::new_signal(POWERD_INTERFACE, SUSPEND_DONE_SIGNAL)
            .with_sender(POWERD_SERVICE)
            .with_path(POWERD_PATH);
        self.conn.add_match_no_cb(&mr.match_str()).await.unwrap();
        let tx = self.tx.clone();
        self.conn.start_receive(
            mr,
            Box::new(move |msg, _conn| {
                if let Some(bytes) = msg.get1::<Vec<u8>>() {
                    let mut suspend_done = SuspendDone::new();
                    let mut input_stream = CodedInputStream::from_bytes(&bytes[..]);
                    let decode_result = suspend_done.merge_from(&mut input_stream);
                    if let Err(e) = decode_result {
                        log::error!("Error decoding SuspendDone signal: {}", e);
                    } else {
                        let tx_clone = tx.clone();
                        tokio::spawn(async move {
                            let _ = tx_clone
                                .send(SuspendManagerMessage::SuspendDoneReceived(suspend_done))
                                .await;
                        });
                    }
                } else {
                    log::warn!("received empty SuspendDone signal");
                }

                true
            }),
        );
    }

    /// Starts the event handlers.
    pub async fn mainloop(&mut self) {
        loop {
            let m = self.rx.recv().await;

            if let Some(msg) = m {
                match msg {
                    SuspendManagerMessage::PowerdStarted => self.on_powerd_started().await,
                    SuspendManagerMessage::PowerdStopped => self.on_powerd_stopped(),
                    SuspendManagerMessage::SuspendImminentReceived(suspend_imminent) => {
                        self.on_suspend_imminent(suspend_imminent)
                    }
                    SuspendManagerMessage::SuspendDoneReceived(suspend_done) => {
                        self.on_suspend_done(suspend_done)
                    }
                    SuspendManagerMessage::AdapterFound(object_path) => {
                        self.on_adapter_found(object_path)
                    }
                    SuspendManagerMessage::AdapterRemoved => self.on_adapter_removed(),
                }
            } else {
                log::debug!("Exiting suspend manager mainloop");
                break;
            }
        }
    }

    async fn on_powerd_started(&mut self) {
        // As soon as powerd is available, we need to register to be a suspend readiness reporter.

        log::debug!("powerd started, initializing suspend manager");

        if self.context.lock().unwrap().powerd_session.is_some() {
            log::warn!("powerd session already exists, cleaning up first");
            self.on_powerd_stopped();
        }

        let conn = self.conn.clone();
        let powerd_proxy =
            dbus::nonblock::Proxy::new(POWERD_SERVICE, POWERD_PATH, DBUS_TIMEOUT, conn);

        let mut request = RegisterSuspendDelayRequest::new();
        request.set_description(String::from(BTMANAGERD_NAME));

        if let Some(register_suspend_delay_proto) = generate_proto_bytes(&request) {
            let result: Result<(Vec<u8>,), dbus::Error> = powerd_proxy
                .method_call(
                    POWERD_INTERFACE,
                    "RegisterSuspendDelay",
                    (register_suspend_delay_proto,),
                )
                .await;

            match result {
                Err(e) => {
                    log::error!("D-Bus error: {:?}", e);
                }
                Ok((return_proto,)) => {
                    let mut reply = RegisterSuspendDelayReply::new();
                    let mut input_stream = CodedInputStream::from_bytes(&return_proto[..]);
                    let decode_result = reply.merge_from(&mut input_stream);
                    if let Err(e) = decode_result {
                        log::error!("Error decoding RegisterSuspendDelayReply {:?}", e);
                    }

                    log::debug!("Suspend delay id = {}", reply.get_delay_id());

                    self.context.lock().unwrap().powerd_session =
                        Some(PowerdSession { delay_id: reply.get_delay_id(), powerd_proxy });
                }
            }
        } else {
            log::error!("Error writing RegisterSuspendDelayRequest");
        }
    }

    fn on_powerd_stopped(&mut self) {
        // TODO: Consider an edge case where powerd unexpectedly is stopped (maybe crashes) but we
        // still have pending SuspendImminent.
        log::debug!("powerd stopped, cleaning up");

        match self.context.lock().unwrap().powerd_session {
            None => log::warn!("powerd session does not exist, ignoring"),
            Some(_) => self.context.lock().unwrap().powerd_session = None,
        }
    }

    fn on_suspend_imminent(&mut self, suspend_imminent: SuspendImminent) {
        // powerd is telling us that system is about to suspend, if available tell btadapterd to
        // prepare for suspend.

        log::debug!(
            "received suspend imminent: suspend_id = {:?}, reason = {:?}",
            suspend_imminent.get_suspend_id(),
            suspend_imminent.get_reason()
        );

        if self.context.lock().unwrap().pending_suspend_imminent.is_some() {
            log::warn!("SuspendImminent signal received while there is a pending suspend imminent");
        }

        self.context.lock().unwrap().pending_suspend_imminent = Some(suspend_imminent.clone());

        if let Some(adapter_suspend_dbus) = &self.context.lock().unwrap().adapter_suspend_dbus {
            let adapter_suspend_id =
                adapter_suspend_dbus.suspend(match suspend_imminent.get_reason() {
                    SuspendImminent_Reason::IDLE => SuspendType::AllowWakeFromHid,
                    SuspendImminent_Reason::LID_CLOSED => SuspendType::NoWakesAllowed,
                    SuspendImminent_Reason::OTHER => SuspendType::Other,
                });
            log::debug!("Adapter suspend id = {}", adapter_suspend_id);
        } else {
            // If there is no adapter, that means Bluetooth is not active and we should always tell
            // powerd that we are ready to suspend.
            log::debug!("Adapter not available, suspend is ready.");
            if let Some(session) = &self.context.lock().unwrap().powerd_session {
                send_handle_suspend_readiness(
                    session.powerd_proxy.clone(),
                    session.delay_id,
                    suspend_imminent.get_suspend_id(),
                );
            } else {
                log::warn!("SuspendImminent is received when there is no powerd session");
            }
        }
    }

    fn on_suspend_done(&mut self, suspend_done: SuspendDone) {
        // powerd is telling us that suspend is done (system has resumed), so we tell btadapterd
        // to resume too.

        log::debug!("SuspendDone received: {:?}", suspend_done);

        if self.context.lock().unwrap().pending_suspend_imminent.is_none() {
            log::warn!("Receveid SuspendDone signal when there is no pending SuspendImminent");
        }

        self.context.lock().unwrap().pending_suspend_imminent = None;

        if let Some(adapter_suspend_dbus) = &self.context.lock().unwrap().adapter_suspend_dbus {
            let success = adapter_suspend_dbus.resume();
            log::debug!("Adapter resume is successful = {}", success);
        } else {
            log::debug!("Adapter is not available, nothing to resume.");
        }
    }

    fn on_adapter_found(&mut self, path: dbus::Path<'static>) {
        log::debug!("Found adapter {:?}", path);

        let conn = self.conn.clone();
        self.context.lock().unwrap().adapter_suspend_dbus =
            Some(SuspendDBus::new(conn.clone(), path));

        if let Some(adapter_suspend_dbus) = &mut self.context.lock().unwrap().adapter_suspend_dbus {
            let suspend_cb_objpath: String =
                format!("/org/chromium/bluetooth/Manager/suspend_callback");
            let crossroads = self.context.lock().unwrap().dbus_crossroads.clone();
            adapter_suspend_dbus.register_callback(Box::new(SuspendCallback::new(
                suspend_cb_objpath,
                conn,
                crossroads,
                self.context.clone(),
            )));
        }
    }

    fn on_adapter_removed(&mut self) {
        log::debug!("Adapter removed");
        self.context.lock().unwrap().adapter_suspend_dbus = None;
    }
}
+0 −2
Original line number Diff line number Diff line
@@ -20,8 +20,6 @@ pub struct ServiceWatcher {
    service_name: String,
}

#[allow(dead_code)]
// TODO: Remove allow dead_code when the code is actually used.
impl ServiceWatcher {
    pub fn new(conn: Arc<SyncConnection>, service_name: String) -> Self {
        ServiceWatcher { conn, service_name }