Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 26f5968a authored by Android Build Coastguard Worker's avatar Android Build Coastguard Worker
Browse files

Snap for 11685790 from 2f80d49f to 24Q3-release

Change-Id: I77b9b453affb08c34a7613ffb763bd3005b44f59
parents fbdf25fc 2f80d49f
Loading
Loading
Loading
Loading
+23 −21
Original line number Diff line number Diff line
@@ -19,9 +19,10 @@
#include "InputFilterCallbacks.h"
#include <aidl/com/android/server/inputflinger/BnInputThread.h>
#include <android/binder_auto_utils.h>
#include <utils/Looper.h>
#include <utils/StrongPointer.h>
#include <utils/Thread.h>
#include <functional>
#include "InputThread.h"

namespace android {

@@ -38,36 +39,37 @@ namespace {

using namespace aidl::com::android::server::inputflinger;

class InputFilterThreadImpl : public Thread {
public:
    explicit InputFilterThreadImpl(std::function<void()> loop)
          : Thread(/*canCallJava=*/true), mThreadLoop(loop) {}

    ~InputFilterThreadImpl() {}

private:
    std::function<void()> mThreadLoop;

    bool threadLoop() override {
        mThreadLoop();
        return true;
    }
};

class InputFilterThread : public BnInputThread {
public:
    InputFilterThread(std::shared_ptr<IInputThreadCallback> callback) : mCallback(callback) {
        mThread = sp<InputFilterThreadImpl>::make([this]() { loopOnce(); });
        mThread->run("InputFilterThread", ANDROID_PRIORITY_URGENT_DISPLAY);
        mLooper = sp<Looper>::make(/*allowNonCallbacks=*/false);
        mThread = std::make_unique<InputThread>(
                "InputFilter", [this]() { loopOnce(); }, [this]() { mLooper->wake(); });
    }

    ndk::ScopedAStatus finish() override {
        mThread->requestExit();
        if (mThread && mThread->isCallingThread()) {
            ALOGE("InputFilterThread cannot be stopped on itself!");
            return ndk::ScopedAStatus::fromStatus(INVALID_OPERATION);
        }
        mThread.reset();
        return ndk::ScopedAStatus::ok();
    }

    ndk::ScopedAStatus sleepUntil(nsecs_t when) override {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        mLooper->pollOnce(toMillisecondTimeoutDelay(now, when));
        return ndk::ScopedAStatus::ok();
    }

    ndk::ScopedAStatus wake() override {
        mLooper->wake();
        return ndk::ScopedAStatus::ok();
    }

private:
    sp<Thread> mThread;
    sp<Looper> mLooper;
    std::unique_ptr<InputThread> mThread;
    std::shared_ptr<IInputThreadCallback> mCallback;

    void loopOnce() { LOG_ALWAYS_FATAL_IF(!mCallback->loopOnce().isOk()); }
+17 −0
Original line number Diff line number Diff line
@@ -21,6 +21,13 @@ package com.android.server.inputflinger;
  * infrastructure.
  *
  * <p>
  * Earlier, we used rust thread park()/unpark() to put the thread to sleep and wake up from sleep.
  * But that caused some breakages after migrating the rust system crates to 2021 edition. Since,
  * the threads are created in C++, it was more reliable to rely on C++ side of the implementation
  * to implement the sleep and wake functions.
  * </p>
  *
  * <p>
  * NOTE: Tried using rust provided threading infrastructure but that uses std::thread which doesn't
  * have JNI support and can't call into Java policy that we use currently. libutils provided
  * Thread.h also recommends against using std::thread and using the provided infrastructure that
@@ -33,6 +40,16 @@ interface IInputThread {
    /** Finish input thread (if not running, this call does nothing) */
    void finish();

    /** Wakes up the thread (if sleeping) */
    void wake();

    /**
      * Puts the thread to sleep until a future time provided.
      *
      * NOTE: The thread can be awaken before the provided time using {@link wake()} function.
      */
    void sleepUntil(long whenNanos);

    /** Callbacks from C++ to call into inputflinger rust components */
    interface IInputThreadCallback {
        /**
+82 −24
Original line number Diff line number Diff line
@@ -396,14 +396,16 @@ pub mod test_callbacks {
        IInputThread::{BnInputThread, IInputThread, IInputThreadCallback::IInputThreadCallback},
        KeyEvent::KeyEvent,
    };
    use std::sync::{Arc, RwLock, RwLockWriteGuard};
    use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId};
    use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, RwLock, RwLockWriteGuard};
    use std::time::Duration;

    #[derive(Default)]
    struct TestCallbacksInner {
        last_modifier_state: u32,
        last_locked_modifier_state: u32,
        last_event: Option<KeyEvent>,
        test_thread: Option<TestThread>,
        test_thread: Option<FakeCppThread>,
    }

    #[derive(Default, Clone)]
@@ -438,13 +440,9 @@ pub mod test_callbacks {
            self.0.read().unwrap().last_locked_modifier_state
        }

        pub fn is_thread_created(&self) -> bool {
            self.0.read().unwrap().test_thread.is_some()
        }

        pub fn is_thread_finished(&self) -> bool {
        pub fn is_thread_running(&self) -> bool {
            if let Some(test_thread) = &self.0.read().unwrap().test_thread {
                return test_thread.is_finish_called();
                return test_thread.is_running();
            }
            false
        }
@@ -468,41 +466,101 @@ pub mod test_callbacks {

        fn createInputFilterThread(
            &self,
            _callback: &Strong<dyn IInputThreadCallback>,
            callback: &Strong<dyn IInputThreadCallback>,
        ) -> std::result::Result<Strong<dyn IInputThread>, binder::Status> {
            let test_thread = TestThread::new();
            let test_thread = FakeCppThread::new(callback.clone());
            test_thread.start_looper();
            self.inner().test_thread = Some(test_thread.clone());
            Result::Ok(BnInputThread::new_binder(test_thread, BinderFeatures::default()))
        }
    }

    #[derive(Default)]
    struct TestThreadInner {
        is_finish_called: bool,
    struct FakeCppThreadInner {
        join_handle: Option<std::thread::JoinHandle<()>>,
    }

    #[derive(Default, Clone)]
    struct TestThread(Arc<RwLock<TestThreadInner>>);
    #[derive(Clone)]
    struct FakeCppThread {
        callback: Arc<RwLock<Strong<dyn IInputThreadCallback>>>,
        inner: Arc<RwLock<FakeCppThreadInner>>,
        exit_flag: Arc<AtomicBool>,
    }

    impl Interface for TestThread {}
    impl Interface for FakeCppThread {}

    impl TestThread {
        pub fn new() -> Self {
            Default::default()
    impl FakeCppThread {
        pub fn new(callback: Strong<dyn IInputThreadCallback>) -> Self {
            let thread = Self {
                callback: Arc::new(RwLock::new(callback)),
                inner: Arc::new(RwLock::new(FakeCppThreadInner { join_handle: None })),
                exit_flag: Arc::new(AtomicBool::new(true)),
            };
            thread.create_looper();
            thread
        }

        fn inner(&self) -> RwLockWriteGuard<'_, TestThreadInner> {
            self.0.write().unwrap()
        fn inner(&self) -> RwLockWriteGuard<'_, FakeCppThreadInner> {
            self.inner.write().unwrap()
        }

        fn create_looper(&self) {
            let clone = self.clone();
            let join_handle = std::thread::Builder::new()
                .name("fake_cpp_thread".to_string())
                .spawn(move || loop {
                    if !clone.exit_flag.load(Ordering::Relaxed) {
                        clone.loop_once();
                    }
                })
                .unwrap();
            self.inner().join_handle = Some(join_handle);
            // Sleep until the looper thread starts
            std::thread::sleep(Duration::from_millis(10));
        }

        pub fn start_looper(&self) {
            self.exit_flag.store(false, Ordering::Relaxed);
        }

        pub fn is_finish_called(&self) -> bool {
            self.0.read().unwrap().is_finish_called
        pub fn stop_looper(&self) {
            self.exit_flag.store(true, Ordering::Relaxed);
            if let Some(join_handle) = &self.inner.read().unwrap().join_handle {
                join_handle.thread().unpark();
            }
        }

        pub fn is_running(&self) -> bool {
            !self.exit_flag.load(Ordering::Relaxed)
        }

    impl IInputThread for TestThread {
        fn loop_once(&self) {
            let _ = self.callback.read().unwrap().loopOnce();
        }
    }

    impl IInputThread for FakeCppThread {
        fn finish(&self) -> binder::Result<()> {
            self.inner().is_finish_called = true;
            self.stop_looper();
            Result::Ok(())
        }

        fn wake(&self) -> binder::Result<()> {
            if let Some(join_handle) = &self.inner.read().unwrap().join_handle {
                join_handle.thread().unpark();
            }
            Result::Ok(())
        }

        fn sleepUntil(&self, wake_up_time: i64) -> binder::Result<()> {
            let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_nanoseconds();
            if wake_up_time == i64::MAX {
                std::thread::park();
            } else {
                let duration_now = Duration::from_nanos(now as u64);
                let duration_wake_up = Duration::from_nanos(wake_up_time as u64);
                std::thread::park_timeout(duration_wake_up - duration_now);
            }
            Result::Ok(())
        }
    }
+87 −167
Original line number Diff line number Diff line
@@ -33,8 +33,6 @@ use com_android_server_inputflinger::aidl::com::android::server::inputflinger::I
use log::{debug, error};
use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::time::Duration;
use std::{thread, thread::Thread};

/// Interface to receive callback from Input filter thread
pub trait ThreadCallback {
@@ -54,15 +52,18 @@ pub struct InputFilterThread {
    thread_creator: InputFilterThreadCreator,
    thread_callback_handler: ThreadCallbackHandler,
    inner: Arc<RwLock<InputFilterThreadInner>>,
    looper: Arc<RwLock<Looper>>,
}

struct InputFilterThreadInner {
    cpp_thread: Option<Strong<dyn IInputThread>>,
    looper: Option<Thread>,
    next_timeout: i64,
    is_finishing: bool,
}

struct Looper {
    cpp_thread: Option<Strong<dyn IInputThread>>,
}

impl InputFilterThread {
    /// Create a new InputFilterThread instance.
    /// NOTE: This will create a new thread. Clone the existing instance to reuse the same thread.
@@ -71,11 +72,10 @@ impl InputFilterThread {
            thread_creator,
            thread_callback_handler: ThreadCallbackHandler::new(),
            inner: Arc::new(RwLock::new(InputFilterThreadInner {
                cpp_thread: None,
                looper: None,
                next_timeout: i64::MAX,
                is_finishing: false,
            })),
            looper: Arc::new(RwLock::new(Looper { cpp_thread: None })),
        }
    }

@@ -83,12 +83,17 @@ impl InputFilterThread {
    /// time on the input filter thread.
    /// {@see ThreadCallback.notify_timeout_expired(...)}
    pub fn request_timeout_at_time(&self, when_nanos: i64) {
        let mut need_wake = false;
        {
            // acquire filter lock
            let filter_thread = &mut self.filter_thread();
            if when_nanos < filter_thread.next_timeout {
                filter_thread.next_timeout = when_nanos;
            if let Some(looper) = &filter_thread.looper {
                looper.unpark();
                need_wake = true;
            }
        } // release filter lock
        if need_wake {
            self.wake();
        }
    }

@@ -120,29 +125,36 @@ impl InputFilterThread {

    fn start(&self) {
        debug!("InputFilterThread: start thread");
        let filter_thread = &mut self.filter_thread();
        if filter_thread.cpp_thread.is_none() {
            filter_thread.cpp_thread = Some(self.thread_creator.create(
        {
            // acquire looper lock
            let looper = &mut self.looper();
            if looper.cpp_thread.is_none() {
                looper.cpp_thread = Some(self.thread_creator.create(
                    &BnInputThreadCallback::new_binder(self.clone(), BinderFeatures::default()),
                ));
            filter_thread.looper = None;
            filter_thread.is_finishing = false;
            }
        } // release looper lock
        self.set_finishing(false);
    }

    fn stop(&self) {
        debug!("InputFilterThread: stop thread");
        let filter_thread = &mut self.filter_thread();
        filter_thread.is_finishing = true;
        if let Some(looper) = &filter_thread.looper {
            looper.unpark();
        }
        if let Some(cpp_thread) = &filter_thread.cpp_thread {
        self.set_finishing(true);
        self.wake();
        {
            // acquire looper lock
            let looper = &mut self.looper();
            if let Some(cpp_thread) = &looper.cpp_thread {
                let _ = cpp_thread.finish();
            }
            // Clear all references
        filter_thread.cpp_thread = None;
        filter_thread.looper = None;
            looper.cpp_thread = None;
        } // release looper lock
    }

    fn set_finishing(&self, is_finishing: bool) {
        let filter_thread = &mut self.filter_thread();
        filter_thread.is_finishing = is_finishing;
    }

    fn loop_once(&self, now: i64) {
@@ -163,25 +175,34 @@ impl InputFilterThread {
                    wake_up_time = filter_thread.next_timeout;
                }
            }
            if filter_thread.looper.is_none() {
                filter_thread.looper = Some(std::thread::current());
            }
        } // release thread lock
        if timeout_expired {
            self.thread_callback_handler.notify_timeout_expired(now);
        }
        if wake_up_time == i64::MAX {
            thread::park();
        } else {
            let duration_now = Duration::from_nanos(now as u64);
            let duration_wake_up = Duration::from_nanos(wake_up_time as u64);
            thread::park_timeout(duration_wake_up - duration_now);
        }
        self.sleep_until(wake_up_time);
    }

    fn filter_thread(&self) -> RwLockWriteGuard<'_, InputFilterThreadInner> {
        self.inner.write().unwrap()
    }

    fn sleep_until(&self, when_nanos: i64) {
        let looper = self.looper.read().unwrap();
        if let Some(cpp_thread) = &looper.cpp_thread {
            let _ = cpp_thread.sleepUntil(when_nanos);
        }
    }

    fn wake(&self) {
        let looper = self.looper.read().unwrap();
        if let Some(cpp_thread) = &looper.cpp_thread {
            let _ = cpp_thread.wake();
        }
    }

    fn looper(&self) -> RwLockWriteGuard<'_, Looper> {
        self.looper.write().unwrap()
    }
}

impl Interface for InputFilterThread {}
@@ -252,165 +273,64 @@ impl ThreadCallbackHandler {

#[cfg(test)]
mod tests {
    use crate::input_filter::test_callbacks::TestCallbacks;
    use crate::input_filter_thread::{
        test_thread::TestThread, test_thread_callback::TestThreadCallback,
    };
    use crate::input_filter::{test_callbacks::TestCallbacks, InputFilterThreadCreator};
    use crate::input_filter_thread::{test_thread_callback::TestThreadCallback, InputFilterThread};
    use binder::Strong;
    use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId};
    use std::sync::{Arc, RwLock};
    use std::time::Duration;

    #[test]
    fn test_register_callback_creates_cpp_thread() {
        let test_callbacks = TestCallbacks::new();
        let test_thread = TestThread::new(test_callbacks.clone());
        let test_thread = get_thread(test_callbacks.clone());
        let test_thread_callback = TestThreadCallback::new();
        test_thread.register_thread_callback(test_thread_callback);
        assert!(test_callbacks.is_thread_created());
        test_thread.register_thread_callback(Box::new(test_thread_callback));
        assert!(test_callbacks.is_thread_running());
    }

    #[test]
    fn test_unregister_callback_finishes_cpp_thread() {
        let test_callbacks = TestCallbacks::new();
        let test_thread = TestThread::new(test_callbacks.clone());
        let test_thread = get_thread(test_callbacks.clone());
        let test_thread_callback = TestThreadCallback::new();
        test_thread.register_thread_callback(test_thread_callback.clone());
        test_thread.unregister_thread_callback(test_thread_callback);
        assert!(test_callbacks.is_thread_finished());
        test_thread.register_thread_callback(Box::new(test_thread_callback.clone()));
        test_thread.unregister_thread_callback(Box::new(test_thread_callback));
        assert!(!test_callbacks.is_thread_running());
    }

    #[test]
    fn test_notify_timeout_called_after_timeout_expired() {
        let test_callbacks = TestCallbacks::new();
        let test_thread = TestThread::new(test_callbacks.clone());
        let test_thread = get_thread(test_callbacks.clone());
        let test_thread_callback = TestThreadCallback::new();
        test_thread.register_thread_callback(test_thread_callback.clone());
        test_thread.start_looper();

        test_thread.request_timeout_at_time(500);
        test_thread.dispatch_next();
        test_thread.register_thread_callback(Box::new(test_thread_callback.clone()));

        test_thread.move_time_forward(500);
        let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_milliseconds();
        test_thread.request_timeout_at_time((now + 10) * 1000000);

        test_thread.stop_looper();
        std::thread::sleep(Duration::from_millis(20));
        assert!(test_thread_callback.is_notify_timeout_called());
    }

    #[test]
    fn test_notify_timeout_not_called_before_timeout_expired() {
        let test_callbacks = TestCallbacks::new();
        let test_thread = TestThread::new(test_callbacks.clone());
        let test_thread = get_thread(test_callbacks.clone());
        let test_thread_callback = TestThreadCallback::new();
        test_thread.register_thread_callback(test_thread_callback.clone());
        test_thread.start_looper();

        test_thread.request_timeout_at_time(500);
        test_thread.dispatch_next();

        test_thread.move_time_forward(100);

        test_thread.stop_looper();
        assert!(!test_thread_callback.is_notify_timeout_called());
    }
}

#[cfg(test)]
pub mod test_thread {
        test_thread.register_thread_callback(Box::new(test_thread_callback.clone()));

    use crate::input_filter::{test_callbacks::TestCallbacks, InputFilterThreadCreator};
    use crate::input_filter_thread::{test_thread_callback::TestThreadCallback, InputFilterThread};
    use binder::Strong;
    use std::sync::{
        atomic::AtomicBool, atomic::AtomicI64, atomic::Ordering, Arc, RwLock, RwLockWriteGuard,
    };
    use std::time::Duration;

    #[derive(Clone)]
    pub struct TestThread {
        input_thread: InputFilterThread,
        inner: Arc<RwLock<TestThreadInner>>,
        exit_flag: Arc<AtomicBool>,
        now: Arc<AtomicI64>,
    }

    struct TestThreadInner {
        join_handle: Option<std::thread::JoinHandle<()>>,
    }

    impl TestThread {
        pub fn new(callbacks: TestCallbacks) -> TestThread {
            Self {
                input_thread: InputFilterThread::new(InputFilterThreadCreator::new(Arc::new(
                    RwLock::new(Strong::new(Box::new(callbacks))),
                ))),
                inner: Arc::new(RwLock::new(TestThreadInner { join_handle: None })),
                exit_flag: Arc::new(AtomicBool::new(false)),
                now: Arc::new(AtomicI64::new(0)),
            }
        }

        fn inner(&self) -> RwLockWriteGuard<'_, TestThreadInner> {
            self.inner.write().unwrap()
        }

        pub fn get_input_thread(&self) -> InputFilterThread {
            self.input_thread.clone()
        }

        pub fn register_thread_callback(&self, thread_callback: TestThreadCallback) {
            self.input_thread.register_thread_callback(Box::new(thread_callback));
        }

        pub fn unregister_thread_callback(&self, thread_callback: TestThreadCallback) {
            self.input_thread.unregister_thread_callback(Box::new(thread_callback));
        }

        pub fn start_looper(&self) {
            self.exit_flag.store(false, Ordering::Relaxed);
            let clone = self.clone();
            let join_handle = std::thread::Builder::new()
                .name("test_thread".to_string())
                .spawn(move || {
                    while !clone.exit_flag.load(Ordering::Relaxed) {
                        clone.loop_once();
                    }
                })
                .unwrap();
            self.inner().join_handle = Some(join_handle);
            // Sleep until the looper thread starts
            std::thread::sleep(Duration::from_millis(10));
        }

        pub fn stop_looper(&self) {
            self.exit_flag.store(true, Ordering::Relaxed);
            {
                let mut inner = self.inner();
                if let Some(join_handle) = &inner.join_handle {
                    join_handle.thread().unpark();
                }
                inner.join_handle.take().map(std::thread::JoinHandle::join);
                inner.join_handle = None;
            }
            self.exit_flag.store(false, Ordering::Relaxed);
        }
        let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_milliseconds();
        test_thread.request_timeout_at_time((now + 100) * 1000000);

        pub fn move_time_forward(&self, value: i64) {
            let _ = self.now.fetch_add(value, Ordering::Relaxed);
            self.dispatch_next();
        }

        pub fn dispatch_next(&self) {
            if let Some(join_handle) = &self.inner().join_handle {
                join_handle.thread().unpark();
            }
            // Sleep until the looper thread runs a loop
        std::thread::sleep(Duration::from_millis(10));
        assert!(!test_thread_callback.is_notify_timeout_called());
    }

        fn loop_once(&self) {
            self.input_thread.loop_once(self.now.load(Ordering::Relaxed));
        }

        pub fn request_timeout_at_time(&self, when_nanos: i64) {
            self.input_thread.request_timeout_at_time(when_nanos);
        }
    fn get_thread(callbacks: TestCallbacks) -> InputFilterThread {
        InputFilterThread::new(InputFilterThreadCreator::new(Arc::new(RwLock::new(Strong::new(
            Box::new(callbacks),
        )))))
    }
}

+88 −50

File changed.

Preview size limit exceeded, changes collapsed.

Loading