Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8db43045 authored by Henri Chataing's avatar Henri Chataing
Browse files

uwb: Refactor default AIDL HAL implementation

Refactor the implementation in order to:
  - simplify the code and make it less error prone
    by keeping the serial open at all times
  - address issues about futures being awoken too often
    taking CPU time

Bug: 345676140
Test: m android.hardware.uwb-service
Test: atest CtsUwbTestCases
Test: AVD boot test
Change-Id: Ibe4f00dab87ffac42f627def5ca84c0be5147820
parent ebf42989
Loading
Loading
Loading
Loading
+0 −4
Original line number Original line Diff line number Diff line
@@ -16,17 +16,13 @@ rust_binary {
    prefer_rlib: true,
    prefer_rlib: true,
    rustlibs: [
    rustlibs: [
        "android.hardware.uwb-V1-rust",
        "android.hardware.uwb-V1-rust",
        "liblibc",
        "liblogger",
        "liblogger",
        "liblog_rust",
        "liblog_rust",
        "libbinder_rs",
        "libbinder_rs",
        "libbinder_tokio_rs",
        "libbinder_tokio_rs",
        "libtokio",
        "libtokio",
        "libtokio_util",
        "libnix",
        "libnix",
        "libanyhow",
        "libanyhow",
        "libpdl_runtime",
        "libuwb_uci_packets",
    ],
    ],
    proc_macros: [
    proc_macros: [
        "libasync_trait",
        "libasync_trait",
+2 −5
Original line number Original line Diff line number Diff line
use android_hardware_uwb::aidl::android::hardware::uwb::IUwb::{self, IUwb as _};
use android_hardware_uwb::aidl::android::hardware::uwb::IUwb::{self, IUwb as _};
use android_hardware_uwb::binder;
use android_hardware_uwb::binder;


use tokio::runtime::Runtime;

use std::env;
use std::env;
use std::panic;
use std::panic;


@@ -25,13 +23,12 @@ fn main() -> anyhow::Result<()> {


    log::info!("UWB HAL starting up");
    log::info!("UWB HAL starting up");


    // Create the tokio runtime
    let rt = tokio::runtime::Runtime::new()?;
    let rt = Runtime::new()?;


    let chips = env::args()
    let chips = env::args()
        .skip(1) // Skip binary name
        .skip(1) // Skip binary name
        .enumerate()
        .enumerate()
        .map(|(i, arg)| uwb_chip::UwbChip::new(i.to_string(), arg));
        .map(|(i, arg)| rt.block_on(uwb_chip::UwbChip::new(i.to_string(), arg)));


    binder::add_service(
    binder::add_service(
        &format!("{}/default", IUwb::BpUwb::get_descriptor()),
        &format!("{}/default", IUwb::BpUwb::get_descriptor()),
+142 −221
Original line number Original line Diff line number Diff line
@@ -7,253 +7,146 @@ use async_trait::async_trait;
use binder::{DeathRecipient, IBinder, Result, Strong};
use binder::{DeathRecipient, IBinder, Result, Strong};


use std::sync::Arc;
use std::sync::Arc;
use tokio::io::unix::AsyncFd;
use tokio::fs;
use tokio::select;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;


use std::fs::{File, OpenOptions};
enum ClientState {
use std::io::{self, Read, Write};
use std::os::unix::fs::OpenOptionsExt;

use pdl_runtime::Packet;
use uwb_uci_packets::{DeviceResetCmdBuilder, ResetConfig, UciControlPacket, UciControlPacketHal};

enum State {
    Closed,
    Closed,
    Opened {
    Opened {
        callbacks: Strong<dyn IUwbClientCallback>,
        callbacks: Strong<dyn IUwbClientCallback>,
        handle: tokio::task::JoinHandle<()>,
        _death_recipient: DeathRecipient,
        serial: File,
        death_recipient: DeathRecipient,
        token: CancellationToken,
    },
    },
}
}


pub struct UwbChip {
struct ServiceState {
    name: String,
    client_state: ClientState,
    path: String,
    writer: fs::File,
    state: Arc<Mutex<State>>,
}

impl UwbChip {
    pub fn new(name: String, path: String) -> Self {
        Self {
            name,
            path,
            state: Arc::new(Mutex::new(State::Closed)),
        }
    }
}

impl State {
    /// Terminate the reader task.
    async fn close(&mut self) -> Result<()> {
        if let State::Opened {
            ref mut token,
            ref callbacks,
            ref mut death_recipient,
            ref mut handle,
            ref mut serial,
        } = *self
        {
            log::info!("waiting for task cancellation");
            callbacks.as_binder().unlink_to_death(death_recipient)?;
            token.cancel();
            handle.await.unwrap();
            let packet: UciControlPacket = DeviceResetCmdBuilder {
                reset_config: ResetConfig::UwbsReset,
            }
            .build()
            .into();
            // DeviceResetCmd need to be send to reset the device to stop all running
            // activities on UWBS.
            let packet_vec: Vec<UciControlPacketHal> = packet.into();
            for hal_packet in packet_vec.into_iter() {
                serial
                    .write(&hal_packet.encode_to_vec().unwrap())
                    .map(|written| written as i32)
                    .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
            }
            consume_device_reset_rsp_and_ntf(
                &mut serial
                    .try_clone()
                    .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?,
            );
            log::info!("task successfully cancelled");
            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
            *self = State::Closed;
        }
        Ok(())
    }
}
}


fn consume_device_reset_rsp_and_ntf(reader: &mut File) {
pub struct UwbChip {
    // Poll the DeviceResetRsp and DeviceStatusNtf before hal is closed to prevent
    name: String,
    // the host from getting response and notifications from a 'powered down' UWBS.
    _handle: tokio::task::JoinHandle<()>,
    // Do nothing when these packets are received.
    service_state: Arc<Mutex<ServiceState>>,
    const DEVICE_RESET_RSP: [u8; 5] = [64, 0, 0, 1, 0];
    const DEVICE_STATUS_NTF: [u8; 5] = [96, 1, 0, 1, 1];
    let mut buffer = vec![0; DEVICE_RESET_RSP.len() + DEVICE_STATUS_NTF.len()];
    read_exact(reader, &mut buffer).unwrap();

    // Make sure received packets are the expected ones.
    assert_eq!(&buffer[0..DEVICE_RESET_RSP.len()], &DEVICE_RESET_RSP);
    assert_eq!(&buffer[DEVICE_RESET_RSP.len()..], &DEVICE_STATUS_NTF);
}
}


pub fn makeraw(file: File) -> io::Result<File> {
/// Configure a file descriptor as raw fd.
    // Configure the file descriptor as raw fd.
pub fn makeraw(file: fs::File) -> std::io::Result<fs::File> {
    use nix::sys::termios::*;
    use nix::sys::termios::*;
    let mut attrs = tcgetattr(&file)?;
    let mut attrs = tcgetattr(&file)?;
    cfmakeraw(&mut attrs);
    cfmakeraw(&mut attrs);
    tcsetattr(&file, SetArg::TCSANOW, &attrs)?;
    tcsetattr(&file, SetArg::TCSANOW, &attrs)?;

    Ok(file)
    Ok(file)
}
}


/// Wrapper around Read::read to handle EWOULDBLOCK.
impl UwbChip {
/// /!\ will actively wait for more data, make sure to call
    pub async fn new(name: String, path: String) -> Self {
/// this method only when data is immediately expected.
        // Open the serial file and configure it as raw file
fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> {
        // descriptor.
    while buf.len() > 0 {
        let mut reader = fs::OpenOptions::new()
        match file.read(buf) {
            Ok(0) => panic!("unexpectedly reached end of file"),
            Ok(read_len) => buf = &mut buf[read_len..],
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(())
}

impl binder::Interface for UwbChip {}

#[async_trait]
impl IUwbChipAsyncServer for UwbChip {
    async fn getName(&self) -> Result<String> {
        Ok(self.name.clone())
    }

    async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
        log::debug!("open: {:?}", &self.path);

        let mut state = self.state.lock().await;

        if matches!(*state, State::Opened { .. }) {
            log::error!("the state is already opened");
            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
        }

        let serial = OpenOptions::new()
            .read(true)
            .read(true)
            .write(true)
            .write(true)
            .create(false)
            .create(false)
            .custom_flags(libc::O_NONBLOCK)
            .open(&path)
            .open(&self.path)
            .await
            .and_then(makeraw)
            .and_then(makeraw)
            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
            .expect("failed to open the serial device");

        let writer = reader
        let state_death_recipient = self.state.clone();
            .try_clone()
        let mut death_recipient = DeathRecipient::new(move || {
            .await
            let mut state = state_death_recipient.blocking_lock();
            .expect("failed to clone serial for writing");
            log::info!("Uwb service has died");
            if let State::Opened { ref mut token, .. } = *state {
                token.cancel();
                *state = State::Closed;
            }
        });

        callbacks.as_binder().link_to_death(&mut death_recipient)?;

        let token = CancellationToken::new();
        let cloned_token = token.clone();


        let client_callbacks = callbacks.clone();
        // Create the chip
        let service_state = Arc::new(Mutex::new(ServiceState {
            writer,
            client_state: ClientState::Closed,
        }));


        let reader = serial
        // Spawn the task that will run the polling loop.
            .try_clone()
        let handle = {
            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
            let service_state = service_state.clone();


        let join_handle = tokio::task::spawn(async move {
            tokio::task::spawn(async move {
                log::info!("UCI reader task started");
                log::info!("UCI reader task started");
            let mut reader = AsyncFd::new(reader).unwrap();


            loop {
                const MESSAGE_TYPE_MASK: u8 = 0b11100000;
                const MESSAGE_TYPE_MASK: u8 = 0b11100000;
                const DATA_MESSAGE_TYPE: u8 = 0b000;
                const DATA_MESSAGE_TYPE: u8 = 0b000;
                const UWB_HEADER_SIZE: usize = 4;
                const UCI_HEADER_SIZE: usize = 4;
                let mut buffer = vec![0; UWB_HEADER_SIZE];
                const UCI_BUFFER_SIZE: usize = 1024;


                // The only time where the task can be safely
                let mut buffer = [0; UCI_BUFFER_SIZE];
                // cancelled is when no packet bytes have been read.
                //
                // - read_exact() cannot be used here since it is not
                //   cancellation safe.
                // - read() cannot be used because it cannot be cancelled:
                //   the syscall is executed blocking on the threadpool
                //   and completes after termination of the task when
                //   the pipe receives more data.
                let read_len = loop {
                    // On some platforms, the readiness detecting mechanism
                    // relies on edge-triggered notifications. This means that
                    // the OS will only notify Tokio when the file descriptor
                    // transitions from not-ready to ready. For this to work
                    // you should first try to read or write and only poll for
                    // readiness if that fails with an error of
                    // std::io::ErrorKind::WouldBlock.
                    match reader.get_mut().read(&mut buffer) {
                        Ok(0) => {
                            log::error!("file unexpectedly closed");
                            return;
                        }
                        Ok(read_len) => break read_len,
                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
                        Err(_) => panic!("unexpected read failure"),
                    }

                    let mut guard = select! {
                        _ = cloned_token.cancelled() => {
                            log::info!("task is cancelled!");
                            return;
                        },
                        result = reader.readable() => result.unwrap()
                    };

                    guard.clear_ready();
                };

                // Read the remaining header bytes, if truncated.
                read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap();


                loop {
                    reader
                        .read_exact(&mut buffer[0..UCI_HEADER_SIZE])
                        .await
                        .expect("failed to read uci header bytes");
                    let common_header = buffer[0];
                    let common_header = buffer[0];
                    let mt = (common_header & MESSAGE_TYPE_MASK) >> 5;
                    let mt = (common_header & MESSAGE_TYPE_MASK) >> 5;
                    let payload_length = if mt == DATA_MESSAGE_TYPE {
                    let payload_length = if mt == DATA_MESSAGE_TYPE {
                    let payload_length_fields: [u8; 2] = buffer[2..=3].try_into().unwrap();
                        u16::from_le_bytes([buffer[2], buffer[3]]) as usize
                    u16::from_le_bytes(payload_length_fields) as usize
                    } else {
                    } else {
                        buffer[3] as usize
                        buffer[3] as usize
                    };
                    };


                let length = payload_length + UWB_HEADER_SIZE;
                    let total_packet_length = payload_length + UCI_HEADER_SIZE;
                buffer.resize(length, 0);
                    reader
                        .read_exact(&mut buffer[UCI_HEADER_SIZE..total_packet_length])
                        .await
                        .expect("failed to read uci payload bytes");

                    log::debug!(" <-- {:?}", &buffer[0..total_packet_length]);

                    let service_state = service_state.lock().await;
                    if let ClientState::Opened { ref callbacks, .. } = service_state.client_state {
                        callbacks
                            .onUciMessage(&buffer[0..total_packet_length])
                            .unwrap();
                    }
                }
            })
        };

        Self {
            name,
            _handle: handle,
            service_state,
        }
    }
}
impl binder::Interface for UwbChip {}

#[async_trait]
impl IUwbChipAsyncServer for UwbChip {
    async fn getName(&self) -> Result<String> {
        Ok(self.name.clone())
    }

    async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
        log::debug!("open");


                // Read the payload bytes.
        let mut service_state = self.service_state.lock().await;
                read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap();


                log::debug!(" <-- {:?}", buffer);
        if matches!(service_state.client_state, ClientState::Opened { .. }) {
                client_callbacks.onUciMessage(&buffer).unwrap();
            log::error!("the state is already opened");
            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
        }
        }
        });


        let mut death_recipient = {
            let service_state = self.service_state.clone();
            DeathRecipient::new(move || {
                log::info!("Uwb service has died");
                let mut service_state = service_state.blocking_lock();
                service_state.client_state = ClientState::Closed;
            })
        };

        callbacks.as_binder().link_to_death(&mut death_recipient)?;
        callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
        callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;


        *state = State::Opened {
        service_state.client_state = ClientState::Opened {
            callbacks: callbacks.clone(),
            callbacks: callbacks.clone(),
            handle: join_handle,
            _death_recipient: death_recipient,
            serial,
            death_recipient,
            token,
        };
        };


        Ok(())
        Ok(())
@@ -262,19 +155,42 @@ impl IUwbChipAsyncServer for UwbChip {
    async fn close(&self) -> Result<()> {
    async fn close(&self) -> Result<()> {
        log::debug!("close");
        log::debug!("close");


        let mut state = self.state.lock().await;
        let mut service_state = self.service_state.lock().await;


        if let State::Opened { .. } = *state {
        if matches!(service_state.client_state, ClientState::Closed) {
            state.close().await
            log::error!("the state is already closed");
        } else {
            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
            Err(binder::ExceptionCode::ILLEGAL_STATE.into())
        }
        }

        // Send the command Device Reset to stop all running activities
        // on the UWBS emulator. This is necessary because the emulator
        // is otherwise not notified of the power down (the serial stays
        // open).
        //
        // The response to the command will be dropped by the polling loop,
        // as the callbacks will have been removed then.
        let uci_core_device_reset_cmd = [0x20, 0x00, 0x00, 0x01, 0x00];

        service_state
            .writer
            .write_all(&uci_core_device_reset_cmd)
            .await
            .expect("failed to write UCI Device Reset command");

        if let ClientState::Opened { ref callbacks, .. } = service_state.client_state {
            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
        }

        service_state.client_state = ClientState::Closed;
        Ok(())
    }
    }


    async fn coreInit(&self) -> Result<()> {
    async fn coreInit(&self) -> Result<()> {
        log::debug!("coreInit");
        log::debug!("coreInit");


        if let State::Opened { ref callbacks, .. } = *self.state.lock().await {
        let service_state = self.service_state.lock().await;

        if let ClientState::Opened { ref callbacks, .. } = service_state.client_state {
            callbacks.onHalEvent(UwbEvent::POST_INIT_CPLT, UwbStatus::OK)?;
            callbacks.onHalEvent(UwbEvent::POST_INIT_CPLT, UwbStatus::OK)?;
            Ok(())
            Ok(())
        } else {
        } else {
@@ -289,22 +205,27 @@ impl IUwbChipAsyncServer for UwbChip {
    }
    }


    async fn getSupportedAndroidUciVersion(&self) -> Result<i32> {
    async fn getSupportedAndroidUciVersion(&self) -> Result<i32> {
        log::debug!("getSupportedAndroidUciVersion");

        Ok(1)
        Ok(1)
    }
    }


    async fn sendUciMessage(&self, data: &[u8]) -> Result<i32> {
    async fn sendUciMessage(&self, data: &[u8]) -> Result<i32> {
        log::debug!("sendUciMessage");
        log::debug!("sendUciMessage");


        if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await {
        let mut service_state = self.service_state.lock().await;

        if matches!(service_state.client_state, ClientState::Closed) {
            log::error!("the state is not opened");
            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
        }

        log::debug!(" --> {:?}", data);
        log::debug!(" --> {:?}", data);
            let result = serial
        service_state
            .writer
            .write_all(data)
            .write_all(data)
            .await
            .map(|_| data.len() as i32)
            .map(|_| data.len() as i32)
                .map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into());
            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into())
            log::debug!(" status: {:?}", result);
            result
        } else {
            Err(binder::ExceptionCode::ILLEGAL_STATE.into())
        }
    }
    }
}
}