Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a90f9c4a authored by Yu Shan's avatar Yu Shan
Browse files

Add timeout logic to TestWakeupClientServiceImpl.

Add timeout logic for fake tasks. They will timeout after 20s and
print an error message if not received by the remote access HAL.

Test: Manually run TestWakeupClientServiceImpl and verify the log:
Task for client ID: [ID] timed-out
is printed.
Bug: 246841306

Change-Id: I2173c931da9e0ea40c7b16f9e25a75592fa255c0
parent 22f5a24b
Loading
Loading
Loading
Loading
+42 −1
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
#pragma once

#include <android-base/thread_annotations.h>
#include <utils/Looper.h>
#include <wakeup_client.grpc.pb.h>
#include <condition_variable>
#include <mutex>
@@ -41,20 +42,60 @@ class FakeTaskGenerator final {
    constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef};
};

struct TaskInfo {
    // This is unique per-task. Note that a task might be popped and put back into the task queue,
    // it will have a new task ID but the same clientId in the task data.
    int taskId;
    long timestampInMs;
    GetRemoteTasksResponse taskData;
};

struct TaskInfoComparator {
    // We want the smallest timestamp and smallest task ID on top.
    bool operator()(const TaskInfo& l, const TaskInfo& r) {
        return l.timestampInMs > r.timestampInMs ||
               (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId);
    }
};

// forward-declaration.
class TaskQueue;

class TaskTimeoutMessageHandler final : public android::MessageHandler {
  public:
    TaskTimeoutMessageHandler(TaskQueue* taskQueue);
    void handleMessage(const android::Message& message) override;

  private:
    TaskQueue* mTaskQueue;
};

// TaskQueue is thread-safe.
class TaskQueue final {
  public:
    TaskQueue();
    ~TaskQueue();

    void add(const GetRemoteTasksResponse& response);
    std::optional<GetRemoteTasksResponse> maybePopOne();
    void waitForTask();
    void stopWait();
    void handleTaskTimeout();

  private:
    std::thread mCheckTaskTimeoutThread;
    std::mutex mLock;
    std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock);
    std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks
            GUARDED_BY(mLock);
    // A variable to notify mTasks is not empty.
    std::condition_variable mTasksNotEmptyCv;
    bool mStopped GUARDED_BY(mLock);
    android::sp<Looper> mLooper;
    android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler;
    std::atomic<int> mTaskIdCounter = 0;

    void checkForTestTimeoutLoop();
    void waitForTaskWithLock(std::unique_lock<std::mutex>& lock);
};

class TestWakeupClientServiceImpl final : public WakeupClient::Service {
+91 −8
Original line number Diff line number Diff line
@@ -18,6 +18,8 @@

#include <android-base/stringprintf.h>
#include <utils/Log.h>
#include <utils/Looper.h>
#include <utils/SystemClock.h>
#include <chrono>
#include <thread>

@@ -28,13 +30,15 @@ namespace remoteaccess {

namespace {

using ::android::uptimeMillis;
using ::android::base::ScopedLockAssertion;
using ::android::base::StringPrintf;
using ::grpc::ServerContext;
using ::grpc::ServerWriter;
using ::grpc::Status;

constexpr int kTaskIntervalInSec = 5;
constexpr int kTaskIntervalInMs = 5'000;
constexpr int KTaskTimeoutInMs = 20'000;

}  // namespace

@@ -47,24 +51,68 @@ GetRemoteTasksResponse FakeTaskGenerator::generateTask() {
    return response;
}

TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue)
    : mTaskQueue(taskQueue) {}

void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
    mTaskQueue->handleTaskTimeout();
}

TaskQueue::TaskQueue() {
    mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
    mLooper = Looper::prepare(/*opts=*/0);
    mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
}

TaskQueue::~TaskQueue() {
    {
        std::lock_guard<std::mutex> lockGuard(mLock);
        mStopped = true;
    }
    while (true) {
        // Remove all pending timeout handlers from queue.
        if (!maybePopOne().has_value()) {
            break;
        }
    }
    if (mCheckTaskTimeoutThread.joinable()) {
        mCheckTaskTimeoutThread.join();
    }
}

std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
    std::lock_guard<std::mutex> lockGuard(mLock);
    if (mTasks.size() == 0) {
        return std::nullopt;
    }
    GetRemoteTasksResponse response = mTasks.front();
    TaskInfo response = std::move(mTasks.top());
    mTasks.pop();
    return std::move(response);
    mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId);
    return std::move(response.taskData);
}

void TaskQueue::add(const GetRemoteTasksResponse& task) {
    // TODO (b/246841306): add timeout to tasks.
    std::lock_guard<std::mutex> lockGuard(mLock);
    mTasks.push(task);
    if (mStopped) {
        return;
    }
    int taskId = mTaskIdCounter++;
    mTasks.push(TaskInfo{
            .taskId = taskId,
            .timestampInMs = uptimeMillis(),
            .taskData = task,
    });
    android::Message message(taskId);
    mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
    mTasksNotEmptyCv.notify_all();
}

void TaskQueue::waitForTask() {
    std::unique_lock<std::mutex> lock(mLock);
    waitForTaskWithLock(lock);
}

void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
    mTasksNotEmptyCv.wait(lock, [this] {
        ScopedLockAssertion lockAssertion(mLock);
        return mTasks.size() > 0 || mStopped;
@@ -77,6 +125,41 @@ void TaskQueue::stopWait() {
    mTasksNotEmptyCv.notify_all();
}

void TaskQueue::checkForTestTimeoutLoop() {
    Looper::setForThread(mLooper);

    while (true) {
        {
            std::unique_lock<std::mutex> lock(mLock);
            if (mStopped) {
                ALOGW("The TestWakeupClientServiceImpl is stopping, "
                      "exiting checkForTestTimeoutLoop");
                return;
            }
        }

        mLooper->pollAll(/*timeoutMillis=*/-1);
    }
}

void TaskQueue::handleTaskTimeout() {
    // We know which task timed-out from the taskId in the message. However, there is no easy way
    // to remove a specific task with the task ID from the priority_queue, so we just check from
    // the top of the queue (which have the oldest tasks).
    std::lock_guard<std::mutex> lockGuard(mLock);
    long now = uptimeMillis();
    while (mTasks.size() > 0) {
        const TaskInfo& taskInfo = mTasks.top();
        if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
            break;
        }
        // In real implementation, this should report task failure to remote wakeup server.
        ALOGW("Task for client ID: %s timed-out, added at %ld ms, now %ld ms",
              taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
        mTasks.pop();
    }
}

TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
    mThread = std::thread([this] { fakeTaskGenerateLoop(); });
}
@@ -95,13 +178,13 @@ TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() {

void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() {
    // In actual implementation, this should communicate with the remote server and receives tasks
    // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s.
    // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
    while (true) {
        mTaskQueue.add(mFakeTaskGenerator.generateTask());
        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec);
        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs);

        std::unique_lock lk(mLock);
        if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] {
        if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
                ScopedLockAssertion lockAssertion(mLock);
                return mServerStopped;
            })) {