Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 77d4e62e authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Add subscription manager."

parents 714f9596 686ec516
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@
#include <aidl/android/hardware/automotive/vehicle/SetValueResult.h>
#include <aidl/android/hardware/automotive/vehicle/SetValueResults.h>
#include <aidl/android/hardware/automotive/vehicle/StatusCode.h>
#include <aidl/android/hardware/automotive/vehicle/SubscribeOptions.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleApPowerStateReport.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleApPowerStateReq.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleArea.h>
+1 −0
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@ cc_library {
        "src/DefaultVehicleHal.cpp",
        "src/PendingRequestPool.cpp",
        "src/RecurrentTimer.cpp",
        "src/SubscriptionManager.cpp",
    ],
    static_libs: [
        "VehicleHalUtils",
+151 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_
#define android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_

#include "RecurrentTimer.h"

#include <VehicleHalTypes.h>

#include <aidl/android/hardware/automotive/vehicle/IVehicleCallback.h>
#include <android-base/result.h>
#include <android-base/thread_annotations.h>

#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

// A thread-safe subscription manager that manages all VHAL subscriptions.
class SubscriptionManager final {
  public:
    using CallbackType =
            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>;
    using GetValueFunc = std::function<void(
            const CallbackType& callback,
            const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& value)>;

    explicit SubscriptionManager(GetValueFunc&& action);
    ~SubscriptionManager();

    // Subscribes to properties according to {@code SubscribeOptions}. Note that all option must
    // contain non-empty areaIds field, which contains all area IDs to subscribe. As a result,
    // the options here is different from the options passed from VHAL client.
    // Returns error if any of the subscribe options is not valid. If error is returned, no
    // properties would be subscribed.
    // Returns ok if all the options are parsed correctly and all the properties are subscribed.
    ::android::base::Result<void> subscribe(
            const CallbackType& callback,
            const std::vector<::aidl::android::hardware::automotive::vehicle::SubscribeOptions>&
                    options,
            bool isContinuousProperty);

    // Unsubscribes from the properties for the callback.
    // Returns error if the callback was not subscribed before or one of the given property was not
    // subscribed. If error is returned, no property would be unsubscribed.
    // Returns ok if all the requested properties for the callback are unsubscribed.
    ::android::base::Result<void> unsubscribe(const CallbackType& callback,
                                              const std::vector<int32_t>& propIds);

    // Unsubscribes to all the properties for the callback.
    // Returns error if the callback was not subscribed before. If error is returned, no property
    // would be unsubscribed.
    // Returns ok if all the properties for the callback are unsubscribed.
    ::android::base::Result<void> unsubscribe(const CallbackType& callback);

    // For a list of updated properties, returns a map that maps clients subscribing to
    // the updated properties to a list of updated values. This would only return on-change property
    // clients that should be informed for the given updated values.
    std::unordered_map<
            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>,
            std::vector<const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue*>>
    getSubscribedClients(
            const std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>&
                    updatedValues);

    // Checks whether the sample rate is valid.
    static bool checkSampleRate(float sampleRate);

  private:
    struct PropIdAreaId {
        int32_t propId;
        int32_t areaId;

        bool operator==(const PropIdAreaId& other) const;
    };

    struct PropIdAreaIdHash {
        size_t operator()(const PropIdAreaId& propIdAreaId) const;
    };

    // A class to represent a registered subscription.
    class Subscription {
      public:
        Subscription() = default;

        Subscription(const Subscription&) = delete;

        virtual ~Subscription() = default;

        virtual bool isOnChange();
    };

    // A subscription for OnContinuous property. The registered action would be called recurrently
    // until this class is destructed.
    class RecurrentSubscription final : public Subscription {
      public:
        explicit RecurrentSubscription(std::shared_ptr<RecurrentTimer> timer,
                                       std::function<void()>&& action, int64_t interval);
        ~RecurrentSubscription();

        bool isOnChange() override;

      private:
        std::shared_ptr<std::function<void()>> mAction;
        std::shared_ptr<RecurrentTimer> mTimer;
    };

    // A subscription for OnChange property.
    class OnChangeSubscription final : public Subscription {
      public:
        bool isOnChange() override;
    };

    mutable std::mutex mLock;
    std::unordered_map<PropIdAreaId, std::unordered_set<CallbackType>, PropIdAreaIdHash>
            mClientsByPropIdArea GUARDED_BY(mLock);
    std::unordered_map<CallbackType, std::unordered_map<PropIdAreaId, std::unique_ptr<Subscription>,
                                                        PropIdAreaIdHash>>
            mSubscriptionsByClient GUARDED_BY(mLock);
    // RecurrentTimer is thread-safe.
    std::shared_ptr<RecurrentTimer> mTimer;
    const GetValueFunc mGetValue;

    static ::android::base::Result<int64_t> getInterval(float sampleRate);
};

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android

#endif  // android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_
+240 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "SubscriptionManager.h"

#include <math/HashCombine.h>
#include <utils/Log.h>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

namespace {

constexpr float ONE_SECOND_IN_NANO = 1000000000.;

}  // namespace

using ::aidl::android::hardware::automotive::vehicle::IVehicleCallback;
using ::aidl::android::hardware::automotive::vehicle::SubscribeOptions;
using ::aidl::android::hardware::automotive::vehicle::VehiclePropValue;
using ::android::base::Error;
using ::android::base::Result;
using ::ndk::ScopedAStatus;

bool SubscriptionManager::PropIdAreaId::operator==(const PropIdAreaId& other) const {
    return areaId == other.areaId && propId == other.propId;
}

size_t SubscriptionManager::PropIdAreaIdHash::operator()(PropIdAreaId const& propIdAreaId) const {
    size_t res = 0;
    hashCombine(res, propIdAreaId.propId);
    hashCombine(res, propIdAreaId.areaId);
    return res;
}

SubscriptionManager::SubscriptionManager(GetValueFunc&& action)
    : mTimer(std::make_shared<RecurrentTimer>()), mGetValue(std::move(action)) {}

SubscriptionManager::~SubscriptionManager() {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    mClientsByPropIdArea.clear();
    mSubscriptionsByClient.clear();
}

bool SubscriptionManager::checkSampleRate(float sampleRate) {
    return getInterval(sampleRate).ok();
}

Result<int64_t> SubscriptionManager::getInterval(float sampleRate) {
    int64_t interval = 0;
    if (sampleRate <= 0) {
        return Error() << "invalid sample rate, must be a positive number";
    }
    if (sampleRate <= (ONE_SECOND_IN_NANO / static_cast<float>(INT64_MAX))) {
        return Error() << "invalid sample rate: " << sampleRate << ", too small";
    }
    interval = static_cast<int64_t>(ONE_SECOND_IN_NANO / sampleRate);
    return interval;
}

Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallback>& callback,
                                            const std::vector<SubscribeOptions>& options,
                                            bool isContinuousProperty) {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    std::vector<int64_t> intervals;

    for (const auto& option : options) {
        float sampleRate = option.sampleRate;

        if (isContinuousProperty) {
            auto intervalResult = getInterval(sampleRate);
            if (!intervalResult.ok()) {
                return intervalResult.error();
            }
            intervals.push_back(intervalResult.value());
        }

        if (option.areaIds.empty()) {
            ALOGE("area IDs to subscribe must not be empty");
            return Error() << "area IDs to subscribe must not be empty";
        }
    }

    size_t intervalIndex = 0;
    for (const auto& option : options) {
        int32_t propId = option.propId;
        const std::vector<int32_t>& areaIds = option.areaIds;
        int64_t interval = 0;
        if (isContinuousProperty) {
            interval = intervals[intervalIndex];
            intervalIndex++;
        }
        for (int32_t areaId : areaIds) {
            PropIdAreaId propIdAreaId = {
                    .propId = propId,
                    .areaId = areaId,
            };
            if (isContinuousProperty) {
                VehiclePropValue propValueRequest{
                        .prop = propId,
                        .areaId = areaId,
                };
                mSubscriptionsByClient[callback][propIdAreaId] =
                        std::make_unique<RecurrentSubscription>(
                                mTimer,
                                [this, callback, propValueRequest] {
                                    mGetValue(callback, propValueRequest);
                                },
                                interval);
            } else {
                mSubscriptionsByClient[callback][propIdAreaId] =
                        std::make_unique<OnChangeSubscription>();
            }
            mClientsByPropIdArea[propIdAreaId].insert(callback);
        }
    }
    return {};
}

Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback,
                                              const std::vector<int32_t>& propIds) {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) {
        return Error() << "No property was subscribed for the callback";
    }
    std::unordered_set<int32_t> subscribedPropIds;
    for (auto const& [propIdAreaId, _] : mSubscriptionsByClient[callback]) {
        subscribedPropIds.insert(propIdAreaId.propId);
    }

    for (int32_t propId : propIds) {
        if (subscribedPropIds.find(propId) == subscribedPropIds.end()) {
            return Error() << "property ID: " << propId << " is not subscribed";
        }
    }

    auto& subscriptions = mSubscriptionsByClient[callback];
    auto it = subscriptions.begin();
    while (it != subscriptions.end()) {
        int32_t propId = it->first.propId;
        if (std::find(propIds.begin(), propIds.end(), propId) != propIds.end()) {
            auto& clients = mClientsByPropIdArea[it->first];
            clients.erase(callback);
            if (clients.empty()) {
                mClientsByPropIdArea.erase(it->first);
            }
            it = subscriptions.erase(it);
        } else {
            it++;
        }
    }
    if (subscriptions.empty()) {
        mSubscriptionsByClient.erase(callback);
    }
    return {};
}

Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback) {
    std::scoped_lock<std::mutex> lockGuard(mLock);

    if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) {
        return Error() << "No property was subscribed for the callback";
    }

    auto& subscriptions = mSubscriptionsByClient[callback];
    for (auto const& [propIdAreaId, _] : subscriptions) {
        auto& clients = mClientsByPropIdArea[propIdAreaId];
        clients.erase(callback);
        if (clients.empty()) {
            mClientsByPropIdArea.erase(propIdAreaId);
        }
    }
    mSubscriptionsByClient.erase(callback);
    return {};
}

std::unordered_map<std::shared_ptr<IVehicleCallback>, std::vector<const VehiclePropValue*>>
SubscriptionManager::getSubscribedClients(const std::vector<VehiclePropValue>& updatedValues) {
    std::scoped_lock<std::mutex> lockGuard(mLock);
    std::unordered_map<std::shared_ptr<IVehicleCallback>, std::vector<const VehiclePropValue*>>
            clients;

    for (const auto& value : updatedValues) {
        PropIdAreaId propIdAreaId{
                .propId = value.prop,
                .areaId = value.areaId,
        };
        if (mClientsByPropIdArea.find(propIdAreaId) == mClientsByPropIdArea.end()) {
            continue;
        }
        for (const auto& client : mClientsByPropIdArea[propIdAreaId]) {
            if (!mSubscriptionsByClient[client][propIdAreaId]->isOnChange()) {
                continue;
            }
            clients[client].push_back(&value);
        }
    }
    return clients;
}

SubscriptionManager::RecurrentSubscription::RecurrentSubscription(
        std::shared_ptr<RecurrentTimer> timer, std::function<void()>&& action, int64_t interval)
    : mAction(std::make_shared<std::function<void()>>(action)), mTimer(timer) {
    mTimer->registerTimerCallback(interval, mAction);
}

SubscriptionManager::RecurrentSubscription::~RecurrentSubscription() {
    mTimer->unregisterTimerCallback(mAction);
}

bool SubscriptionManager::RecurrentSubscription::isOnChange() {
    return false;
}

bool SubscriptionManager::OnChangeSubscription::isOnChange() {
    return true;
}

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android
+483 −0

File added.

Preview size limit exceeded, changes collapsed.