Loading services/inputflinger/InputFilterCallbacks.cpp +23 −21 Original line number Diff line number Diff line Loading @@ -19,9 +19,10 @@ #include "InputFilterCallbacks.h" #include <aidl/com/android/server/inputflinger/BnInputThread.h> #include <android/binder_auto_utils.h> #include <utils/Looper.h> #include <utils/StrongPointer.h> #include <utils/Thread.h> #include <functional> #include "InputThread.h" namespace android { Loading @@ -38,36 +39,37 @@ namespace { using namespace aidl::com::android::server::inputflinger; class InputFilterThreadImpl : public Thread { public: explicit InputFilterThreadImpl(std::function<void()> loop) : Thread(/*canCallJava=*/true), mThreadLoop(loop) {} ~InputFilterThreadImpl() {} private: std::function<void()> mThreadLoop; bool threadLoop() override { mThreadLoop(); return true; } }; class InputFilterThread : public BnInputThread { public: InputFilterThread(std::shared_ptr<IInputThreadCallback> callback) : mCallback(callback) { mThread = sp<InputFilterThreadImpl>::make([this]() { loopOnce(); }); mThread->run("InputFilterThread", ANDROID_PRIORITY_URGENT_DISPLAY); mLooper = sp<Looper>::make(/*allowNonCallbacks=*/false); mThread = std::make_unique<InputThread>( "InputFilter", [this]() { loopOnce(); }, [this]() { mLooper->wake(); }); } ndk::ScopedAStatus finish() override { mThread->requestExit(); if (mThread && mThread->isCallingThread()) { ALOGE("InputFilterThread cannot be stopped on itself!"); return ndk::ScopedAStatus::fromStatus(INVALID_OPERATION); } mThread.reset(); return ndk::ScopedAStatus::ok(); } ndk::ScopedAStatus sleepUntil(nsecs_t when) override { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); mLooper->pollOnce(toMillisecondTimeoutDelay(now, when)); return ndk::ScopedAStatus::ok(); } ndk::ScopedAStatus wake() override { mLooper->wake(); return ndk::ScopedAStatus::ok(); } private: sp<Thread> mThread; sp<Looper> mLooper; std::unique_ptr<InputThread> mThread; std::shared_ptr<IInputThreadCallback> mCallback; void loopOnce() { LOG_ALWAYS_FATAL_IF(!mCallback->loopOnce().isOk()); } Loading services/inputflinger/aidl/com/android/server/inputflinger/IInputThread.aidl +17 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,13 @@ package com.android.server.inputflinger; * infrastructure. * * <p> * Earlier, we used rust thread park()/unpark() to put the thread to sleep and wake up from sleep. * But that caused some breakages after migrating the rust system crates to 2021 edition. Since, * the threads are created in C++, it was more reliable to rely on C++ side of the implementation * to implement the sleep and wake functions. * </p> * * <p> * NOTE: Tried using rust provided threading infrastructure but that uses std::thread which doesn't * have JNI support and can't call into Java policy that we use currently. libutils provided * Thread.h also recommends against using std::thread and using the provided infrastructure that Loading @@ -33,6 +40,16 @@ interface IInputThread { /** Finish input thread (if not running, this call does nothing) */ void finish(); /** Wakes up the thread (if sleeping) */ void wake(); /** * Puts the thread to sleep until a future time provided. * * NOTE: The thread can be awaken before the provided time using {@link wake()} function. */ void sleepUntil(long whenNanos); /** Callbacks from C++ to call into inputflinger rust components */ interface IInputThreadCallback { /** Loading services/inputflinger/rust/input_filter.rs +82 −24 Original line number Diff line number Diff line Loading @@ -396,14 +396,16 @@ pub mod test_callbacks { IInputThread::{BnInputThread, IInputThread, IInputThreadCallback::IInputThreadCallback}, KeyEvent::KeyEvent, }; use std::sync::{Arc, RwLock, RwLockWriteGuard}; use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId}; use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, RwLock, RwLockWriteGuard}; use std::time::Duration; #[derive(Default)] struct TestCallbacksInner { last_modifier_state: u32, last_locked_modifier_state: u32, last_event: Option<KeyEvent>, test_thread: Option<TestThread>, test_thread: Option<FakeCppThread>, } #[derive(Default, Clone)] Loading Loading @@ -438,13 +440,9 @@ pub mod test_callbacks { self.0.read().unwrap().last_locked_modifier_state } pub fn is_thread_created(&self) -> bool { self.0.read().unwrap().test_thread.is_some() } pub fn is_thread_finished(&self) -> bool { pub fn is_thread_running(&self) -> bool { if let Some(test_thread) = &self.0.read().unwrap().test_thread { return test_thread.is_finish_called(); return test_thread.is_running(); } false } Loading @@ -468,41 +466,101 @@ pub mod test_callbacks { fn createInputFilterThread( &self, _callback: &Strong<dyn IInputThreadCallback>, callback: &Strong<dyn IInputThreadCallback>, ) -> std::result::Result<Strong<dyn IInputThread>, binder::Status> { let test_thread = TestThread::new(); let test_thread = FakeCppThread::new(callback.clone()); test_thread.start_looper(); self.inner().test_thread = Some(test_thread.clone()); Result::Ok(BnInputThread::new_binder(test_thread, BinderFeatures::default())) } } #[derive(Default)] struct TestThreadInner { is_finish_called: bool, struct FakeCppThreadInner { join_handle: Option<std::thread::JoinHandle<()>>, } #[derive(Default, Clone)] struct TestThread(Arc<RwLock<TestThreadInner>>); #[derive(Clone)] struct FakeCppThread { callback: Arc<RwLock<Strong<dyn IInputThreadCallback>>>, inner: Arc<RwLock<FakeCppThreadInner>>, exit_flag: Arc<AtomicBool>, } impl Interface for TestThread {} impl Interface for FakeCppThread {} impl TestThread { pub fn new() -> Self { Default::default() impl FakeCppThread { pub fn new(callback: Strong<dyn IInputThreadCallback>) -> Self { let thread = Self { callback: Arc::new(RwLock::new(callback)), inner: Arc::new(RwLock::new(FakeCppThreadInner { join_handle: None })), exit_flag: Arc::new(AtomicBool::new(true)), }; thread.create_looper(); thread } fn inner(&self) -> RwLockWriteGuard<'_, TestThreadInner> { self.0.write().unwrap() fn inner(&self) -> RwLockWriteGuard<'_, FakeCppThreadInner> { self.inner.write().unwrap() } fn create_looper(&self) { let clone = self.clone(); let join_handle = std::thread::Builder::new() .name("fake_cpp_thread".to_string()) .spawn(move || loop { if !clone.exit_flag.load(Ordering::Relaxed) { clone.loop_once(); } }) .unwrap(); self.inner().join_handle = Some(join_handle); // Sleep until the looper thread starts std::thread::sleep(Duration::from_millis(10)); } pub fn start_looper(&self) { self.exit_flag.store(false, Ordering::Relaxed); } pub fn is_finish_called(&self) -> bool { self.0.read().unwrap().is_finish_called pub fn stop_looper(&self) { self.exit_flag.store(true, Ordering::Relaxed); if let Some(join_handle) = &self.inner.read().unwrap().join_handle { join_handle.thread().unpark(); } } pub fn is_running(&self) -> bool { !self.exit_flag.load(Ordering::Relaxed) } impl IInputThread for TestThread { fn loop_once(&self) { let _ = self.callback.read().unwrap().loopOnce(); } } impl IInputThread for FakeCppThread { fn finish(&self) -> binder::Result<()> { self.inner().is_finish_called = true; self.stop_looper(); Result::Ok(()) } fn wake(&self) -> binder::Result<()> { if let Some(join_handle) = &self.inner.read().unwrap().join_handle { join_handle.thread().unpark(); } Result::Ok(()) } fn sleepUntil(&self, wake_up_time: i64) -> binder::Result<()> { let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_nanoseconds(); if wake_up_time == i64::MAX { std::thread::park(); } else { let duration_now = Duration::from_nanos(now as u64); let duration_wake_up = Duration::from_nanos(wake_up_time as u64); std::thread::park_timeout(duration_wake_up - duration_now); } Result::Ok(()) } } Loading services/inputflinger/rust/input_filter_thread.rs +87 −167 Original line number Diff line number Diff line Loading @@ -33,8 +33,6 @@ use com_android_server_inputflinger::aidl::com::android::server::inputflinger::I use log::{debug, error}; use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId}; use std::sync::{Arc, RwLock, RwLockWriteGuard}; use std::time::Duration; use std::{thread, thread::Thread}; /// Interface to receive callback from Input filter thread pub trait ThreadCallback { Loading @@ -54,15 +52,18 @@ pub struct InputFilterThread { thread_creator: InputFilterThreadCreator, thread_callback_handler: ThreadCallbackHandler, inner: Arc<RwLock<InputFilterThreadInner>>, looper: Arc<RwLock<Looper>>, } struct InputFilterThreadInner { cpp_thread: Option<Strong<dyn IInputThread>>, looper: Option<Thread>, next_timeout: i64, is_finishing: bool, } struct Looper { cpp_thread: Option<Strong<dyn IInputThread>>, } impl InputFilterThread { /// Create a new InputFilterThread instance. /// NOTE: This will create a new thread. Clone the existing instance to reuse the same thread. Loading @@ -71,11 +72,10 @@ impl InputFilterThread { thread_creator, thread_callback_handler: ThreadCallbackHandler::new(), inner: Arc::new(RwLock::new(InputFilterThreadInner { cpp_thread: None, looper: None, next_timeout: i64::MAX, is_finishing: false, })), looper: Arc::new(RwLock::new(Looper { cpp_thread: None })), } } Loading @@ -83,12 +83,17 @@ impl InputFilterThread { /// time on the input filter thread. /// {@see ThreadCallback.notify_timeout_expired(...)} pub fn request_timeout_at_time(&self, when_nanos: i64) { let mut need_wake = false; { // acquire filter lock let filter_thread = &mut self.filter_thread(); if when_nanos < filter_thread.next_timeout { filter_thread.next_timeout = when_nanos; if let Some(looper) = &filter_thread.looper { looper.unpark(); need_wake = true; } } // release filter lock if need_wake { self.wake(); } } Loading Loading @@ -120,29 +125,36 @@ impl InputFilterThread { fn start(&self) { debug!("InputFilterThread: start thread"); let filter_thread = &mut self.filter_thread(); if filter_thread.cpp_thread.is_none() { filter_thread.cpp_thread = Some(self.thread_creator.create( { // acquire looper lock let looper = &mut self.looper(); if looper.cpp_thread.is_none() { looper.cpp_thread = Some(self.thread_creator.create( &BnInputThreadCallback::new_binder(self.clone(), BinderFeatures::default()), )); filter_thread.looper = None; filter_thread.is_finishing = false; } } // release looper lock self.set_finishing(false); } fn stop(&self) { debug!("InputFilterThread: stop thread"); let filter_thread = &mut self.filter_thread(); filter_thread.is_finishing = true; if let Some(looper) = &filter_thread.looper { looper.unpark(); } if let Some(cpp_thread) = &filter_thread.cpp_thread { self.set_finishing(true); self.wake(); { // acquire looper lock let looper = &mut self.looper(); if let Some(cpp_thread) = &looper.cpp_thread { let _ = cpp_thread.finish(); } // Clear all references filter_thread.cpp_thread = None; filter_thread.looper = None; looper.cpp_thread = None; } // release looper lock } fn set_finishing(&self, is_finishing: bool) { let filter_thread = &mut self.filter_thread(); filter_thread.is_finishing = is_finishing; } fn loop_once(&self, now: i64) { Loading @@ -163,25 +175,34 @@ impl InputFilterThread { wake_up_time = filter_thread.next_timeout; } } if filter_thread.looper.is_none() { filter_thread.looper = Some(std::thread::current()); } } // release thread lock if timeout_expired { self.thread_callback_handler.notify_timeout_expired(now); } if wake_up_time == i64::MAX { thread::park(); } else { let duration_now = Duration::from_nanos(now as u64); let duration_wake_up = Duration::from_nanos(wake_up_time as u64); thread::park_timeout(duration_wake_up - duration_now); } self.sleep_until(wake_up_time); } fn filter_thread(&self) -> RwLockWriteGuard<'_, InputFilterThreadInner> { self.inner.write().unwrap() } fn sleep_until(&self, when_nanos: i64) { let looper = self.looper.read().unwrap(); if let Some(cpp_thread) = &looper.cpp_thread { let _ = cpp_thread.sleepUntil(when_nanos); } } fn wake(&self) { let looper = self.looper.read().unwrap(); if let Some(cpp_thread) = &looper.cpp_thread { let _ = cpp_thread.wake(); } } fn looper(&self) -> RwLockWriteGuard<'_, Looper> { self.looper.write().unwrap() } } impl Interface for InputFilterThread {} Loading Loading @@ -252,165 +273,64 @@ impl ThreadCallbackHandler { #[cfg(test)] mod tests { use crate::input_filter::test_callbacks::TestCallbacks; use crate::input_filter_thread::{ test_thread::TestThread, test_thread_callback::TestThreadCallback, }; use crate::input_filter::{test_callbacks::TestCallbacks, InputFilterThreadCreator}; use crate::input_filter_thread::{test_thread_callback::TestThreadCallback, InputFilterThread}; use binder::Strong; use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId}; use std::sync::{Arc, RwLock}; use std::time::Duration; #[test] fn test_register_callback_creates_cpp_thread() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback); assert!(test_callbacks.is_thread_created()); test_thread.register_thread_callback(Box::new(test_thread_callback)); assert!(test_callbacks.is_thread_running()); } #[test] fn test_unregister_callback_finishes_cpp_thread() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback.clone()); test_thread.unregister_thread_callback(test_thread_callback); assert!(test_callbacks.is_thread_finished()); test_thread.register_thread_callback(Box::new(test_thread_callback.clone())); test_thread.unregister_thread_callback(Box::new(test_thread_callback)); assert!(!test_callbacks.is_thread_running()); } #[test] fn test_notify_timeout_called_after_timeout_expired() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback.clone()); test_thread.start_looper(); test_thread.request_timeout_at_time(500); test_thread.dispatch_next(); test_thread.register_thread_callback(Box::new(test_thread_callback.clone())); test_thread.move_time_forward(500); let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_milliseconds(); test_thread.request_timeout_at_time((now + 10) * 1000000); test_thread.stop_looper(); std::thread::sleep(Duration::from_millis(20)); assert!(test_thread_callback.is_notify_timeout_called()); } #[test] fn test_notify_timeout_not_called_before_timeout_expired() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback.clone()); test_thread.start_looper(); test_thread.request_timeout_at_time(500); test_thread.dispatch_next(); test_thread.move_time_forward(100); test_thread.stop_looper(); assert!(!test_thread_callback.is_notify_timeout_called()); } } #[cfg(test)] pub mod test_thread { test_thread.register_thread_callback(Box::new(test_thread_callback.clone())); use crate::input_filter::{test_callbacks::TestCallbacks, InputFilterThreadCreator}; use crate::input_filter_thread::{test_thread_callback::TestThreadCallback, InputFilterThread}; use binder::Strong; use std::sync::{ atomic::AtomicBool, atomic::AtomicI64, atomic::Ordering, Arc, RwLock, RwLockWriteGuard, }; use std::time::Duration; #[derive(Clone)] pub struct TestThread { input_thread: InputFilterThread, inner: Arc<RwLock<TestThreadInner>>, exit_flag: Arc<AtomicBool>, now: Arc<AtomicI64>, } struct TestThreadInner { join_handle: Option<std::thread::JoinHandle<()>>, } impl TestThread { pub fn new(callbacks: TestCallbacks) -> TestThread { Self { input_thread: InputFilterThread::new(InputFilterThreadCreator::new(Arc::new( RwLock::new(Strong::new(Box::new(callbacks))), ))), inner: Arc::new(RwLock::new(TestThreadInner { join_handle: None })), exit_flag: Arc::new(AtomicBool::new(false)), now: Arc::new(AtomicI64::new(0)), } } fn inner(&self) -> RwLockWriteGuard<'_, TestThreadInner> { self.inner.write().unwrap() } pub fn get_input_thread(&self) -> InputFilterThread { self.input_thread.clone() } pub fn register_thread_callback(&self, thread_callback: TestThreadCallback) { self.input_thread.register_thread_callback(Box::new(thread_callback)); } pub fn unregister_thread_callback(&self, thread_callback: TestThreadCallback) { self.input_thread.unregister_thread_callback(Box::new(thread_callback)); } pub fn start_looper(&self) { self.exit_flag.store(false, Ordering::Relaxed); let clone = self.clone(); let join_handle = std::thread::Builder::new() .name("test_thread".to_string()) .spawn(move || { while !clone.exit_flag.load(Ordering::Relaxed) { clone.loop_once(); } }) .unwrap(); self.inner().join_handle = Some(join_handle); // Sleep until the looper thread starts std::thread::sleep(Duration::from_millis(10)); } pub fn stop_looper(&self) { self.exit_flag.store(true, Ordering::Relaxed); { let mut inner = self.inner(); if let Some(join_handle) = &inner.join_handle { join_handle.thread().unpark(); } inner.join_handle.take().map(std::thread::JoinHandle::join); inner.join_handle = None; } self.exit_flag.store(false, Ordering::Relaxed); } let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_milliseconds(); test_thread.request_timeout_at_time((now + 100) * 1000000); pub fn move_time_forward(&self, value: i64) { let _ = self.now.fetch_add(value, Ordering::Relaxed); self.dispatch_next(); } pub fn dispatch_next(&self) { if let Some(join_handle) = &self.inner().join_handle { join_handle.thread().unpark(); } // Sleep until the looper thread runs a loop std::thread::sleep(Duration::from_millis(10)); assert!(!test_thread_callback.is_notify_timeout_called()); } fn loop_once(&self) { self.input_thread.loop_once(self.now.load(Ordering::Relaxed)); } pub fn request_timeout_at_time(&self, when_nanos: i64) { self.input_thread.request_timeout_at_time(when_nanos); } fn get_thread(callbacks: TestCallbacks) -> InputFilterThread { InputFilterThread::new(InputFilterThreadCreator::new(Arc::new(RwLock::new(Strong::new( Box::new(callbacks), ))))) } } Loading services/inputflinger/rust/slow_keys_filter.rs +88 −50 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
services/inputflinger/InputFilterCallbacks.cpp +23 −21 Original line number Diff line number Diff line Loading @@ -19,9 +19,10 @@ #include "InputFilterCallbacks.h" #include <aidl/com/android/server/inputflinger/BnInputThread.h> #include <android/binder_auto_utils.h> #include <utils/Looper.h> #include <utils/StrongPointer.h> #include <utils/Thread.h> #include <functional> #include "InputThread.h" namespace android { Loading @@ -38,36 +39,37 @@ namespace { using namespace aidl::com::android::server::inputflinger; class InputFilterThreadImpl : public Thread { public: explicit InputFilterThreadImpl(std::function<void()> loop) : Thread(/*canCallJava=*/true), mThreadLoop(loop) {} ~InputFilterThreadImpl() {} private: std::function<void()> mThreadLoop; bool threadLoop() override { mThreadLoop(); return true; } }; class InputFilterThread : public BnInputThread { public: InputFilterThread(std::shared_ptr<IInputThreadCallback> callback) : mCallback(callback) { mThread = sp<InputFilterThreadImpl>::make([this]() { loopOnce(); }); mThread->run("InputFilterThread", ANDROID_PRIORITY_URGENT_DISPLAY); mLooper = sp<Looper>::make(/*allowNonCallbacks=*/false); mThread = std::make_unique<InputThread>( "InputFilter", [this]() { loopOnce(); }, [this]() { mLooper->wake(); }); } ndk::ScopedAStatus finish() override { mThread->requestExit(); if (mThread && mThread->isCallingThread()) { ALOGE("InputFilterThread cannot be stopped on itself!"); return ndk::ScopedAStatus::fromStatus(INVALID_OPERATION); } mThread.reset(); return ndk::ScopedAStatus::ok(); } ndk::ScopedAStatus sleepUntil(nsecs_t when) override { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); mLooper->pollOnce(toMillisecondTimeoutDelay(now, when)); return ndk::ScopedAStatus::ok(); } ndk::ScopedAStatus wake() override { mLooper->wake(); return ndk::ScopedAStatus::ok(); } private: sp<Thread> mThread; sp<Looper> mLooper; std::unique_ptr<InputThread> mThread; std::shared_ptr<IInputThreadCallback> mCallback; void loopOnce() { LOG_ALWAYS_FATAL_IF(!mCallback->loopOnce().isOk()); } Loading
services/inputflinger/aidl/com/android/server/inputflinger/IInputThread.aidl +17 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,13 @@ package com.android.server.inputflinger; * infrastructure. * * <p> * Earlier, we used rust thread park()/unpark() to put the thread to sleep and wake up from sleep. * But that caused some breakages after migrating the rust system crates to 2021 edition. Since, * the threads are created in C++, it was more reliable to rely on C++ side of the implementation * to implement the sleep and wake functions. * </p> * * <p> * NOTE: Tried using rust provided threading infrastructure but that uses std::thread which doesn't * have JNI support and can't call into Java policy that we use currently. libutils provided * Thread.h also recommends against using std::thread and using the provided infrastructure that Loading @@ -33,6 +40,16 @@ interface IInputThread { /** Finish input thread (if not running, this call does nothing) */ void finish(); /** Wakes up the thread (if sleeping) */ void wake(); /** * Puts the thread to sleep until a future time provided. * * NOTE: The thread can be awaken before the provided time using {@link wake()} function. */ void sleepUntil(long whenNanos); /** Callbacks from C++ to call into inputflinger rust components */ interface IInputThreadCallback { /** Loading
services/inputflinger/rust/input_filter.rs +82 −24 Original line number Diff line number Diff line Loading @@ -396,14 +396,16 @@ pub mod test_callbacks { IInputThread::{BnInputThread, IInputThread, IInputThreadCallback::IInputThreadCallback}, KeyEvent::KeyEvent, }; use std::sync::{Arc, RwLock, RwLockWriteGuard}; use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId}; use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, RwLock, RwLockWriteGuard}; use std::time::Duration; #[derive(Default)] struct TestCallbacksInner { last_modifier_state: u32, last_locked_modifier_state: u32, last_event: Option<KeyEvent>, test_thread: Option<TestThread>, test_thread: Option<FakeCppThread>, } #[derive(Default, Clone)] Loading Loading @@ -438,13 +440,9 @@ pub mod test_callbacks { self.0.read().unwrap().last_locked_modifier_state } pub fn is_thread_created(&self) -> bool { self.0.read().unwrap().test_thread.is_some() } pub fn is_thread_finished(&self) -> bool { pub fn is_thread_running(&self) -> bool { if let Some(test_thread) = &self.0.read().unwrap().test_thread { return test_thread.is_finish_called(); return test_thread.is_running(); } false } Loading @@ -468,41 +466,101 @@ pub mod test_callbacks { fn createInputFilterThread( &self, _callback: &Strong<dyn IInputThreadCallback>, callback: &Strong<dyn IInputThreadCallback>, ) -> std::result::Result<Strong<dyn IInputThread>, binder::Status> { let test_thread = TestThread::new(); let test_thread = FakeCppThread::new(callback.clone()); test_thread.start_looper(); self.inner().test_thread = Some(test_thread.clone()); Result::Ok(BnInputThread::new_binder(test_thread, BinderFeatures::default())) } } #[derive(Default)] struct TestThreadInner { is_finish_called: bool, struct FakeCppThreadInner { join_handle: Option<std::thread::JoinHandle<()>>, } #[derive(Default, Clone)] struct TestThread(Arc<RwLock<TestThreadInner>>); #[derive(Clone)] struct FakeCppThread { callback: Arc<RwLock<Strong<dyn IInputThreadCallback>>>, inner: Arc<RwLock<FakeCppThreadInner>>, exit_flag: Arc<AtomicBool>, } impl Interface for TestThread {} impl Interface for FakeCppThread {} impl TestThread { pub fn new() -> Self { Default::default() impl FakeCppThread { pub fn new(callback: Strong<dyn IInputThreadCallback>) -> Self { let thread = Self { callback: Arc::new(RwLock::new(callback)), inner: Arc::new(RwLock::new(FakeCppThreadInner { join_handle: None })), exit_flag: Arc::new(AtomicBool::new(true)), }; thread.create_looper(); thread } fn inner(&self) -> RwLockWriteGuard<'_, TestThreadInner> { self.0.write().unwrap() fn inner(&self) -> RwLockWriteGuard<'_, FakeCppThreadInner> { self.inner.write().unwrap() } fn create_looper(&self) { let clone = self.clone(); let join_handle = std::thread::Builder::new() .name("fake_cpp_thread".to_string()) .spawn(move || loop { if !clone.exit_flag.load(Ordering::Relaxed) { clone.loop_once(); } }) .unwrap(); self.inner().join_handle = Some(join_handle); // Sleep until the looper thread starts std::thread::sleep(Duration::from_millis(10)); } pub fn start_looper(&self) { self.exit_flag.store(false, Ordering::Relaxed); } pub fn is_finish_called(&self) -> bool { self.0.read().unwrap().is_finish_called pub fn stop_looper(&self) { self.exit_flag.store(true, Ordering::Relaxed); if let Some(join_handle) = &self.inner.read().unwrap().join_handle { join_handle.thread().unpark(); } } pub fn is_running(&self) -> bool { !self.exit_flag.load(Ordering::Relaxed) } impl IInputThread for TestThread { fn loop_once(&self) { let _ = self.callback.read().unwrap().loopOnce(); } } impl IInputThread for FakeCppThread { fn finish(&self) -> binder::Result<()> { self.inner().is_finish_called = true; self.stop_looper(); Result::Ok(()) } fn wake(&self) -> binder::Result<()> { if let Some(join_handle) = &self.inner.read().unwrap().join_handle { join_handle.thread().unpark(); } Result::Ok(()) } fn sleepUntil(&self, wake_up_time: i64) -> binder::Result<()> { let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_nanoseconds(); if wake_up_time == i64::MAX { std::thread::park(); } else { let duration_now = Duration::from_nanos(now as u64); let duration_wake_up = Duration::from_nanos(wake_up_time as u64); std::thread::park_timeout(duration_wake_up - duration_now); } Result::Ok(()) } } Loading
services/inputflinger/rust/input_filter_thread.rs +87 −167 Original line number Diff line number Diff line Loading @@ -33,8 +33,6 @@ use com_android_server_inputflinger::aidl::com::android::server::inputflinger::I use log::{debug, error}; use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId}; use std::sync::{Arc, RwLock, RwLockWriteGuard}; use std::time::Duration; use std::{thread, thread::Thread}; /// Interface to receive callback from Input filter thread pub trait ThreadCallback { Loading @@ -54,15 +52,18 @@ pub struct InputFilterThread { thread_creator: InputFilterThreadCreator, thread_callback_handler: ThreadCallbackHandler, inner: Arc<RwLock<InputFilterThreadInner>>, looper: Arc<RwLock<Looper>>, } struct InputFilterThreadInner { cpp_thread: Option<Strong<dyn IInputThread>>, looper: Option<Thread>, next_timeout: i64, is_finishing: bool, } struct Looper { cpp_thread: Option<Strong<dyn IInputThread>>, } impl InputFilterThread { /// Create a new InputFilterThread instance. /// NOTE: This will create a new thread. Clone the existing instance to reuse the same thread. Loading @@ -71,11 +72,10 @@ impl InputFilterThread { thread_creator, thread_callback_handler: ThreadCallbackHandler::new(), inner: Arc::new(RwLock::new(InputFilterThreadInner { cpp_thread: None, looper: None, next_timeout: i64::MAX, is_finishing: false, })), looper: Arc::new(RwLock::new(Looper { cpp_thread: None })), } } Loading @@ -83,12 +83,17 @@ impl InputFilterThread { /// time on the input filter thread. /// {@see ThreadCallback.notify_timeout_expired(...)} pub fn request_timeout_at_time(&self, when_nanos: i64) { let mut need_wake = false; { // acquire filter lock let filter_thread = &mut self.filter_thread(); if when_nanos < filter_thread.next_timeout { filter_thread.next_timeout = when_nanos; if let Some(looper) = &filter_thread.looper { looper.unpark(); need_wake = true; } } // release filter lock if need_wake { self.wake(); } } Loading Loading @@ -120,29 +125,36 @@ impl InputFilterThread { fn start(&self) { debug!("InputFilterThread: start thread"); let filter_thread = &mut self.filter_thread(); if filter_thread.cpp_thread.is_none() { filter_thread.cpp_thread = Some(self.thread_creator.create( { // acquire looper lock let looper = &mut self.looper(); if looper.cpp_thread.is_none() { looper.cpp_thread = Some(self.thread_creator.create( &BnInputThreadCallback::new_binder(self.clone(), BinderFeatures::default()), )); filter_thread.looper = None; filter_thread.is_finishing = false; } } // release looper lock self.set_finishing(false); } fn stop(&self) { debug!("InputFilterThread: stop thread"); let filter_thread = &mut self.filter_thread(); filter_thread.is_finishing = true; if let Some(looper) = &filter_thread.looper { looper.unpark(); } if let Some(cpp_thread) = &filter_thread.cpp_thread { self.set_finishing(true); self.wake(); { // acquire looper lock let looper = &mut self.looper(); if let Some(cpp_thread) = &looper.cpp_thread { let _ = cpp_thread.finish(); } // Clear all references filter_thread.cpp_thread = None; filter_thread.looper = None; looper.cpp_thread = None; } // release looper lock } fn set_finishing(&self, is_finishing: bool) { let filter_thread = &mut self.filter_thread(); filter_thread.is_finishing = is_finishing; } fn loop_once(&self, now: i64) { Loading @@ -163,25 +175,34 @@ impl InputFilterThread { wake_up_time = filter_thread.next_timeout; } } if filter_thread.looper.is_none() { filter_thread.looper = Some(std::thread::current()); } } // release thread lock if timeout_expired { self.thread_callback_handler.notify_timeout_expired(now); } if wake_up_time == i64::MAX { thread::park(); } else { let duration_now = Duration::from_nanos(now as u64); let duration_wake_up = Duration::from_nanos(wake_up_time as u64); thread::park_timeout(duration_wake_up - duration_now); } self.sleep_until(wake_up_time); } fn filter_thread(&self) -> RwLockWriteGuard<'_, InputFilterThreadInner> { self.inner.write().unwrap() } fn sleep_until(&self, when_nanos: i64) { let looper = self.looper.read().unwrap(); if let Some(cpp_thread) = &looper.cpp_thread { let _ = cpp_thread.sleepUntil(when_nanos); } } fn wake(&self) { let looper = self.looper.read().unwrap(); if let Some(cpp_thread) = &looper.cpp_thread { let _ = cpp_thread.wake(); } } fn looper(&self) -> RwLockWriteGuard<'_, Looper> { self.looper.write().unwrap() } } impl Interface for InputFilterThread {} Loading Loading @@ -252,165 +273,64 @@ impl ThreadCallbackHandler { #[cfg(test)] mod tests { use crate::input_filter::test_callbacks::TestCallbacks; use crate::input_filter_thread::{ test_thread::TestThread, test_thread_callback::TestThreadCallback, }; use crate::input_filter::{test_callbacks::TestCallbacks, InputFilterThreadCreator}; use crate::input_filter_thread::{test_thread_callback::TestThreadCallback, InputFilterThread}; use binder::Strong; use nix::{sys::time::TimeValLike, time::clock_gettime, time::ClockId}; use std::sync::{Arc, RwLock}; use std::time::Duration; #[test] fn test_register_callback_creates_cpp_thread() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback); assert!(test_callbacks.is_thread_created()); test_thread.register_thread_callback(Box::new(test_thread_callback)); assert!(test_callbacks.is_thread_running()); } #[test] fn test_unregister_callback_finishes_cpp_thread() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback.clone()); test_thread.unregister_thread_callback(test_thread_callback); assert!(test_callbacks.is_thread_finished()); test_thread.register_thread_callback(Box::new(test_thread_callback.clone())); test_thread.unregister_thread_callback(Box::new(test_thread_callback)); assert!(!test_callbacks.is_thread_running()); } #[test] fn test_notify_timeout_called_after_timeout_expired() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback.clone()); test_thread.start_looper(); test_thread.request_timeout_at_time(500); test_thread.dispatch_next(); test_thread.register_thread_callback(Box::new(test_thread_callback.clone())); test_thread.move_time_forward(500); let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_milliseconds(); test_thread.request_timeout_at_time((now + 10) * 1000000); test_thread.stop_looper(); std::thread::sleep(Duration::from_millis(20)); assert!(test_thread_callback.is_notify_timeout_called()); } #[test] fn test_notify_timeout_not_called_before_timeout_expired() { let test_callbacks = TestCallbacks::new(); let test_thread = TestThread::new(test_callbacks.clone()); let test_thread = get_thread(test_callbacks.clone()); let test_thread_callback = TestThreadCallback::new(); test_thread.register_thread_callback(test_thread_callback.clone()); test_thread.start_looper(); test_thread.request_timeout_at_time(500); test_thread.dispatch_next(); test_thread.move_time_forward(100); test_thread.stop_looper(); assert!(!test_thread_callback.is_notify_timeout_called()); } } #[cfg(test)] pub mod test_thread { test_thread.register_thread_callback(Box::new(test_thread_callback.clone())); use crate::input_filter::{test_callbacks::TestCallbacks, InputFilterThreadCreator}; use crate::input_filter_thread::{test_thread_callback::TestThreadCallback, InputFilterThread}; use binder::Strong; use std::sync::{ atomic::AtomicBool, atomic::AtomicI64, atomic::Ordering, Arc, RwLock, RwLockWriteGuard, }; use std::time::Duration; #[derive(Clone)] pub struct TestThread { input_thread: InputFilterThread, inner: Arc<RwLock<TestThreadInner>>, exit_flag: Arc<AtomicBool>, now: Arc<AtomicI64>, } struct TestThreadInner { join_handle: Option<std::thread::JoinHandle<()>>, } impl TestThread { pub fn new(callbacks: TestCallbacks) -> TestThread { Self { input_thread: InputFilterThread::new(InputFilterThreadCreator::new(Arc::new( RwLock::new(Strong::new(Box::new(callbacks))), ))), inner: Arc::new(RwLock::new(TestThreadInner { join_handle: None })), exit_flag: Arc::new(AtomicBool::new(false)), now: Arc::new(AtomicI64::new(0)), } } fn inner(&self) -> RwLockWriteGuard<'_, TestThreadInner> { self.inner.write().unwrap() } pub fn get_input_thread(&self) -> InputFilterThread { self.input_thread.clone() } pub fn register_thread_callback(&self, thread_callback: TestThreadCallback) { self.input_thread.register_thread_callback(Box::new(thread_callback)); } pub fn unregister_thread_callback(&self, thread_callback: TestThreadCallback) { self.input_thread.unregister_thread_callback(Box::new(thread_callback)); } pub fn start_looper(&self) { self.exit_flag.store(false, Ordering::Relaxed); let clone = self.clone(); let join_handle = std::thread::Builder::new() .name("test_thread".to_string()) .spawn(move || { while !clone.exit_flag.load(Ordering::Relaxed) { clone.loop_once(); } }) .unwrap(); self.inner().join_handle = Some(join_handle); // Sleep until the looper thread starts std::thread::sleep(Duration::from_millis(10)); } pub fn stop_looper(&self) { self.exit_flag.store(true, Ordering::Relaxed); { let mut inner = self.inner(); if let Some(join_handle) = &inner.join_handle { join_handle.thread().unpark(); } inner.join_handle.take().map(std::thread::JoinHandle::join); inner.join_handle = None; } self.exit_flag.store(false, Ordering::Relaxed); } let now = clock_gettime(ClockId::CLOCK_MONOTONIC).unwrap().num_milliseconds(); test_thread.request_timeout_at_time((now + 100) * 1000000); pub fn move_time_forward(&self, value: i64) { let _ = self.now.fetch_add(value, Ordering::Relaxed); self.dispatch_next(); } pub fn dispatch_next(&self) { if let Some(join_handle) = &self.inner().join_handle { join_handle.thread().unpark(); } // Sleep until the looper thread runs a loop std::thread::sleep(Duration::from_millis(10)); assert!(!test_thread_callback.is_notify_timeout_called()); } fn loop_once(&self) { self.input_thread.loop_once(self.now.load(Ordering::Relaxed)); } pub fn request_timeout_at_time(&self, when_nanos: i64) { self.input_thread.request_timeout_at_time(when_nanos); } fn get_thread(callbacks: TestCallbacks) -> InputFilterThread { InputFilterThread::new(InputFilterThreadCreator::new(Arc::new(RwLock::new(Strong::new( Box::new(callbacks), ))))) } } Loading
services/inputflinger/rust/slow_keys_filter.rs +88 −50 File changed.Preview size limit exceeded, changes collapsed. Show changes