Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d6ca7dfd authored by Yu Shan's avatar Yu Shan Committed by Android (Google) Code Review
Browse files

Merge "Reland "Add timeout logic to TestWakeupClientServiceImpl.""

parents c841770a 9ed57888
Loading
Loading
Loading
Loading
+42 −1
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
#pragma once

#include <android-base/thread_annotations.h>
#include <utils/Looper.h>
#include <wakeup_client.grpc.pb.h>
#include <condition_variable>
#include <mutex>
@@ -41,20 +42,60 @@ class FakeTaskGenerator final {
    constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef};
};

struct TaskInfo {
    // This is unique per-task. Note that a task might be popped and put back into the task queue,
    // it will have a new task ID but the same clientId in the task data.
    int taskId;
    int64_t timestampInMs;
    GetRemoteTasksResponse taskData;
};

struct TaskInfoComparator {
    // We want the smallest timestamp and smallest task ID on top.
    bool operator()(const TaskInfo& l, const TaskInfo& r) {
        return l.timestampInMs > r.timestampInMs ||
               (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId);
    }
};

// forward-declaration.
class TaskQueue;

class TaskTimeoutMessageHandler final : public android::MessageHandler {
  public:
    TaskTimeoutMessageHandler(TaskQueue* taskQueue);
    void handleMessage(const android::Message& message) override;

  private:
    TaskQueue* mTaskQueue;
};

// TaskQueue is thread-safe.
class TaskQueue final {
  public:
    TaskQueue();
    ~TaskQueue();

    void add(const GetRemoteTasksResponse& response);
    std::optional<GetRemoteTasksResponse> maybePopOne();
    void waitForTask();
    void stopWait();
    void handleTaskTimeout();

  private:
    std::thread mCheckTaskTimeoutThread;
    std::mutex mLock;
    std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock);
    std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks
            GUARDED_BY(mLock);
    // A variable to notify mTasks is not empty.
    std::condition_variable mTasksNotEmptyCv;
    bool mStopped GUARDED_BY(mLock);
    android::sp<Looper> mLooper;
    android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler;
    std::atomic<int> mTaskIdCounter = 0;

    void checkForTestTimeoutLoop();
    void waitForTaskWithLock(std::unique_lock<std::mutex>& lock);
};

class TestWakeupClientServiceImpl final : public WakeupClient::Service {
+92 −8
Original line number Diff line number Diff line
@@ -17,7 +17,10 @@
#include "TestWakeupClientServiceImpl.h"

#include <android-base/stringprintf.h>
#include <inttypes.h>
#include <utils/Log.h>
#include <utils/Looper.h>
#include <utils/SystemClock.h>
#include <chrono>
#include <thread>

@@ -28,13 +31,15 @@ namespace remoteaccess {

namespace {

using ::android::uptimeMillis;
using ::android::base::ScopedLockAssertion;
using ::android::base::StringPrintf;
using ::grpc::ServerContext;
using ::grpc::ServerWriter;
using ::grpc::Status;

constexpr int kTaskIntervalInSec = 5;
constexpr int kTaskIntervalInMs = 5'000;
constexpr int64_t KTaskTimeoutInMs = 20'000;

}  // namespace

@@ -47,24 +52,68 @@ GetRemoteTasksResponse FakeTaskGenerator::generateTask() {
    return response;
}

TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue)
    : mTaskQueue(taskQueue) {}

void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
    mTaskQueue->handleTaskTimeout();
}

TaskQueue::TaskQueue() {
    mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
    mLooper = Looper::prepare(/*opts=*/0);
    mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
}

TaskQueue::~TaskQueue() {
    {
        std::lock_guard<std::mutex> lockGuard(mLock);
        mStopped = true;
    }
    while (true) {
        // Remove all pending timeout handlers from queue.
        if (!maybePopOne().has_value()) {
            break;
        }
    }
    if (mCheckTaskTimeoutThread.joinable()) {
        mCheckTaskTimeoutThread.join();
    }
}

std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
    std::lock_guard<std::mutex> lockGuard(mLock);
    if (mTasks.size() == 0) {
        return std::nullopt;
    }
    GetRemoteTasksResponse response = mTasks.front();
    TaskInfo response = std::move(mTasks.top());
    mTasks.pop();
    return std::move(response);
    mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId);
    return std::move(response.taskData);
}

void TaskQueue::add(const GetRemoteTasksResponse& task) {
    // TODO (b/246841306): add timeout to tasks.
    std::lock_guard<std::mutex> lockGuard(mLock);
    mTasks.push(task);
    if (mStopped) {
        return;
    }
    int taskId = mTaskIdCounter++;
    mTasks.push(TaskInfo{
            .taskId = taskId,
            .timestampInMs = uptimeMillis(),
            .taskData = task,
    });
    android::Message message(taskId);
    mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
    mTasksNotEmptyCv.notify_all();
}

void TaskQueue::waitForTask() {
    std::unique_lock<std::mutex> lock(mLock);
    waitForTaskWithLock(lock);
}

void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
    mTasksNotEmptyCv.wait(lock, [this] {
        ScopedLockAssertion lockAssertion(mLock);
        return mTasks.size() > 0 || mStopped;
@@ -77,6 +126,41 @@ void TaskQueue::stopWait() {
    mTasksNotEmptyCv.notify_all();
}

void TaskQueue::checkForTestTimeoutLoop() {
    Looper::setForThread(mLooper);

    while (true) {
        {
            std::unique_lock<std::mutex> lock(mLock);
            if (mStopped) {
                ALOGW("The TestWakeupClientServiceImpl is stopping, "
                      "exiting checkForTestTimeoutLoop");
                return;
            }
        }

        mLooper->pollAll(/*timeoutMillis=*/-1);
    }
}

void TaskQueue::handleTaskTimeout() {
    // We know which task timed-out from the taskId in the message. However, there is no easy way
    // to remove a specific task with the task ID from the priority_queue, so we just check from
    // the top of the queue (which have the oldest tasks).
    std::lock_guard<std::mutex> lockGuard(mLock);
    int64_t now = uptimeMillis();
    while (mTasks.size() > 0) {
        const TaskInfo& taskInfo = mTasks.top();
        if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
            break;
        }
        // In real implementation, this should report task failure to remote wakeup server.
        ALOGW("Task for client ID: %s timed-out, added at %" PRId64 " ms, now %" PRId64 " ms",
              taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
        mTasks.pop();
    }
}

TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
    mThread = std::thread([this] { fakeTaskGenerateLoop(); });
}
@@ -95,13 +179,13 @@ TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() {

void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() {
    // In actual implementation, this should communicate with the remote server and receives tasks
    // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s.
    // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
    while (true) {
        mTaskQueue.add(mFakeTaskGenerator.generateTask());
        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec);
        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs);

        std::unique_lock lk(mLock);
        if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] {
        if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
                ScopedLockAssertion lockAssertion(mLock);
                return mServerStopped;
            })) {