Loading automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h +42 −1 Original line number Original line Diff line number Diff line Loading @@ -17,6 +17,7 @@ #pragma once #pragma once #include <android-base/thread_annotations.h> #include <android-base/thread_annotations.h> #include <utils/Looper.h> #include <wakeup_client.grpc.pb.h> #include <wakeup_client.grpc.pb.h> #include <condition_variable> #include <condition_variable> #include <mutex> #include <mutex> Loading @@ -41,20 +42,60 @@ class FakeTaskGenerator final { constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef}; constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef}; }; }; struct TaskInfo { // This is unique per-task. Note that a task might be popped and put back into the task queue, // it will have a new task ID but the same clientId in the task data. int taskId; long timestampInMs; GetRemoteTasksResponse taskData; }; struct TaskInfoComparator { // We want the smallest timestamp and smallest task ID on top. bool operator()(const TaskInfo& l, const TaskInfo& r) { return l.timestampInMs > r.timestampInMs || (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId); } }; // forward-declaration. class TaskQueue; class TaskTimeoutMessageHandler final : public android::MessageHandler { public: TaskTimeoutMessageHandler(TaskQueue* taskQueue); void handleMessage(const android::Message& message) override; private: TaskQueue* mTaskQueue; }; // TaskQueue is thread-safe. // TaskQueue is thread-safe. class TaskQueue final { class TaskQueue final { public: public: TaskQueue(); ~TaskQueue(); void add(const GetRemoteTasksResponse& response); void add(const GetRemoteTasksResponse& response); std::optional<GetRemoteTasksResponse> maybePopOne(); std::optional<GetRemoteTasksResponse> maybePopOne(); void waitForTask(); void waitForTask(); void stopWait(); void stopWait(); void handleTaskTimeout(); private: private: std::thread mCheckTaskTimeoutThread; std::mutex mLock; std::mutex mLock; std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock); std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks GUARDED_BY(mLock); // A variable to notify mTasks is not empty. // A variable to notify mTasks is not empty. std::condition_variable mTasksNotEmptyCv; std::condition_variable mTasksNotEmptyCv; bool mStopped GUARDED_BY(mLock); bool mStopped GUARDED_BY(mLock); android::sp<Looper> mLooper; android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler; std::atomic<int> mTaskIdCounter = 0; void checkForTestTimeoutLoop(); void waitForTaskWithLock(std::unique_lock<std::mutex>& lock); }; }; class TestWakeupClientServiceImpl final : public WakeupClient::Service { class TestWakeupClientServiceImpl final : public WakeupClient::Service { Loading automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp +91 −8 Original line number Original line Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <android-base/stringprintf.h> #include <android-base/stringprintf.h> #include <utils/Log.h> #include <utils/Log.h> #include <utils/Looper.h> #include <utils/SystemClock.h> #include <chrono> #include <chrono> #include <thread> #include <thread> Loading @@ -28,13 +30,15 @@ namespace remoteaccess { namespace { namespace { using ::android::uptimeMillis; using ::android::base::ScopedLockAssertion; using ::android::base::ScopedLockAssertion; using ::android::base::StringPrintf; using ::android::base::StringPrintf; using ::grpc::ServerContext; using ::grpc::ServerContext; using ::grpc::ServerWriter; using ::grpc::ServerWriter; using ::grpc::Status; using ::grpc::Status; constexpr int kTaskIntervalInSec = 5; constexpr int kTaskIntervalInMs = 5'000; constexpr int KTaskTimeoutInMs = 20'000; } // namespace } // namespace Loading @@ -47,24 +51,68 @@ GetRemoteTasksResponse FakeTaskGenerator::generateTask() { return response; return response; } } TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue) : mTaskQueue(taskQueue) {} void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) { mTaskQueue->handleTaskTimeout(); } TaskQueue::TaskQueue() { mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this); mLooper = Looper::prepare(/*opts=*/0); mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); }); } TaskQueue::~TaskQueue() { { std::lock_guard<std::mutex> lockGuard(mLock); mStopped = true; } while (true) { // Remove all pending timeout handlers from queue. if (!maybePopOne().has_value()) { break; } } if (mCheckTaskTimeoutThread.joinable()) { mCheckTaskTimeoutThread.join(); } } std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() { std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() { std::lock_guard<std::mutex> lockGuard(mLock); std::lock_guard<std::mutex> lockGuard(mLock); if (mTasks.size() == 0) { if (mTasks.size() == 0) { return std::nullopt; return std::nullopt; } } GetRemoteTasksResponse response = mTasks.front(); TaskInfo response = std::move(mTasks.top()); mTasks.pop(); mTasks.pop(); return std::move(response); mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId); return std::move(response.taskData); } } void TaskQueue::add(const GetRemoteTasksResponse& task) { void TaskQueue::add(const GetRemoteTasksResponse& task) { // TODO (b/246841306): add timeout to tasks. std::lock_guard<std::mutex> lockGuard(mLock); std::lock_guard<std::mutex> lockGuard(mLock); mTasks.push(task); if (mStopped) { return; } int taskId = mTaskIdCounter++; mTasks.push(TaskInfo{ .taskId = taskId, .timestampInMs = uptimeMillis(), .taskData = task, }); android::Message message(taskId); mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message); mTasksNotEmptyCv.notify_all(); mTasksNotEmptyCv.notify_all(); } } void TaskQueue::waitForTask() { void TaskQueue::waitForTask() { std::unique_lock<std::mutex> lock(mLock); std::unique_lock<std::mutex> lock(mLock); waitForTaskWithLock(lock); } void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) { mTasksNotEmptyCv.wait(lock, [this] { mTasksNotEmptyCv.wait(lock, [this] { ScopedLockAssertion lockAssertion(mLock); ScopedLockAssertion lockAssertion(mLock); return mTasks.size() > 0 || mStopped; return mTasks.size() > 0 || mStopped; Loading @@ -77,6 +125,41 @@ void TaskQueue::stopWait() { mTasksNotEmptyCv.notify_all(); mTasksNotEmptyCv.notify_all(); } } void TaskQueue::checkForTestTimeoutLoop() { Looper::setForThread(mLooper); while (true) { { std::unique_lock<std::mutex> lock(mLock); if (mStopped) { ALOGW("The TestWakeupClientServiceImpl is stopping, " "exiting checkForTestTimeoutLoop"); return; } } mLooper->pollAll(/*timeoutMillis=*/-1); } } void TaskQueue::handleTaskTimeout() { // We know which task timed-out from the taskId in the message. However, there is no easy way // to remove a specific task with the task ID from the priority_queue, so we just check from // the top of the queue (which have the oldest tasks). std::lock_guard<std::mutex> lockGuard(mLock); long now = uptimeMillis(); while (mTasks.size() > 0) { const TaskInfo& taskInfo = mTasks.top(); if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) { break; } // In real implementation, this should report task failure to remote wakeup server. ALOGW("Task for client ID: %s timed-out, added at %ld ms, now %ld ms", taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now); mTasks.pop(); } } TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() { TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() { mThread = std::thread([this] { fakeTaskGenerateLoop(); }); mThread = std::thread([this] { fakeTaskGenerateLoop(); }); } } Loading @@ -95,13 +178,13 @@ TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() { void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() { void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() { // In actual implementation, this should communicate with the remote server and receives tasks // In actual implementation, this should communicate with the remote server and receives tasks // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s. // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms. while (true) { while (true) { mTaskQueue.add(mFakeTaskGenerator.generateTask()); mTaskQueue.add(mFakeTaskGenerator.generateTask()); ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec); ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs); std::unique_lock lk(mLock); std::unique_lock lk(mLock); if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] { if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] { ScopedLockAssertion lockAssertion(mLock); ScopedLockAssertion lockAssertion(mLock); return mServerStopped; return mServerStopped; })) { })) { Loading Loading
automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h +42 −1 Original line number Original line Diff line number Diff line Loading @@ -17,6 +17,7 @@ #pragma once #pragma once #include <android-base/thread_annotations.h> #include <android-base/thread_annotations.h> #include <utils/Looper.h> #include <wakeup_client.grpc.pb.h> #include <wakeup_client.grpc.pb.h> #include <condition_variable> #include <condition_variable> #include <mutex> #include <mutex> Loading @@ -41,20 +42,60 @@ class FakeTaskGenerator final { constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef}; constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef}; }; }; struct TaskInfo { // This is unique per-task. Note that a task might be popped and put back into the task queue, // it will have a new task ID but the same clientId in the task data. int taskId; long timestampInMs; GetRemoteTasksResponse taskData; }; struct TaskInfoComparator { // We want the smallest timestamp and smallest task ID on top. bool operator()(const TaskInfo& l, const TaskInfo& r) { return l.timestampInMs > r.timestampInMs || (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId); } }; // forward-declaration. class TaskQueue; class TaskTimeoutMessageHandler final : public android::MessageHandler { public: TaskTimeoutMessageHandler(TaskQueue* taskQueue); void handleMessage(const android::Message& message) override; private: TaskQueue* mTaskQueue; }; // TaskQueue is thread-safe. // TaskQueue is thread-safe. class TaskQueue final { class TaskQueue final { public: public: TaskQueue(); ~TaskQueue(); void add(const GetRemoteTasksResponse& response); void add(const GetRemoteTasksResponse& response); std::optional<GetRemoteTasksResponse> maybePopOne(); std::optional<GetRemoteTasksResponse> maybePopOne(); void waitForTask(); void waitForTask(); void stopWait(); void stopWait(); void handleTaskTimeout(); private: private: std::thread mCheckTaskTimeoutThread; std::mutex mLock; std::mutex mLock; std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock); std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks GUARDED_BY(mLock); // A variable to notify mTasks is not empty. // A variable to notify mTasks is not empty. std::condition_variable mTasksNotEmptyCv; std::condition_variable mTasksNotEmptyCv; bool mStopped GUARDED_BY(mLock); bool mStopped GUARDED_BY(mLock); android::sp<Looper> mLooper; android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler; std::atomic<int> mTaskIdCounter = 0; void checkForTestTimeoutLoop(); void waitForTaskWithLock(std::unique_lock<std::mutex>& lock); }; }; class TestWakeupClientServiceImpl final : public WakeupClient::Service { class TestWakeupClientServiceImpl final : public WakeupClient::Service { Loading
automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp +91 −8 Original line number Original line Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <android-base/stringprintf.h> #include <android-base/stringprintf.h> #include <utils/Log.h> #include <utils/Log.h> #include <utils/Looper.h> #include <utils/SystemClock.h> #include <chrono> #include <chrono> #include <thread> #include <thread> Loading @@ -28,13 +30,15 @@ namespace remoteaccess { namespace { namespace { using ::android::uptimeMillis; using ::android::base::ScopedLockAssertion; using ::android::base::ScopedLockAssertion; using ::android::base::StringPrintf; using ::android::base::StringPrintf; using ::grpc::ServerContext; using ::grpc::ServerContext; using ::grpc::ServerWriter; using ::grpc::ServerWriter; using ::grpc::Status; using ::grpc::Status; constexpr int kTaskIntervalInSec = 5; constexpr int kTaskIntervalInMs = 5'000; constexpr int KTaskTimeoutInMs = 20'000; } // namespace } // namespace Loading @@ -47,24 +51,68 @@ GetRemoteTasksResponse FakeTaskGenerator::generateTask() { return response; return response; } } TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue) : mTaskQueue(taskQueue) {} void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) { mTaskQueue->handleTaskTimeout(); } TaskQueue::TaskQueue() { mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this); mLooper = Looper::prepare(/*opts=*/0); mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); }); } TaskQueue::~TaskQueue() { { std::lock_guard<std::mutex> lockGuard(mLock); mStopped = true; } while (true) { // Remove all pending timeout handlers from queue. if (!maybePopOne().has_value()) { break; } } if (mCheckTaskTimeoutThread.joinable()) { mCheckTaskTimeoutThread.join(); } } std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() { std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() { std::lock_guard<std::mutex> lockGuard(mLock); std::lock_guard<std::mutex> lockGuard(mLock); if (mTasks.size() == 0) { if (mTasks.size() == 0) { return std::nullopt; return std::nullopt; } } GetRemoteTasksResponse response = mTasks.front(); TaskInfo response = std::move(mTasks.top()); mTasks.pop(); mTasks.pop(); return std::move(response); mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId); return std::move(response.taskData); } } void TaskQueue::add(const GetRemoteTasksResponse& task) { void TaskQueue::add(const GetRemoteTasksResponse& task) { // TODO (b/246841306): add timeout to tasks. std::lock_guard<std::mutex> lockGuard(mLock); std::lock_guard<std::mutex> lockGuard(mLock); mTasks.push(task); if (mStopped) { return; } int taskId = mTaskIdCounter++; mTasks.push(TaskInfo{ .taskId = taskId, .timestampInMs = uptimeMillis(), .taskData = task, }); android::Message message(taskId); mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message); mTasksNotEmptyCv.notify_all(); mTasksNotEmptyCv.notify_all(); } } void TaskQueue::waitForTask() { void TaskQueue::waitForTask() { std::unique_lock<std::mutex> lock(mLock); std::unique_lock<std::mutex> lock(mLock); waitForTaskWithLock(lock); } void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) { mTasksNotEmptyCv.wait(lock, [this] { mTasksNotEmptyCv.wait(lock, [this] { ScopedLockAssertion lockAssertion(mLock); ScopedLockAssertion lockAssertion(mLock); return mTasks.size() > 0 || mStopped; return mTasks.size() > 0 || mStopped; Loading @@ -77,6 +125,41 @@ void TaskQueue::stopWait() { mTasksNotEmptyCv.notify_all(); mTasksNotEmptyCv.notify_all(); } } void TaskQueue::checkForTestTimeoutLoop() { Looper::setForThread(mLooper); while (true) { { std::unique_lock<std::mutex> lock(mLock); if (mStopped) { ALOGW("The TestWakeupClientServiceImpl is stopping, " "exiting checkForTestTimeoutLoop"); return; } } mLooper->pollAll(/*timeoutMillis=*/-1); } } void TaskQueue::handleTaskTimeout() { // We know which task timed-out from the taskId in the message. However, there is no easy way // to remove a specific task with the task ID from the priority_queue, so we just check from // the top of the queue (which have the oldest tasks). std::lock_guard<std::mutex> lockGuard(mLock); long now = uptimeMillis(); while (mTasks.size() > 0) { const TaskInfo& taskInfo = mTasks.top(); if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) { break; } // In real implementation, this should report task failure to remote wakeup server. ALOGW("Task for client ID: %s timed-out, added at %ld ms, now %ld ms", taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now); mTasks.pop(); } } TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() { TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() { mThread = std::thread([this] { fakeTaskGenerateLoop(); }); mThread = std::thread([this] { fakeTaskGenerateLoop(); }); } } Loading @@ -95,13 +178,13 @@ TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() { void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() { void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() { // In actual implementation, this should communicate with the remote server and receives tasks // In actual implementation, this should communicate with the remote server and receives tasks // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s. // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms. while (true) { while (true) { mTaskQueue.add(mFakeTaskGenerator.generateTask()); mTaskQueue.add(mFakeTaskGenerator.generateTask()); ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec); ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs); std::unique_lock lk(mLock); std::unique_lock lk(mLock); if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] { if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] { ScopedLockAssertion lockAssertion(mLock); ScopedLockAssertion lockAssertion(mLock); return mServerStopped; return mServerStopped; })) { })) { Loading