Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e032b158 authored by Yu Shan's avatar Yu Shan
Browse files

Use subscription manager in VHAL.

This CL renames GetSetValuesClient to ConnectedClient and add
SubscribeClient to the supported client type. The SubscribeClient
would provide a different timeout and on-results callback.

This CL adds a map for subscribe clients in VHAL and initializes
subscription manager.

Test: None, will add after we implement subscribe.
Bug: 200737967
Change-Id: I222b155e4365dcf0fa9344c7da5901142433f055
parent 686ec516
Loading
Loading
Loading
Loading
+32 −0
Original line number Diff line number Diff line
@@ -101,6 +101,38 @@ class GetSetValuesClient final : public ConnectedClient {
    std::shared_ptr<const std::function<void(std::vector<ResultType>)>> mResultCallback;
};

// A class to represent a client that calls {@code IVehicle.subscribe}.
class SubscriptionClient final : public ConnectedClient {
  public:
    SubscriptionClient(
            std::shared_ptr<PendingRequestPool> requestPool,
            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>
                    callback);

    // Gets the callback to be called when the request for this client has finished.
    std::shared_ptr<const std::function<
            void(std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult>)>>
    getResultCallback();

  protected:
    // Gets the callback to be called when the request for this client has timeout.
    std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc> getTimeoutCallback() override;

  private:
    // The following members are only initialized during construction.
    std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc> mTimeoutCallback;
    std::shared_ptr<const std::function<void(
            std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult>)>>
            mResultCallback;

    static void onGetValueResults(
            const void* clientId,
            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>
                    callback,
            std::shared_ptr<PendingRequestPool> requestPool,
            std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult> results);
};

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
+13 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@
#include "ConnectedClient.h"
#include "ParcelableUtils.h"
#include "PendingRequestPool.h"
#include "SubscriptionManager.h"

#include <IVehicleHardware.h>
#include <VehicleUtils.h>
@@ -52,6 +53,8 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve

    explicit DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware);

    ~DefaultVehicleHal();

    ::ndk::ScopedAStatus getAllPropConfigs(
            ::aidl::android::hardware::automotive::vehicle::VehiclePropConfigs* returnConfigs)
            override;
@@ -104,12 +107,18 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve
    std::unique_ptr<::ndk::ScopedFileDescriptor> mConfigFile;
    // PendingRequestPool is thread-safe.
    std::shared_ptr<PendingRequestPool> mPendingRequestPool;
    // SubscriptionManager is thread-safe.
    std::unique_ptr<SubscriptionManager> mSubscriptionManager;

    std::mutex mLock;
    std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>> mGetValuesClients
            GUARDED_BY(mLock);
    std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>> mSetValuesClients
            GUARDED_BY(mLock);
    std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>> mSubscriptionClients
            GUARDED_BY(mLock);
    // An increasing request ID we keep for subscribe clients.
    std::unordered_map<CallbackType, int64_t> mSubscribeIdByClient GUARDED_BY(mLock);

    template <class T>
    std::shared_ptr<T> getOrCreateClient(
@@ -127,6 +136,10 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve
            const std::vector<::aidl::android::hardware::automotive::vehicle::SetValueRequest>&
                    requests);

    void getValueFromHardwareCallCallback(
            const CallbackType& callback,
            const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& value);

    // Test-only
    // Set the default timeout for pending requests.
    void setTimeout(int64_t timeoutInNano);
+87 −0
Original line number Diff line number Diff line
@@ -244,6 +244,93 @@ void GetSetValuesClient<ResultType, ResultsType>::sendResultsSeparately(
template class GetSetValuesClient<GetValueResult, GetValueResults>;
template class GetSetValuesClient<SetValueResult, SetValueResults>;

SubscriptionClient::SubscriptionClient(std::shared_ptr<PendingRequestPool> requestPool,
                                       std::shared_ptr<IVehicleCallback> callback)
    : ConnectedClient(requestPool, callback) {
    mTimeoutCallback = std::make_shared<const PendingRequestPool::TimeoutCallbackFunc>(
            [](std::unordered_set<int64_t> timeoutIds) {
                for (int64_t id : timeoutIds) {
                    ALOGW("subscribe: requests with IDs: %" PRId64
                          " has timed-out, not client informed, "
                          "possibly one of recurrent requests for this subscription failed",
                          id);
                }
            });
    auto requestPoolCopy = mRequestPool;
    const void* clientId = reinterpret_cast<const void*>(this);
    mResultCallback = std::make_shared<const std::function<void(std::vector<GetValueResult>)>>(
            [clientId, callback, requestPoolCopy](std::vector<GetValueResult> results) {
                onGetValueResults(clientId, callback, requestPoolCopy, results);
            });
}

std::shared_ptr<const std::function<void(std::vector<GetValueResult>)>>
SubscriptionClient::getResultCallback() {
    return mResultCallback;
}

std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc>
SubscriptionClient::getTimeoutCallback() {
    return mTimeoutCallback;
}

void SubscriptionClient::onGetValueResults(const void* clientId,
                                           std::shared_ptr<IVehicleCallback> callback,
                                           std::shared_ptr<PendingRequestPool> requestPool,
                                           std::vector<GetValueResult> results) {
    std::unordered_set<int64_t> requestIds;
    for (const auto& result : results) {
        requestIds.insert(result.requestId);
    }

    auto finishedRequests = requestPool->tryFinishRequests(clientId, requestIds);
    std::vector<VehiclePropValue> propValues;
    for (auto& result : results) {
        int64_t requestId = result.requestId;
        if (finishedRequests.find(requestId) == finishedRequests.end()) {
            ALOGE("subscribe[%" PRId64
                  "]: no pending request for the result from hardware, "
                  "possibly already time-out",
                  requestId);
            continue;
        }
        if (result.status != StatusCode::OK) {
            ALOGE("subscribe[%" PRId64
                  "]: hardware returns non-ok status for getValues, status: "
                  "%d",
                  requestId, toInt(result.status));
            continue;
        }
        if (!result.prop.has_value()) {
            ALOGE("subscribe[%" PRId64 "]: no prop value in getValues result", requestId);
            continue;
        }
        propValues.push_back(std::move(result.prop.value()));
    }

    if (propValues.empty()) {
        return;
    }
    // TODO(b/205189110): Use memory pool here and fill in sharedMemoryId.
    VehiclePropValues vehiclePropValues;
    int32_t sharedMemoryFileCount = 0;
    ScopedAStatus status = vectorToStableLargeParcelable(propValues, &vehiclePropValues);
    if (!status.isOk()) {
        int statusCode = status.getServiceSpecificError();
        ALOGE("failed to marshal result into large parcelable, error: "
              "%s, code: %d",
              status.getMessage(), statusCode);
        return;
    }

    if (ScopedAStatus callbackStatus =
                callback->onPropertyEvent(vehiclePropValues, sharedMemoryFileCount);
        !callbackStatus.isOk()) {
        ALOGE("failed to call callback, error: %s, code: %d", status.getMessage(),
              status.getServiceSpecificError());
    }
}

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
+88 −26
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@
#include <android-base/result.h>
#include <utils/Log.h>

#include <inttypes.h>
#include <set>
#include <unordered_set>

@@ -33,6 +34,8 @@ namespace hardware {
namespace automotive {
namespace vehicle {

namespace {

using ::aidl::android::hardware::automotive::vehicle::GetValueRequest;
using ::aidl::android::hardware::automotive::vehicle::GetValueRequests;
using ::aidl::android::hardware::automotive::vehicle::GetValueResult;
@@ -54,6 +57,19 @@ using ::android::base::expected;
using ::android::base::Result;
using ::ndk::ScopedAStatus;

std::string toString(const std::unordered_set<int64_t>& values) {
    std::string str = "";
    for (auto it = values.begin(); it != values.end(); it++) {
        str += std::to_string(*it);
        if (std::next(it, 1) != values.end()) {
            str += ", ";
        }
    }
    return str;
}

}  // namespace

DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware)
    : mVehicleHardware(std::move(hardware)),
      mPendingRequestPool(std::make_shared<PendingRequestPool>(TIMEOUT_IN_NANO)) {
@@ -73,23 +89,16 @@ DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware)
    if (result.value() != nullptr) {
        mConfigFile = std::move(result.value());
    }
}

void DefaultVehicleHal::setTimeout(int64_t timeoutInNano) {
    mPendingRequestPool = std::make_unique<PendingRequestPool>(timeoutInNano);
    mSubscriptionManager = std::make_unique<SubscriptionManager>(
            [this](const CallbackType& callback, const VehiclePropValue& value) {
                getValueFromHardwareCallCallback(callback, value);
            });
}

ScopedAStatus DefaultVehicleHal::getAllPropConfigs(VehiclePropConfigs* output) {
    if (mConfigFile != nullptr) {
        output->payloads.clear();
        output->sharedMemoryFd.set(dup(mConfigFile->get()));
        return ScopedAStatus::ok();
    }
    output->payloads.reserve(mConfigsByPropId.size());
    for (const auto& [_, config] : mConfigsByPropId) {
        output->payloads.push_back(config);
    }
    return ScopedAStatus::ok();
DefaultVehicleHal::~DefaultVehicleHal() {
    // mSubscriptionManager has reference to this, so must be destroyed before other members.
    mSubscriptionManager.reset();
}

template <class T>
@@ -107,11 +116,63 @@ template std::shared_ptr<DefaultVehicleHal::GetValuesClient>
DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::GetValuesClient>(
        std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>>* clients,
        const CallbackType& callback);

template std::shared_ptr<DefaultVehicleHal::SetValuesClient>
DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::SetValuesClient>(
        std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>>* clients,
        const CallbackType& callback);
template std::shared_ptr<SubscriptionClient>
DefaultVehicleHal::getOrCreateClient<SubscriptionClient>(
        std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>>* clients,
        const CallbackType& callback);

void DefaultVehicleHal::getValueFromHardwareCallCallback(const CallbackType& callback,
                                                         const VehiclePropValue& value) {
    int64_t subscribeId;
    std::shared_ptr<SubscriptionClient> client;
    {
        std::scoped_lock<std::mutex> lockGuard(mLock);
        // This is initialized to 0 if callback does not exist in the map.
        subscribeId = (mSubscribeIdByClient[callback])++;
        client = getOrCreateClient(&mSubscriptionClients, callback);
    }
    if (auto addRequestResult = client->addRequests({subscribeId}); !addRequestResult.ok()) {
        ALOGE("subscribe[%" PRId64 "]: too many pending requests, ignore the getValue request",
              subscribeId);
        return;
    }

    std::vector<GetValueRequest> hardwareRequests = {{
            .requestId = subscribeId,
            .prop = value,
    }};

    if (StatusCode status =
                mVehicleHardware->getValues(client->getResultCallback(), hardwareRequests);
        status != StatusCode::OK) {
        // If the hardware returns error, finish all the pending requests for this request because
        // we never expect hardware to call callback for these requests.
        client->tryFinishRequests({subscribeId});
        ALOGE("subscribe[%" PRId64 "]: failed to get value from VehicleHardware, code: %d",
              subscribeId, toInt(status));
    }
}

void DefaultVehicleHal::setTimeout(int64_t timeoutInNano) {
    mPendingRequestPool = std::make_unique<PendingRequestPool>(timeoutInNano);
}

ScopedAStatus DefaultVehicleHal::getAllPropConfigs(VehiclePropConfigs* output) {
    if (mConfigFile != nullptr) {
        output->payloads.clear();
        output->sharedMemoryFd.set(dup(mConfigFile->get()));
        return ScopedAStatus::ok();
    }
    output->payloads.reserve(mConfigsByPropId.size());
    for (const auto& [_, config] : mConfigsByPropId) {
        output->payloads.push_back(config);
    }
    return ScopedAStatus::ok();
}

Result<void> DefaultVehicleHal::checkProperty(const VehiclePropValue& propValue) {
    int32_t propId = propValue.prop;
@@ -151,7 +212,7 @@ ScopedAStatus DefaultVehicleHal::getValues(const CallbackType& callback,

    auto maybeRequestIds = checkDuplicateRequests(getValueRequests);
    if (!maybeRequestIds.ok()) {
        ALOGE("duplicate request ID");
        ALOGE("getValues: duplicate request ID");
        return toScopedAStatus(maybeRequestIds, StatusCode::INVALID_ARG);
    }
    // The set of request Ids that we would send to hardware.
@@ -165,8 +226,8 @@ ScopedAStatus DefaultVehicleHal::getValues(const CallbackType& callback,
    }
    // Register the pending hardware requests and also check for duplicate request Ids.
    if (auto addRequestResult = client->addRequests(hardwareRequestIds); !addRequestResult.ok()) {
        ALOGE("failed to add pending requests, error: %s",
              addRequestResult.error().message().c_str());
        ALOGE("getValues[%s]: failed to add pending requests, error: %s",
              toString(hardwareRequestIds).c_str(), addRequestResult.error().message().c_str());
        return toScopedAStatus(addRequestResult);
    }

@@ -176,7 +237,8 @@ ScopedAStatus DefaultVehicleHal::getValues(const CallbackType& callback,
        // If the hardware returns error, finish all the pending requests for this request because
        // we never expect hardware to call callback for these requests.
        client->tryFinishRequests(hardwareRequestIds);
        ALOGE("failed to get value from VehicleHardware, status: %d", toInt(status));
        ALOGE("getValues[%s]: failed to get value from VehicleHardware, status: %d",
              toString(hardwareRequestIds).c_str(), toInt(status));
        return ScopedAStatus::fromServiceSpecificErrorWithMessage(
                toInt(status), "failed to get value from VehicleHardware");
    }
@@ -201,14 +263,15 @@ ScopedAStatus DefaultVehicleHal::setValues(const CallbackType& callback,

    auto maybeRequestIds = checkDuplicateRequests(setValueRequests);
    if (!maybeRequestIds.ok()) {
        ALOGE("duplicate request ID");
        ALOGE("setValues: duplicate request ID");
        return toScopedAStatus(maybeRequestIds, StatusCode::INVALID_ARG);
    }

    for (auto& request : setValueRequests) {
        int64_t requestId = request.requestId;
        if (auto result = checkProperty(request.value); !result.ok()) {
            ALOGW("property not valid: %s", result.error().message().c_str());
            ALOGW("setValues[%" PRId64 "]: property not valid: %s", requestId,
                  result.error().message().c_str());
            failedResults.push_back(SetValueResult{
                    .requestId = requestId,
                    .status = StatusCode::INVALID_ARG,
@@ -233,8 +296,8 @@ ScopedAStatus DefaultVehicleHal::setValues(const CallbackType& callback,

    // Register the pending hardware requests and also check for duplicate request Ids.
    if (auto addRequestResult = client->addRequests(hardwareRequestIds); !addRequestResult.ok()) {
        ALOGE("failed to add pending requests, error: %s",
              addRequestResult.error().message().c_str());
        ALOGE("setValues[%s], failed to add pending requests, error: %s",
              toString(hardwareRequestIds).c_str(), addRequestResult.error().message().c_str());
        return toScopedAStatus(addRequestResult, StatusCode::INVALID_ARG);
    }

@@ -249,7 +312,8 @@ ScopedAStatus DefaultVehicleHal::setValues(const CallbackType& callback,
        // If the hardware returns error, finish all the pending requests for this request because
        // we never expect hardware to call callback for these requests.
        client->tryFinishRequests(hardwareRequestIds);
        ALOGE("failed to set value to VehicleHardware, status: %d", toInt(status));
        ALOGE("setValues[%s], failed to set value to VehicleHardware, status: %d",
              toString(hardwareRequestIds).c_str(), toInt(status));
        return ScopedAStatus::fromServiceSpecificErrorWithMessage(
                toInt(status), "failed to set value to VehicleHardware");
    }
@@ -298,12 +362,10 @@ ScopedAStatus DefaultVehicleHal::getPropConfigs(const std::vector<int32_t>& prop

ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType&,
                                           const std::vector<SubscribeOptions>&, int32_t) {
    // TODO(b/200737967): implement this.
    return ScopedAStatus::ok();
}

ScopedAStatus DefaultVehicleHal::unsubscribe(const CallbackType&, const std::vector<int32_t>&) {
    // TODO(b/200737967): implement this.
    return ScopedAStatus::ok();
}

+1 −0
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@
 * limitations under the License.
 */

#include "ConnectedClient.h"
#include "DefaultVehicleHal.h"
#include "MockVehicleCallback.h"