Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0b274d6a authored by Yu Shan's avatar Yu Shan Committed by Automerger Merge Worker
Browse files

Merge "Reland "Add timeout logic to TestWakeupClientServiceImpl."" am: d6ca7dfd

parents f6c365dc d6ca7dfd
Loading
Loading
Loading
Loading
+42 −1
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
#pragma once

#include <android-base/thread_annotations.h>
#include <utils/Looper.h>
#include <wakeup_client.grpc.pb.h>
#include <condition_variable>
#include <mutex>
@@ -41,20 +42,60 @@ class FakeTaskGenerator final {
    constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef};
};

struct TaskInfo {
    // This is unique per-task. Note that a task might be popped and put back into the task queue,
    // it will have a new task ID but the same clientId in the task data.
    int taskId;
    int64_t timestampInMs;
    GetRemoteTasksResponse taskData;
};

struct TaskInfoComparator {
    // We want the smallest timestamp and smallest task ID on top.
    bool operator()(const TaskInfo& l, const TaskInfo& r) {
        return l.timestampInMs > r.timestampInMs ||
               (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId);
    }
};

// forward-declaration.
class TaskQueue;

class TaskTimeoutMessageHandler final : public android::MessageHandler {
  public:
    TaskTimeoutMessageHandler(TaskQueue* taskQueue);
    void handleMessage(const android::Message& message) override;

  private:
    TaskQueue* mTaskQueue;
};

// TaskQueue is thread-safe.
class TaskQueue final {
  public:
    TaskQueue();
    ~TaskQueue();

    void add(const GetRemoteTasksResponse& response);
    std::optional<GetRemoteTasksResponse> maybePopOne();
    void waitForTask();
    void stopWait();
    void handleTaskTimeout();

  private:
    std::thread mCheckTaskTimeoutThread;
    std::mutex mLock;
    std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock);
    std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks
            GUARDED_BY(mLock);
    // A variable to notify mTasks is not empty.
    std::condition_variable mTasksNotEmptyCv;
    bool mStopped GUARDED_BY(mLock);
    android::sp<Looper> mLooper;
    android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler;
    std::atomic<int> mTaskIdCounter = 0;

    void checkForTestTimeoutLoop();
    void waitForTaskWithLock(std::unique_lock<std::mutex>& lock);
};

class TestWakeupClientServiceImpl final : public WakeupClient::Service {
+92 −8
Original line number Diff line number Diff line
@@ -17,7 +17,10 @@
#include "TestWakeupClientServiceImpl.h"

#include <android-base/stringprintf.h>
#include <inttypes.h>
#include <utils/Log.h>
#include <utils/Looper.h>
#include <utils/SystemClock.h>
#include <chrono>
#include <thread>

@@ -28,13 +31,15 @@ namespace remoteaccess {

namespace {

using ::android::uptimeMillis;
using ::android::base::ScopedLockAssertion;
using ::android::base::StringPrintf;
using ::grpc::ServerContext;
using ::grpc::ServerWriter;
using ::grpc::Status;

constexpr int kTaskIntervalInSec = 5;
constexpr int kTaskIntervalInMs = 5'000;
constexpr int64_t KTaskTimeoutInMs = 20'000;

}  // namespace

@@ -47,24 +52,68 @@ GetRemoteTasksResponse FakeTaskGenerator::generateTask() {
    return response;
}

TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue)
    : mTaskQueue(taskQueue) {}

void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
    mTaskQueue->handleTaskTimeout();
}

TaskQueue::TaskQueue() {
    mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
    mLooper = Looper::prepare(/*opts=*/0);
    mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
}

TaskQueue::~TaskQueue() {
    {
        std::lock_guard<std::mutex> lockGuard(mLock);
        mStopped = true;
    }
    while (true) {
        // Remove all pending timeout handlers from queue.
        if (!maybePopOne().has_value()) {
            break;
        }
    }
    if (mCheckTaskTimeoutThread.joinable()) {
        mCheckTaskTimeoutThread.join();
    }
}

std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
    std::lock_guard<std::mutex> lockGuard(mLock);
    if (mTasks.size() == 0) {
        return std::nullopt;
    }
    GetRemoteTasksResponse response = mTasks.front();
    TaskInfo response = std::move(mTasks.top());
    mTasks.pop();
    return std::move(response);
    mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId);
    return std::move(response.taskData);
}

void TaskQueue::add(const GetRemoteTasksResponse& task) {
    // TODO (b/246841306): add timeout to tasks.
    std::lock_guard<std::mutex> lockGuard(mLock);
    mTasks.push(task);
    if (mStopped) {
        return;
    }
    int taskId = mTaskIdCounter++;
    mTasks.push(TaskInfo{
            .taskId = taskId,
            .timestampInMs = uptimeMillis(),
            .taskData = task,
    });
    android::Message message(taskId);
    mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
    mTasksNotEmptyCv.notify_all();
}

void TaskQueue::waitForTask() {
    std::unique_lock<std::mutex> lock(mLock);
    waitForTaskWithLock(lock);
}

void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
    mTasksNotEmptyCv.wait(lock, [this] {
        ScopedLockAssertion lockAssertion(mLock);
        return mTasks.size() > 0 || mStopped;
@@ -77,6 +126,41 @@ void TaskQueue::stopWait() {
    mTasksNotEmptyCv.notify_all();
}

void TaskQueue::checkForTestTimeoutLoop() {
    Looper::setForThread(mLooper);

    while (true) {
        {
            std::unique_lock<std::mutex> lock(mLock);
            if (mStopped) {
                ALOGW("The TestWakeupClientServiceImpl is stopping, "
                      "exiting checkForTestTimeoutLoop");
                return;
            }
        }

        mLooper->pollAll(/*timeoutMillis=*/-1);
    }
}

void TaskQueue::handleTaskTimeout() {
    // We know which task timed-out from the taskId in the message. However, there is no easy way
    // to remove a specific task with the task ID from the priority_queue, so we just check from
    // the top of the queue (which have the oldest tasks).
    std::lock_guard<std::mutex> lockGuard(mLock);
    int64_t now = uptimeMillis();
    while (mTasks.size() > 0) {
        const TaskInfo& taskInfo = mTasks.top();
        if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
            break;
        }
        // In real implementation, this should report task failure to remote wakeup server.
        ALOGW("Task for client ID: %s timed-out, added at %" PRId64 " ms, now %" PRId64 " ms",
              taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
        mTasks.pop();
    }
}

TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
    mThread = std::thread([this] { fakeTaskGenerateLoop(); });
}
@@ -95,13 +179,13 @@ TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() {

void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() {
    // In actual implementation, this should communicate with the remote server and receives tasks
    // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s.
    // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
    while (true) {
        mTaskQueue.add(mFakeTaskGenerator.generateTask());
        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec);
        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs);

        std::unique_lock lk(mLock);
        if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] {
        if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
                ScopedLockAssertion lockAssertion(mLock);
                return mServerStopped;
            })) {