Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 931cef83 authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Add PendingRequestPool to handle pending requests."

parents c681ac99 54cfc5a1
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -154,7 +154,7 @@ class FakeVehicleHardwareTest : public ::testing::Test {
        return toInt(result.error());
    }

    void onSetValues(const std::vector<SetValueResult> results) {
    void onSetValues(std::vector<SetValueResult> results) {
        for (auto& result : results) {
            mSetValueResults.push_back(result);
        }
@@ -162,7 +162,7 @@ class FakeVehicleHardwareTest : public ::testing::Test {

    const std::vector<SetValueResult>& getSetValueResults() { return mSetValueResults; }

    void onGetValues(const std::vector<GetValueResult> results) {
    void onGetValues(std::vector<GetValueResult> results) {
        for (auto& result : results) {
            mGetValueResults.push_back(result);
        }
@@ -170,7 +170,7 @@ class FakeVehicleHardwareTest : public ::testing::Test {

    const std::vector<GetValueResult>& getGetValueResults() { return mGetValueResults; }

    void onPropertyChangeEvent(const std::vector<VehiclePropValue>& values) {
    void onPropertyChangeEvent(std::vector<VehiclePropValue> values) {
        for (auto& value : values) {
            mChangedProperties.push_back(value);
        }
+1 −0
Original line number Diff line number Diff line
@@ -56,6 +56,7 @@ cc_library {
    srcs: [
        "src/ConnectedClient.cpp",
        "src/DefaultVehicleHal.cpp",
        "src/PendingRequestPool.cpp",
    ],
    static_libs: [
        "VehicleHalUtils",
+5 −0
Original line number Diff line number Diff line
@@ -88,12 +88,17 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve
            GetSetValuesClient<::aidl::android::hardware::automotive::vehicle::SetValueResult,
                               ::aidl::android::hardware::automotive::vehicle::SetValueResults>;

    // The default timeout of get or set value requests is 30s.
    // TODO(b/214605968): define TIMEOUT_IN_NANO in IVehicle and allow getValues/setValues/subscribe
    // to specify custom timeouts.
    static constexpr int64_t TIMEOUT_IN_NANO = 30'000'000'000;
    const std::unique_ptr<IVehicleHardware> mVehicleHardware;

    // mConfigsByPropId and mConfigFile are only modified during initialization, so no need to
    // lock guard them.
    std::unordered_map<int32_t, ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig>
            mConfigsByPropId;
    // Only modified in constructor, so thread-safe.
    std::unique_ptr<::ndk::ScopedFileDescriptor> mConfigFile;

    std::mutex mLock;
+100 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef android_hardware_automotive_vehicle_aidl_impl_vhal_include_PendingRequestPool_H_
#define android_hardware_automotive_vehicle_aidl_impl_vhal_include_PendingRequestPool_H_

#include <android-base/result.h>
#include <android-base/thread_annotations.h>

#include <atomic>
#include <list>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

// A thread-safe pending request pool that tracks whether each request has timed-out.
class PendingRequestPool final {
  public:
    using TimeoutCallbackFunc = std::function<void(const std::unordered_set<int64_t>&)>;

    explicit PendingRequestPool(int64_t timeoutInSec);

    ~PendingRequestPool();

    // Adds a list of requests to the request pool.
    // The clientId is the key for all the requests. It could be a number or an address to a data
    // structure that represents a client. The caller must maintain this data structure.
    // All the request IDs must be unique for one client, if any of the requestIds is duplicate with
    // any pending request IDs for the client, this function returns error and no requests would be
    // added. Otherwise, they would be added to the request pool.
    // The callback would be called if requests are not finished within {@code mTimeoutInNano}
    // seconds.
    android::base::Result<void> addRequests(const void* clientId,
                                            const std::unordered_set<int64_t>& requestIds,
                                            std::shared_ptr<TimeoutCallbackFunc> callback);

    // Checks whether the request is currently pending.
    bool isRequestPending(const void* clientId, int64_t requestId) const;

    // Tries to mark the requests as finished and remove them from the pool if the request is
    // currently pending. Returns the list of request that is pending and has been finished
    // successfully. This function would try to finish any valid requestIds even though some of the
    // requestIds are not valid.
    std::unordered_set<int64_t> tryFinishRequests(const void* clientId,
                                                  const std::unordered_set<int64_t>& requestIds);

    // Returns how many pending requests in the pool, for testing purpose.
    size_t countPendingRequests(const void* clientId) const;

  private:
    // The maximum number of pending requests allowed per client. If exceeds this number, adding
    // more requests would fail. This is to prevent spamming from client.
    static constexpr size_t MAX_PENDING_REQUEST_PER_CLIENT = 10000;

    struct PendingRequest {
        std::unordered_set<int64_t> requestIds;
        int64_t timeoutTimestamp;
        std::shared_ptr<TimeoutCallbackFunc> callback;
    };

    int64_t mTimeoutInNano;
    mutable std::mutex mLock;
    std::unordered_map<const void*, std::list<PendingRequest>> mPendingRequestsByClient
            GUARDED_BY(mLock);
    std::thread mThread;
    std::atomic<bool> mThreadStop = false;
    std::condition_variable mCv;
    std::mutex mCvLock;

    bool isRequestPendingLocked(const void* clientId, int64_t requestId) const REQUIRES(mLock);

    // Checks whether the requests in the pool has timed-out, run periodically in a separate thread.
    void checkTimeout();
};

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android

#endif  // android_hardware_automotive_vehicle_aidl_impl_vhal_include_PendingRequestPool_H_
+220 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "PendingRequestPool.h"

#include <VehicleHalTypes.h>
#include <VehicleUtils.h>

#include <utils/Log.h>
#include <utils/SystemClock.h>

#include <vector>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

namespace {

using ::aidl::android::hardware::automotive::vehicle::StatusCode;
using ::android::base::Error;
using ::android::base::Result;

// At least check every 1s.
constexpr int64_t CHECK_TIME_IN_NANO = 1000000000;

}  // namespace

PendingRequestPool::PendingRequestPool(int64_t timeoutInNano)
    : mTimeoutInNano(timeoutInNano), mThread([this] {
          // [this] must be alive within this thread because destructor would wait for this thread
          // to exit.
          int64_t sleepTime = std::min(mTimeoutInNano, static_cast<int64_t>(CHECK_TIME_IN_NANO));
          std::unique_lock<std::mutex> lk(mCvLock);
          while (!mCv.wait_for(lk, std::chrono::nanoseconds(sleepTime),
                               [this] { return mThreadStop.load(); })) {
              checkTimeout();
          }
      }) {}

PendingRequestPool::~PendingRequestPool() {
    mThreadStop = true;
    mCv.notify_all();
    if (mThread.joinable()) {
        mThread.join();
    }

    // If this pool is being destructed, send out all pending requests as timeout.
    {
        std::scoped_lock<std::mutex> lockGuard(mLock);

        for (auto& [_, pendingRequests] : mPendingRequestsByClient) {
            for (const auto& request : pendingRequests) {
                (*request.callback)(request.requestIds);
            }
        }
        mPendingRequestsByClient.clear();
    }
}

Result<void> PendingRequestPool::addRequests(const void* clientId,
                                             const std::unordered_set<int64_t>& requestIds,
                                             std::shared_ptr<TimeoutCallbackFunc> callback) {
    std::scoped_lock<std::mutex> lockGuard(mLock);
    std::list<PendingRequest>* pendingRequests;
    size_t pendingRequestCount = 0;
    if (mPendingRequestsByClient.find(clientId) != mPendingRequestsByClient.end()) {
        pendingRequests = &mPendingRequestsByClient[clientId];
        for (const auto& pendingRequest : *pendingRequests) {
            const auto& pendingRequestIds = pendingRequest.requestIds;
            for (int64_t requestId : requestIds) {
                if (pendingRequestIds.find(requestId) != pendingRequestIds.end()) {
                    return Error(toInt(StatusCode::INVALID_ARG))
                           << "duplicate request ID: " << requestId;
                }
            }
            pendingRequestCount += pendingRequestIds.size();
        }
    } else {
        // Create a new empty list for this client.
        pendingRequests = &mPendingRequestsByClient[clientId];
    }

    if (requestIds.size() > MAX_PENDING_REQUEST_PER_CLIENT - pendingRequestCount) {
        return Error(toInt(StatusCode::TRY_AGAIN)) << "too many pending requests";
    }

    int64_t currentTime = elapsedRealtimeNano();
    int64_t timeoutTimestamp = currentTime + mTimeoutInNano;

    pendingRequests->push_back({
            .requestIds = std::unordered_set<int64_t>(requestIds.begin(), requestIds.end()),
            .timeoutTimestamp = timeoutTimestamp,
            .callback = callback,
    });

    return {};
}

bool PendingRequestPool::isRequestPending(const void* clientId, int64_t requestId) const {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    return isRequestPendingLocked(clientId, requestId);
}

size_t PendingRequestPool::countPendingRequests(const void* clientId) const {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    auto it = mPendingRequestsByClient.find(clientId);
    if (it == mPendingRequestsByClient.end()) {
        return 0;
    }

    size_t count = 0;
    for (const auto& pendingRequest : it->second) {
        count += pendingRequest.requestIds.size();
    }

    return count;
}

bool PendingRequestPool::isRequestPendingLocked(const void* clientId, int64_t requestId) const {
    auto it = mPendingRequestsByClient.find(clientId);
    if (it == mPendingRequestsByClient.end()) {
        return false;
    }
    for (const auto& pendingRequest : it->second) {
        const auto& requestIds = pendingRequest.requestIds;
        if (requestIds.find(requestId) != requestIds.end()) {
            return true;
        }
    }
    return false;
}

void PendingRequestPool::checkTimeout() {
    std::vector<PendingRequest> timeoutRequests;
    {
        std::scoped_lock<std::mutex> lockGuard(mLock);

        int64_t currentTime = elapsedRealtimeNano();

        std::vector<const void*> clientsWithEmptyRequests;

        for (auto& [clientId, pendingRequests] : mPendingRequestsByClient) {
            auto it = pendingRequests.begin();
            while (it != pendingRequests.end()) {
                if (it->timeoutTimestamp >= currentTime) {
                    break;
                }
                timeoutRequests.push_back(std::move(*it));
                it = pendingRequests.erase(it);
            }

            if (pendingRequests.empty()) {
                clientsWithEmptyRequests.push_back(clientId);
            }
        }

        for (const void* clientId : clientsWithEmptyRequests) {
            mPendingRequestsByClient.erase(clientId);
        }
    }

    // Call the callback outside the lock.
    for (const auto& request : timeoutRequests) {
        (*request.callback)(request.requestIds);
    }
}

std::unordered_set<int64_t> PendingRequestPool::tryFinishRequests(
        const void* clientId, const std::unordered_set<int64_t>& requestIds) {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    std::unordered_set<int64_t> foundIds;

    if (mPendingRequestsByClient.find(clientId) == mPendingRequestsByClient.end()) {
        return foundIds;
    }

    auto& pendingRequests = mPendingRequestsByClient[clientId];
    auto it = pendingRequests.begin();
    while (it != pendingRequests.end()) {
        auto& pendingRequestIds = it->requestIds;
        for (int64_t requestId : requestIds) {
            auto idIt = pendingRequestIds.find(requestId);
            if (idIt == pendingRequestIds.end()) {
                continue;
            }
            pendingRequestIds.erase(idIt);
            foundIds.insert(requestId);
        }
        if (pendingRequestIds.empty()) {
            it = pendingRequests.erase(it);
            continue;
        }
        it++;
    }

    return foundIds;
}

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android
Loading