Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 686ec516 authored by Yu Shan's avatar Yu Shan
Browse files

Add subscription manager.

Add a class to manage VHAL subscription. It supports subscribing
and unsubscribing to properties. For continuous property, it uses
recurrent timer to recurrently calls the registered function. For
on-change property, it would return all subscribed clients for a
given property.

Test: atest DefaultVehicleHalTest
Bug: 200737967

Change-Id: I3e1a0401fd465dc31fe08ea77d5a6651fa7bbfaf
parent 5ad921ae
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@
#include <aidl/android/hardware/automotive/vehicle/SetValueResult.h>
#include <aidl/android/hardware/automotive/vehicle/SetValueResults.h>
#include <aidl/android/hardware/automotive/vehicle/StatusCode.h>
#include <aidl/android/hardware/automotive/vehicle/SubscribeOptions.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleApPowerStateReport.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleApPowerStateReq.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleArea.h>
+1 −0
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@ cc_library {
        "src/DefaultVehicleHal.cpp",
        "src/PendingRequestPool.cpp",
        "src/RecurrentTimer.cpp",
        "src/SubscriptionManager.cpp",
    ],
    static_libs: [
        "VehicleHalUtils",
+151 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_
#define android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_

#include "RecurrentTimer.h"

#include <VehicleHalTypes.h>

#include <aidl/android/hardware/automotive/vehicle/IVehicleCallback.h>
#include <android-base/result.h>
#include <android-base/thread_annotations.h>

#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

// A thread-safe subscription manager that manages all VHAL subscriptions.
class SubscriptionManager final {
  public:
    using CallbackType =
            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>;
    using GetValueFunc = std::function<void(
            const CallbackType& callback,
            const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& value)>;

    explicit SubscriptionManager(GetValueFunc&& action);
    ~SubscriptionManager();

    // Subscribes to properties according to {@code SubscribeOptions}. Note that all option must
    // contain non-empty areaIds field, which contains all area IDs to subscribe. As a result,
    // the options here is different from the options passed from VHAL client.
    // Returns error if any of the subscribe options is not valid. If error is returned, no
    // properties would be subscribed.
    // Returns ok if all the options are parsed correctly and all the properties are subscribed.
    ::android::base::Result<void> subscribe(
            const CallbackType& callback,
            const std::vector<::aidl::android::hardware::automotive::vehicle::SubscribeOptions>&
                    options,
            bool isContinuousProperty);

    // Unsubscribes from the properties for the callback.
    // Returns error if the callback was not subscribed before or one of the given property was not
    // subscribed. If error is returned, no property would be unsubscribed.
    // Returns ok if all the requested properties for the callback are unsubscribed.
    ::android::base::Result<void> unsubscribe(const CallbackType& callback,
                                              const std::vector<int32_t>& propIds);

    // Unsubscribes to all the properties for the callback.
    // Returns error if the callback was not subscribed before. If error is returned, no property
    // would be unsubscribed.
    // Returns ok if all the properties for the callback are unsubscribed.
    ::android::base::Result<void> unsubscribe(const CallbackType& callback);

    // For a list of updated properties, returns a map that maps clients subscribing to
    // the updated properties to a list of updated values. This would only return on-change property
    // clients that should be informed for the given updated values.
    std::unordered_map<
            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>,
            std::vector<const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue*>>
    getSubscribedClients(
            const std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>&
                    updatedValues);

    // Checks whether the sample rate is valid.
    static bool checkSampleRate(float sampleRate);

  private:
    struct PropIdAreaId {
        int32_t propId;
        int32_t areaId;

        bool operator==(const PropIdAreaId& other) const;
    };

    struct PropIdAreaIdHash {
        size_t operator()(const PropIdAreaId& propIdAreaId) const;
    };

    // A class to represent a registered subscription.
    class Subscription {
      public:
        Subscription() = default;

        Subscription(const Subscription&) = delete;

        virtual ~Subscription() = default;

        virtual bool isOnChange();
    };

    // A subscription for OnContinuous property. The registered action would be called recurrently
    // until this class is destructed.
    class RecurrentSubscription final : public Subscription {
      public:
        explicit RecurrentSubscription(std::shared_ptr<RecurrentTimer> timer,
                                       std::function<void()>&& action, int64_t interval);
        ~RecurrentSubscription();

        bool isOnChange() override;

      private:
        std::shared_ptr<std::function<void()>> mAction;
        std::shared_ptr<RecurrentTimer> mTimer;
    };

    // A subscription for OnChange property.
    class OnChangeSubscription final : public Subscription {
      public:
        bool isOnChange() override;
    };

    mutable std::mutex mLock;
    std::unordered_map<PropIdAreaId, std::unordered_set<CallbackType>, PropIdAreaIdHash>
            mClientsByPropIdArea GUARDED_BY(mLock);
    std::unordered_map<CallbackType, std::unordered_map<PropIdAreaId, std::unique_ptr<Subscription>,
                                                        PropIdAreaIdHash>>
            mSubscriptionsByClient GUARDED_BY(mLock);
    // RecurrentTimer is thread-safe.
    std::shared_ptr<RecurrentTimer> mTimer;
    const GetValueFunc mGetValue;

    static ::android::base::Result<int64_t> getInterval(float sampleRate);
};

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android

#endif  // android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_
+240 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "SubscriptionManager.h"

#include <math/HashCombine.h>
#include <utils/Log.h>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

namespace {

constexpr float ONE_SECOND_IN_NANO = 1000000000.;

}  // namespace

using ::aidl::android::hardware::automotive::vehicle::IVehicleCallback;
using ::aidl::android::hardware::automotive::vehicle::SubscribeOptions;
using ::aidl::android::hardware::automotive::vehicle::VehiclePropValue;
using ::android::base::Error;
using ::android::base::Result;
using ::ndk::ScopedAStatus;

bool SubscriptionManager::PropIdAreaId::operator==(const PropIdAreaId& other) const {
    return areaId == other.areaId && propId == other.propId;
}

size_t SubscriptionManager::PropIdAreaIdHash::operator()(PropIdAreaId const& propIdAreaId) const {
    size_t res = 0;
    hashCombine(res, propIdAreaId.propId);
    hashCombine(res, propIdAreaId.areaId);
    return res;
}

SubscriptionManager::SubscriptionManager(GetValueFunc&& action)
    : mTimer(std::make_shared<RecurrentTimer>()), mGetValue(std::move(action)) {}

SubscriptionManager::~SubscriptionManager() {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    mClientsByPropIdArea.clear();
    mSubscriptionsByClient.clear();
}

bool SubscriptionManager::checkSampleRate(float sampleRate) {
    return getInterval(sampleRate).ok();
}

Result<int64_t> SubscriptionManager::getInterval(float sampleRate) {
    int64_t interval = 0;
    if (sampleRate <= 0) {
        return Error() << "invalid sample rate, must be a positive number";
    }
    if (sampleRate <= (ONE_SECOND_IN_NANO / static_cast<float>(INT64_MAX))) {
        return Error() << "invalid sample rate: " << sampleRate << ", too small";
    }
    interval = static_cast<int64_t>(ONE_SECOND_IN_NANO / sampleRate);
    return interval;
}

Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallback>& callback,
                                            const std::vector<SubscribeOptions>& options,
                                            bool isContinuousProperty) {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    std::vector<int64_t> intervals;

    for (const auto& option : options) {
        float sampleRate = option.sampleRate;

        if (isContinuousProperty) {
            auto intervalResult = getInterval(sampleRate);
            if (!intervalResult.ok()) {
                return intervalResult.error();
            }
            intervals.push_back(intervalResult.value());
        }

        if (option.areaIds.empty()) {
            ALOGE("area IDs to subscribe must not be empty");
            return Error() << "area IDs to subscribe must not be empty";
        }
    }

    size_t intervalIndex = 0;
    for (const auto& option : options) {
        int32_t propId = option.propId;
        const std::vector<int32_t>& areaIds = option.areaIds;
        int64_t interval = 0;
        if (isContinuousProperty) {
            interval = intervals[intervalIndex];
            intervalIndex++;
        }
        for (int32_t areaId : areaIds) {
            PropIdAreaId propIdAreaId = {
                    .propId = propId,
                    .areaId = areaId,
            };
            if (isContinuousProperty) {
                VehiclePropValue propValueRequest{
                        .prop = propId,
                        .areaId = areaId,
                };
                mSubscriptionsByClient[callback][propIdAreaId] =
                        std::make_unique<RecurrentSubscription>(
                                mTimer,
                                [this, callback, propValueRequest] {
                                    mGetValue(callback, propValueRequest);
                                },
                                interval);
            } else {
                mSubscriptionsByClient[callback][propIdAreaId] =
                        std::make_unique<OnChangeSubscription>();
            }
            mClientsByPropIdArea[propIdAreaId].insert(callback);
        }
    }
    return {};
}

Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback,
                                              const std::vector<int32_t>& propIds) {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) {
        return Error() << "No property was subscribed for the callback";
    }
    std::unordered_set<int32_t> subscribedPropIds;
    for (auto const& [propIdAreaId, _] : mSubscriptionsByClient[callback]) {
        subscribedPropIds.insert(propIdAreaId.propId);
    }

    for (int32_t propId : propIds) {
        if (subscribedPropIds.find(propId) == subscribedPropIds.end()) {
            return Error() << "property ID: " << propId << " is not subscribed";
        }
    }

    auto& subscriptions = mSubscriptionsByClient[callback];
    auto it = subscriptions.begin();
    while (it != subscriptions.end()) {
        int32_t propId = it->first.propId;
        if (std::find(propIds.begin(), propIds.end(), propId) != propIds.end()) {
            auto& clients = mClientsByPropIdArea[it->first];
            clients.erase(callback);
            if (clients.empty()) {
                mClientsByPropIdArea.erase(it->first);
            }
            it = subscriptions.erase(it);
        } else {
            it++;
        }
    }
    if (subscriptions.empty()) {
        mSubscriptionsByClient.erase(callback);
    }
    return {};
}

Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback) {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) {
        return Error() << "No property was subscribed for the callback";
    }

    auto& subscriptions = mSubscriptionsByClient[callback];
    for (auto const& [propIdAreaId, _] : subscriptions) {
        auto& clients = mClientsByPropIdArea[propIdAreaId];
        clients.erase(callback);
        if (clients.empty()) {
            mClientsByPropIdArea.erase(propIdAreaId);
        }
    }
    mSubscriptionsByClient.erase(callback);
    return {};
}

std::unordered_map<std::shared_ptr<IVehicleCallback>, std::vector<const VehiclePropValue*>>
SubscriptionManager::getSubscribedClients(const std::vector<VehiclePropValue>& updatedValues) {
    std::scoped_lock<std::mutex> lockGuard(mLock);
    std::unordered_map<std::shared_ptr<IVehicleCallback>, std::vector<const VehiclePropValue*>>
            clients;

    for (const auto& value : updatedValues) {
        PropIdAreaId propIdAreaId{
                .propId = value.prop,
                .areaId = value.areaId,
        };
        if (mClientsByPropIdArea.find(propIdAreaId) == mClientsByPropIdArea.end()) {
            continue;
        }
        for (const auto& client : mClientsByPropIdArea[propIdAreaId]) {
            if (!mSubscriptionsByClient[client][propIdAreaId]->isOnChange()) {
                continue;
            }
            clients[client].push_back(&value);
        }
    }
    return clients;
}

SubscriptionManager::RecurrentSubscription::RecurrentSubscription(
        std::shared_ptr<RecurrentTimer> timer, std::function<void()>&& action, int64_t interval)
    : mAction(std::make_shared<std::function<void()>>(action)), mTimer(timer) {
    mTimer->registerTimerCallback(interval, mAction);
}

SubscriptionManager::RecurrentSubscription::~RecurrentSubscription() {
    mTimer->unregisterTimerCallback(mAction);
}

bool SubscriptionManager::RecurrentSubscription::isOnChange() {
    return false;
}

bool SubscriptionManager::OnChangeSubscription::isOnChange() {
    return true;
}

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android
+483 −0

File added.

Preview size limit exceeded, changes collapsed.