Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4e3c5aca authored by Bob Wang's avatar Bob Wang Committed by Gerrit Code Review
Browse files

Merge "[UWB HAL] Use AsyncFd to read the buffer only when it's readable." into main

parents 8dbc5115 4fec12a6
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ rust_binary {
        "libbinder_rs",
        "libbinder_tokio_rs",
        "libtokio",
        "libtokio_util",
        "libnix",
        "libanyhow",
    ],
+144 −60
Original line number Diff line number Diff line
@@ -4,32 +4,34 @@ use android_hardware_uwb::aidl::android::hardware::uwb::{
};
use android_hardware_uwb::binder;
use async_trait::async_trait;
use binder::{Result, Strong};
use binder::{DeathRecipient, IBinder, Result, Strong};

use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use log::info;
use std::sync::Arc;
use tokio::io::unix::AsyncFd;
use tokio::select;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

use std::fs::{File, OpenOptions};
use std::io::{self, Read, Write};
use std::os::fd::AsRawFd;

use std::io;

use nix::sys::termios;

enum State {
    Closed,
    Opened {
        callbacks: Strong<dyn IUwbClientCallback>,
        #[allow(dead_code)]
        tasks: tokio::task::JoinSet<()>,
        _handle: tokio::task::JoinHandle<()>,
        serial: File,
        death_recipient: DeathRecipient,
        token: CancellationToken,
    },
}

pub struct UwbChip {
    name: String,
    path: String,
    state: Mutex<State>,
    state: Arc<Mutex<State>>,
}

impl UwbChip {
@@ -37,23 +39,59 @@ impl UwbChip {
        Self {
            name,
            path,
            state: Mutex::new(State::Closed),
            state: Arc::new(Mutex::new(State::Closed)),
        }
    }
}

impl State {
    /// Terminate the reader task.
    #[allow(dead_code)]
    fn close(&mut self) -> Result<()> {
        if let State::Opened { ref mut token, ref callbacks, ref mut death_recipient, .. } = *self {
            log::info!("waiting for task cancellation");
            callbacks.as_binder().unlink_to_death(death_recipient)?;
            token.cancel();
            log::info!("task successfully cancelled");
            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
            *self = State::Closed;
        }
        Ok(())
    }
}

pub fn makeraw(file: File) -> io::Result<File> {
    let fd = file.as_raw_fd();

    let mut attrs = termios::tcgetattr(fd)?;

    termios::cfmakeraw(&mut attrs);
    // Configure the file descritpro as raw fd.
    use nix::sys::termios::*;
    let mut attrs = tcgetattr(fd)?;
    cfmakeraw(&mut attrs);
    tcsetattr(fd, SetArg::TCSANOW, &attrs)?;

    termios::tcsetattr(fd, termios::SetArg::TCSANOW, &attrs)?;
    // Configure the file descriptor as non blocking.
    use nix::fcntl::*;
    let flags = OFlag::from_bits(fcntl(fd, FcntlArg::F_GETFL)?).unwrap();
    fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK))?;

    Ok(file)
}

/// Wrapper around Read::read to handle EWOULDBLOCK.
/// /!\ will actively wait for more data, make sure to call
/// this method only when data is immediately expected.
fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> {
    while buf.len() > 0 {
        match file.read(buf) {
            Ok(0) => panic!("unexpectedly reached end of file"),
            Ok(read_len) => buf = &mut buf[read_len..],
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(())
}

impl binder::Interface for UwbChip {}

#[async_trait]
@@ -65,45 +103,95 @@ impl IUwbChipAsyncServer for UwbChip {
    async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
        log::debug!("open: {:?}", &self.path);

        let mut state = self.state.lock().await;

        if matches!(*state, State::Opened { .. }) {
            log::error!("the state is already opened");
            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
        }

        let serial = OpenOptions::new()
            .read(true)
            .write(true)
            .create(false)
            .open(&self.path)
            .await
            .and_then(makeraw)
            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;

        let mut state = self.state.lock().await;
        let state_death_recipient = self.state.clone();
        let mut death_recipient = DeathRecipient::new(move || {
            let mut state = state_death_recipient.blocking_lock();
            log::info!("Uwb service has died");
            state.close().unwrap();
        });

        callbacks.as_binder().link_to_death(&mut death_recipient)?;

        let token = CancellationToken::new();
        let cloned_token = token.clone();

        if let State::Closed = *state {
        let client_callbacks = callbacks.clone();

            let mut tasks = tokio::task::JoinSet::new();
            let mut reader = serial
        let reader = serial
            .try_clone()
                .await
            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;

            tasks.spawn(async move {
        let join_handle = tokio::task::spawn(async move {
            info!("UCI reader task started");
            let mut reader = AsyncFd::new(reader).unwrap();

            loop {
                const UWB_HEADER_SIZE: usize = 4;

                let mut buffer = vec![0; UWB_HEADER_SIZE];
                    reader
                        .read_exact(&mut buffer[0..UWB_HEADER_SIZE])
                        .await
                        .unwrap();

                    let length = buffer[3] as usize + UWB_HEADER_SIZE;
                // The only time where the task can be safely
                // cancelled is when no packet bytes have been read.
                //
                // - read_exact() cannot be used here since it is not
                //   cancellation safe.
                // - read() cannot be used because it cannot be cancelled:
                //   the syscall is executed blocking on the threadpool
                //   and completes after termination of the task when
                //   the pipe receives more data.
                let read_len = loop {
                    // On some platforms, the readiness detecting mechanism
                    // relies on edge-triggered notifications. This means that
                    // the OS will only notify Tokio when the file descriptor
                    // transitions from not-ready to ready. For this to work
                    // you should first try to read or write and only poll for
                    // readiness if that fails with an error of 
                    // std::io::ErrorKind::WouldBlock.
                    match reader.get_mut().read(&mut buffer) {
                        Ok(0) => {
                            log::error!("file unexpectedly closed");
                            return;
                        }
                        Ok(read_len) => break read_len,
                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
                        Err(_) => panic!("unexpected read failure"),
                    }

                    let mut guard = select! {
                        _ = cloned_token.cancelled() => {
                            info!("task is cancelled!");
                            return;
                        },
                        result = reader.readable() => result.unwrap()
                    };

                    guard.clear_ready();
                };

                // Read the remaining header bytes, if truncated.
                read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap();

                let length = buffer[3] as usize + UWB_HEADER_SIZE;
                buffer.resize(length, 0);
                    reader
                        .read_exact(&mut buffer[UWB_HEADER_SIZE..length])
                        .await
                        .unwrap();

                    client_callbacks.onUciMessage(&buffer[..]).unwrap();
                // Read the payload bytes.
                read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap();

                client_callbacks.onUciMessage(&buffer).unwrap();
            }
        });

@@ -111,14 +199,13 @@ impl IUwbChipAsyncServer for UwbChip {

        *state = State::Opened {
            callbacks: callbacks.clone(),
                tasks,
            _handle: join_handle,
            serial,
            death_recipient,
            token,
        };

        Ok(())
        } else {
            Err(binder::ExceptionCode::ILLEGAL_STATE.into())
        }
    }

    async fn close(&self) -> Result<()> {
@@ -126,10 +213,8 @@ impl IUwbChipAsyncServer for UwbChip {

        let mut state = self.state.lock().await;

        if let State::Opened { ref callbacks, .. } = *state {
            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
            *state = State::Closed;
            Ok(())
        if matches!(*state, State::Opened { .. }) {
            state.close()
        } else {
            Err(binder::ExceptionCode::ILLEGAL_STATE.into())
        }
@@ -162,7 +247,6 @@ impl IUwbChipAsyncServer for UwbChip {
        if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await {
            serial
                .write(data)
                .await
                .map(|written| written as i32)
                .map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into())
        } else {