Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b138c42a authored by Yu Shan's avatar Yu Shan
Browse files

Implement receiving remote task.

Test: atest RemoteAccessServiceUnitTest
Bug: 241483300
Change-Id: I3d7f54f154beebba1bb1bdb7640dfe973ad012e4
parent e2974560
Loading
Loading
Loading
Loading
+24 −4
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include <aidl/android/hardware/automotive/remoteaccess/ApState.h>
#include <aidl/android/hardware/automotive/remoteaccess/BnRemoteAccess.h>
#include <aidl/android/hardware/automotive/remoteaccess/IRemoteTaskCallback.h>
#include <android-base/thread_annotations.h>
#include <wakeup_client.grpc.pb.h>

#include <string>
@@ -32,7 +33,9 @@ namespace remoteaccess {
class RemoteAccessService
    : public aidl::android::hardware::automotive::remoteaccess::BnRemoteAccess {
  public:
    RemoteAccessService(WakeupClient::StubInterface* grpcStub);
    explicit RemoteAccessService(WakeupClient::StubInterface* grpcStub);

    ~RemoteAccessService();

    ndk::ScopedAStatus getDeviceId(std::string* deviceId) override;

@@ -49,12 +52,29 @@ class RemoteAccessService
            const aidl::android::hardware::automotive::remoteaccess::ApState& newState) override;

  private:
    // For testing.
    friend class RemoteAccessServiceUnitTest;

    WakeupClient::StubInterface* mGrpcStub;
    std::shared_ptr<aidl::android::hardware::automotive::remoteaccess::IRemoteTaskCallback>
            mRemoteTaskCallback;
    std::thread mThread;
    std::mutex mLock;
    std::condition_variable mCv;
    std::shared_ptr<aidl::android::hardware::automotive::remoteaccess::IRemoteTaskCallback>
            mRemoteTaskCallback GUARDED_BY(mLock);
    std::unique_ptr<grpc::ClientContext> mGetRemoteTasksContext GUARDED_BY(mLock);
    // Associated with mCv to notify the task loop to stop waiting and exit.
    bool mTaskWaitStopped GUARDED_BY(mLock);
    // A mutex to make sure startTaskLoop does not overlap with stopTaskLoop.
    std::mutex mStartStopTaskLoopLock;
    bool mTaskLoopRunning GUARDED_BY(mStartStopTaskLoopLock);
    // Default wait time before retry connecting to remote access client is 10s.
    size_t mRetryWaitInMs = 10'000;

    void runTaskLoop();
    void maybeStartTaskLoop();
    void maybeStopTaskLoop();

    void taskLoop();
    void setRetryWaitInMs(size_t retryWaitInMs) { mRetryWaitInMs = retryWaitInMs; }
};

}  // namespace remoteaccess
+110 −6
Original line number Diff line number Diff line
@@ -31,23 +31,111 @@ namespace {

using ::aidl::android::hardware::automotive::remoteaccess::ApState;
using ::aidl::android::hardware::automotive::remoteaccess::IRemoteTaskCallback;
using ::android::base::ScopedLockAssertion;
using ::grpc::ClientContext;
using ::grpc::ClientReader;
using ::grpc::ClientReaderInterface;
using ::grpc::Status;
using ::grpc::StatusCode;
using ::ndk::ScopedAStatus;

const std::string WAKEUP_SERVICE_NAME = "com.google.vehicle.wakeup";

std::vector<uint8_t> stringToBytes(const std::string& s) {
    const char* data = s.data();
    return std::vector<uint8_t>(data, data + s.size());
}

ScopedAStatus rpcStatusToScopedAStatus(const Status& status, const std::string& errorMsg) {
    return ScopedAStatus::fromServiceSpecificErrorWithMessage(
            status.error_code(), (errorMsg + ", error: " + status.error_message()).c_str());
}

}  // namespace

RemoteAccessService::RemoteAccessService(WakeupClient::StubInterface* grpcStub)
    : mGrpcStub(grpcStub) {
    // mThread = std::thread([this]() { taskLoop(); });
    : mGrpcStub(grpcStub){};

RemoteAccessService::~RemoteAccessService() {
    maybeStopTaskLoop();
}

void RemoteAccessService::maybeStartTaskLoop() {
    std::lock_guard<std::mutex> lockGuard(mStartStopTaskLoopLock);
    if (mTaskLoopRunning) {
        return;
    }

    mThread = std::thread([this]() { runTaskLoop(); });

    mTaskLoopRunning = true;
}

void RemoteAccessService::maybeStopTaskLoop() {
    std::lock_guard<std::mutex> lockGuard(mStartStopTaskLoopLock);
    if (!mTaskLoopRunning) {
        return;
    }

    {
        std::lock_guard<std::mutex> lockGuard(mLock);
        // Try to stop the reading stream.
        if (mGetRemoteTasksContext) {
            mGetRemoteTasksContext->TryCancel();
            mGetRemoteTasksContext.reset();
        }
        mTaskWaitStopped = true;
        mCv.notify_all();
    }
    if (mThread.joinable()) {
        mThread.join();
    }

    mTaskLoopRunning = false;
}

void RemoteAccessService::taskLoop() {
    // TODO(b/241483300): handle remote tasks.
void RemoteAccessService::runTaskLoop() {
    GetRemoteTasksRequest request = {};
    std::unique_ptr<ClientReaderInterface<GetRemoteTasksResponse>> reader;
    while (true) {
        {
            std::lock_guard<std::mutex> lockGuard(mLock);
            mGetRemoteTasksContext.reset(new ClientContext());
            reader = mGrpcStub->GetRemoteTasks(mGetRemoteTasksContext.get(), request);
        }
        GetRemoteTasksResponse response;
        while (reader->Read(&response)) {
            std::shared_ptr<IRemoteTaskCallback> callback;
            {
                std::lock_guard<std::mutex> lockGuard(mLock);
                callback = mRemoteTaskCallback;
            }
            if (callback == nullptr) {
                continue;
            }
            ScopedAStatus callbackStatus = callback->onRemoteTaskRequested(
                    response.clientid(), stringToBytes(response.data()));
            if (!callbackStatus.isOk()) {
                ALOGE("Failed to call onRemoteTaskRequested callback, status: %d, message: %s",
                      callbackStatus.getStatus(), callbackStatus.getMessage());
            }
        }
        Status status = reader->Finish();

        ALOGE("GetRemoteTasks stream breaks, code: %d, message: %s, sleeping for 10s and retry",
              status.error_code(), status.error_message().c_str());
        // The long lasting connection should not return. But if the server returns, retry after
        // 10s.
        {
            std::unique_lock lk(mLock);
            if (mCv.wait_for(lk, std::chrono::milliseconds(mRetryWaitInMs), [this] {
                    ScopedLockAssertion lockAssertion(mLock);
                    return mTaskWaitStopped;
                })) {
                // If the stopped flag is set, we are quitting, exit the loop.
                break;
            }
        }
    }
}

ScopedAStatus RemoteAccessService::getDeviceId(std::string* deviceId) {
@@ -62,16 +150,32 @@ ScopedAStatus RemoteAccessService::getWakeupServiceName(std::string* wakeupServi

ScopedAStatus RemoteAccessService::setRemoteTaskCallback(
        [[maybe_unused]] const std::shared_ptr<IRemoteTaskCallback>& callback) {
    std::lock_guard<std::mutex> lockGuard(mLock);
    mRemoteTaskCallback = callback;
    return ScopedAStatus::ok();
}

ScopedAStatus RemoteAccessService::clearRemoteTaskCallback() {
    std::lock_guard<std::mutex> lockGuard(mLock);
    mRemoteTaskCallback.reset();
    return ScopedAStatus::ok();
}

ScopedAStatus RemoteAccessService::notifyApStateChange([[maybe_unused]] const ApState& newState) {
ScopedAStatus RemoteAccessService::notifyApStateChange(const ApState& newState) {
    ClientContext context;
    NotifyWakeupRequiredRequest request = {};
    request.set_iswakeuprequired(newState.isWakeupRequired);
    NotifyWakeupRequiredResponse response = {};
    Status status = mGrpcStub->NotifyWakeupRequired(&context, request, &response);
    if (!status.ok()) {
        return rpcStatusToScopedAStatus(status, "Failed to notify isWakeupRequired");
    }

    if (newState.isReadyForRemoteTask) {
        maybeStartTaskLoop();
    } else {
        maybeStopTaskLoop();
    }
    return ScopedAStatus::ok();
}

+200 −2
Original line number Diff line number Diff line
@@ -16,15 +16,25 @@

#include "RemoteAccessService.h"

#include <aidl/android/hardware/automotive/remoteaccess/ApState.h>
#include <aidl/android/hardware/automotive/remoteaccess/BnRemoteTaskCallback.h>
#include <gmock/gmock.h>
#include <grpcpp/test/mock_stream.h>
#include <gtest/gtest.h>
#include <wakeup_client.grpc.pb.h>
#include <chrono>
#include <thread>

namespace android {
namespace hardware {
namespace automotive {
namespace remoteaccess {

using ::android::base::ScopedLockAssertion;

using ::aidl::android::hardware::automotive::remoteaccess::ApState;
using ::aidl::android::hardware::automotive::remoteaccess::BnRemoteTaskCallback;

using ::grpc::ClientAsyncReaderInterface;
using ::grpc::ClientAsyncResponseReaderInterface;
using ::grpc::ClientContext;
@@ -32,8 +42,12 @@ using ::grpc::ClientReader;
using ::grpc::ClientReaderInterface;
using ::grpc::CompletionQueue;
using ::grpc::Status;

using ::grpc::testing::MockClientReader;
using ::ndk::ScopedAStatus;
using ::testing::_;
using ::testing::DoAll;
using ::testing::Return;
using ::testing::SetArgPointee;

class MockGrpcClientStub : public WakeupClient::StubInterface {
  public:
@@ -59,9 +73,37 @@ class MockGrpcClientStub : public WakeupClient::StubInterface {
                 CompletionQueue* cq));
};

class FakeRemoteTaskCallback : public BnRemoteTaskCallback {
  public:
    ScopedAStatus onRemoteTaskRequested(const std::string& clientId,
                                        const std::vector<uint8_t>& data) override {
        std::lock_guard<std::mutex> lockGuard(mLock);
        mDataByClientId[clientId] = data;
        mTaskCount++;
        mCv.notify_all();
        return ScopedAStatus::ok();
    }

    std::vector<uint8_t> getData(const std::string& clientId) { return mDataByClientId[clientId]; }

    bool wait(size_t taskCount, size_t timeoutInSec) {
        std::unique_lock<std::mutex> lock(mLock);
        return mCv.wait_for(lock, std::chrono::seconds(timeoutInSec), [taskCount, this] {
            ScopedLockAssertion lockAssertion(mLock);
            return mTaskCount >= taskCount;
        });
    }

  private:
    std::mutex mLock;
    std::unordered_map<std::string, std::vector<uint8_t>> mDataByClientId GUARDED_BY(mLock);
    size_t mTaskCount GUARDED_BY(mLock) = 0;
    std::condition_variable mCv;
};

class RemoteAccessServiceUnitTest : public ::testing::Test {
  public:
    RemoteAccessServiceUnitTest() {
    virtual void SetUp() override {
        mGrpcWakeupClientStub = std::make_unique<MockGrpcClientStub>();
        mService = ndk::SharedRefBase::make<RemoteAccessService>(mGrpcWakeupClientStub.get());
    }
@@ -70,9 +112,12 @@ class RemoteAccessServiceUnitTest : public ::testing::Test {

    RemoteAccessService* getService() { return mService.get(); }

    void setRetryWaitInMs(size_t retryWaitInMs) { mService->setRetryWaitInMs(retryWaitInMs); }

  private:
    std::unique_ptr<MockGrpcClientStub> mGrpcWakeupClientStub;
    std::shared_ptr<RemoteAccessService> mService;
    MockClientReader<GetRemoteTasksResponse>* mMockTaskReader;
};

TEST_F(RemoteAccessServiceUnitTest, TestGetWakeupServiceName) {
@@ -84,6 +129,159 @@ TEST_F(RemoteAccessServiceUnitTest, TestGetWakeupServiceName) {
    EXPECT_EQ(serviceName, "com.google.vehicle.wakeup");
}

TEST_F(RemoteAccessServiceUnitTest, TestNotifyApStateChangeWakeupRequired) {
    bool isWakeupRequired = false;
    EXPECT_CALL(*getGrpcWakeupClientStub(), NotifyWakeupRequired)
            .WillOnce([&isWakeupRequired]([[maybe_unused]] ClientContext* context,
                                          const NotifyWakeupRequiredRequest& request,
                                          [[maybe_unused]] NotifyWakeupRequiredResponse* response) {
                isWakeupRequired = request.iswakeuprequired();
                return Status();
            });

    ApState newState = {
            .isWakeupRequired = true,
    };
    ScopedAStatus status = getService()->notifyApStateChange(newState);

    EXPECT_TRUE(status.isOk());
    EXPECT_TRUE(isWakeupRequired);
}

TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasks) {
    GetRemoteTasksResponse response1;
    std::vector<uint8_t> testData = {0xde, 0xad, 0xbe, 0xef};
    response1.set_clientid("1");
    response1.set_data(testData.data(), testData.size());
    GetRemoteTasksResponse response2;
    response2.set_clientid("2");
    std::shared_ptr<FakeRemoteTaskCallback> callback =
            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();

    ON_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw)
            .WillByDefault(
                    [response1, response2]([[maybe_unused]] ClientContext* context,
                                           [[maybe_unused]] const GetRemoteTasksRequest& request) {
                        // mockReader ownership will be transferred to the client so we don't own it
                        // here.
                        MockClientReader<GetRemoteTasksResponse>* mockClientReader =
                                new MockClientReader<GetRemoteTasksResponse>();
                        EXPECT_CALL(*mockClientReader, Finish()).WillOnce(Return(Status::OK));
                        EXPECT_CALL(*mockClientReader, Read(_))
                                .WillOnce(DoAll(SetArgPointee<0>(response1), Return(true)))
                                .WillOnce(DoAll(SetArgPointee<0>(response2), Return(true)))
                                .WillRepeatedly(Return(false));
                        return mockClientReader;
                    });

    getService()->setRemoteTaskCallback(callback);
    // Start the long live connection to receive tasks.
    ApState newState = {
            .isReadyForRemoteTask = true,
    };
    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());

    ASSERT_TRUE(callback->wait(/*taskCount=*/2, /*timeoutInSec=*/10))
            << "Did not receive enough tasks";
    EXPECT_EQ(callback->getData("1"), testData);
    EXPECT_EQ(callback->getData("2"), std::vector<uint8_t>());
}

TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasksRetryConnection) {
    GetRemoteTasksResponse response;
    std::shared_ptr<FakeRemoteTaskCallback> callback =
            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();

    ON_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw)
            .WillByDefault([response]([[maybe_unused]] ClientContext* context,
                                      [[maybe_unused]] const GetRemoteTasksRequest& request) {
                // mockReader ownership will be transferred to the client so we don't own it here.
                MockClientReader<GetRemoteTasksResponse>* mockClientReader =
                        new MockClientReader<GetRemoteTasksResponse>();
                EXPECT_CALL(*mockClientReader, Finish()).WillOnce(Return(Status::OK));
                // Connection fails after receiving one task. Should retry after some time.
                EXPECT_CALL(*mockClientReader, Read(_))
                        .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
                        .WillRepeatedly(Return(false));
                return mockClientReader;
            });

    getService()->setRemoteTaskCallback(callback);
    setRetryWaitInMs(100);
    // Start the long live connection to receive tasks.
    ApState newState = {
            .isReadyForRemoteTask = true,
    };
    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());

    ASSERT_TRUE(callback->wait(/*taskCount=*/2, /*timeoutInSec=*/10))
            << "Did not receive enough tasks";
}

TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasksDefaultNotReady) {
    GetRemoteTasksResponse response1;
    std::vector<uint8_t> testData = {0xde, 0xad, 0xbe, 0xef};
    response1.set_clientid("1");
    response1.set_data(testData.data(), testData.size());
    GetRemoteTasksResponse response2;
    response2.set_clientid("2");
    std::shared_ptr<FakeRemoteTaskCallback> callback =
            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();

    EXPECT_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw).Times(0);

    // Default state is not ready for remote tasks, so no callback will be called.
    getService()->setRemoteTaskCallback(callback);

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasksNotReadyAfterReady) {
    GetRemoteTasksResponse response1;
    std::vector<uint8_t> testData = {0xde, 0xad, 0xbe, 0xef};
    response1.set_clientid("1");
    response1.set_data(testData.data(), testData.size());
    GetRemoteTasksResponse response2;
    response2.set_clientid("2");
    std::shared_ptr<FakeRemoteTaskCallback> callback =
            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();

    ON_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw)
            .WillByDefault(
                    [response1, response2]([[maybe_unused]] ClientContext* context,
                                           [[maybe_unused]] const GetRemoteTasksRequest& request) {
                        // mockReader ownership will be transferred to the client so we don't own it
                        // here.
                        MockClientReader<GetRemoteTasksResponse>* mockClientReader =
                                new MockClientReader<GetRemoteTasksResponse>();
                        EXPECT_CALL(*mockClientReader, Finish()).WillOnce(Return(Status::OK));
                        EXPECT_CALL(*mockClientReader, Read(_))
                                .WillOnce(DoAll(SetArgPointee<0>(response1), Return(true)))
                                .WillOnce(DoAll(SetArgPointee<0>(response2), Return(true)))
                                .WillRepeatedly(Return(false));
                        return mockClientReader;
                    });
    // Should only be called once when is is ready for remote task.
    EXPECT_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw).Times(1);

    getService()->setRemoteTaskCallback(callback);
    setRetryWaitInMs(100);
    // Start the long live connection to receive tasks.
    ApState newState = {
            .isReadyForRemoteTask = true,
    };
    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());
    ASSERT_TRUE(callback->wait(/*taskCount=*/2, /*timeoutInSec=*/10))
            << "Did not receive enough tasks";

    // Stop the long live connection.
    newState.isReadyForRemoteTask = false;
    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());

    // Wait for the retry delay, but the loop should already exit.
    std::this_thread::sleep_for(std::chrono::milliseconds(150));
}

}  // namespace remoteaccess
}  // namespace automotive
}  // namespace hardware