Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 723d476b authored by Yu Shan's avatar Yu Shan
Browse files

Support property events batching.

Supports batching property change events based on configured time
window in DefaultVehicleHal. This will reduce binder callbacks but
will increase latency. This is configurable from IVehicleHardware
side, default is off.

Test: atest DefaultVehicleHalTest
Bug: 305111644
Change-Id: Id703e8486fdeb98a88cc27dbebe4d79f232f7e3d
parent e5bdf5aa
Loading
Loading
Loading
Loading
+20 −1
Original line number Diff line number Diff line
@@ -118,7 +118,7 @@ class IVehicleHardware {
    virtual aidl::android::hardware::automotive::vehicle::StatusCode checkHealth() = 0;

    // Register a callback that would be called when there is a property change event from vehicle.
    // Must only be called once during initialization.
    // This function must only be called once during initialization.
    virtual void registerOnPropertyChangeEvent(
            std::unique_ptr<const PropertyChangeCallback> callback) = 0;

@@ -126,6 +126,25 @@ class IVehicleHardware {
    // vehicle. Must only be called once during initialization.
    virtual void registerOnPropertySetErrorEvent(
            std::unique_ptr<const PropertySetErrorCallback> callback) = 0;

    // Gets the batching window used by DefaultVehicleHal for property change events.
    //
    // In DefaultVehicleHal, all the property change events generated within the batching window
    // will be delivered through one callback to the VHAL client. This affects the maximum supported
    // subscription rate. For example, if this returns 10ms, then only one callback for property
    // change events will be called per 10ms, meaining that the max subscription rate for all
    // continuous properties would be 100hz.
    //
    // A higher batching window means less callbacks to the VHAL client, causing a better
    // performance. However, it also means a longer average latency for every property change
    // events.
    //
    // 0 means no batching should be enabled in DefaultVehicleHal. In this case, batching can
    // be optionally implemented in IVehicleHardware layer.
    virtual std::chrono::nanoseconds getPropertyOnChangeEventBatchingWindow() {
        // By default batching is disabled.
        return std::chrono::nanoseconds(0);
    }
};

}  // namespace vehicle
+76 −0
Original line number Diff line number Diff line
@@ -69,6 +69,19 @@ class ConcurrentQueue {
        mCond.notify_one();
    }

    void push(std::vector<T>&& items) {
        {
            std::scoped_lock<std::mutex> lockGuard(mLock);
            if (!mIsActive) {
                return;
            }
            for (T& item : items) {
                mQueue.push(std::move(item));
            }
        }
        mCond.notify_one();
    }

    // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread.
    // The items already in the queue could still be flushed even after the queue is deactivated.
    void deactivate() {
@@ -92,6 +105,69 @@ class ConcurrentQueue {
    std::queue<T> mQueue GUARDED_BY(mLock);
};

template <typename T>
class BatchingConsumer {
  private:
    enum class State {
        INIT = 0,
        RUNNING = 1,
        STOP_REQUESTED = 2,
        STOPPED = 3,
    };

  public:
    BatchingConsumer() : mState(State::INIT) {}

    BatchingConsumer(const BatchingConsumer&) = delete;
    BatchingConsumer& operator=(const BatchingConsumer&) = delete;

    using OnBatchReceivedFunc = std::function<void(std::vector<T> vec)>;

    void run(ConcurrentQueue<T>* queue, std::chrono::nanoseconds batchInterval,
             const OnBatchReceivedFunc& func) {
        mQueue = queue;
        mBatchInterval = batchInterval;

        mWorkerThread = std::thread(&BatchingConsumer<T>::runInternal, this, func);
    }

    void requestStop() { mState = State::STOP_REQUESTED; }

    void waitStopped() {
        if (mWorkerThread.joinable()) {
            mWorkerThread.join();
        }
    }

  private:
    void runInternal(const OnBatchReceivedFunc& onBatchReceived) {
        if (mState.exchange(State::RUNNING) == State::INIT) {
            while (State::RUNNING == mState) {
                mQueue->waitForItems();
                if (State::STOP_REQUESTED == mState) break;

                std::this_thread::sleep_for(mBatchInterval);
                if (State::STOP_REQUESTED == mState) break;

                std::vector<T> items = mQueue->flush();

                if (items.size() > 0) {
                    onBatchReceived(std::move(items));
                }
            }
        }

        mState = State::STOPPED;
    }

  private:
    std::thread mWorkerThread;

    std::atomic<State> mState;
    std::chrono::nanoseconds mBatchInterval;
    ConcurrentQueue<T>* mQueue;
};

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
+23 −1
Original line number Diff line number Diff line
@@ -144,6 +144,15 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi
    std::shared_ptr<PendingRequestPool> mPendingRequestPool;
    // SubscriptionManager is thread-safe.
    std::shared_ptr<SubscriptionManager> mSubscriptionManager;
    // ConcurrentQueue is thread-safe.
    std::shared_ptr<ConcurrentQueue<aidl::android::hardware::automotive::vehicle::VehiclePropValue>>
            mBatchedEventQueue;
    // BatchingConsumer is thread-safe.
    std::shared_ptr<
            BatchingConsumer<aidl::android::hardware::automotive::vehicle::VehiclePropValue>>
            mPropertyChangeEventsBatchingConsumer;
    // Only set once during initialization.
    std::chrono::nanoseconds mEventBatchingWindow;

    std::mutex mLock;
    std::unordered_map<const AIBinder*, std::unique_ptr<OnBinderDiedContext>> mOnBinderDiedContexts
@@ -209,6 +218,19 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi

    size_t countSubscribeClients();

    // Handles the property change events in batch.
    void handleBatchedPropertyEvents(
            std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&&
                    batchedEvents);

    // Puts the property change events into a queue so that they can handled in batch.
    static void batchPropertyChangeEvent(
            const std::weak_ptr<ConcurrentQueue<
                    aidl::android::hardware::automotive::vehicle::VehiclePropValue>>&
                    batchedEventQueue,
            std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&&
                    updatedValues);

    // Gets or creates a {@code T} object for the client to or from {@code clients}.
    template <class T>
    static std::shared_ptr<T> getOrCreateClient(
@@ -217,7 +239,7 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi

    static void onPropertyChangeEvent(
            const std::weak_ptr<SubscriptionManager>& subscriptionManager,
            const std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&
            std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&&
                    updatedValues);

    static void onPropertySetErrorEvent(
+3 −4
Original line number Diff line number Diff line
@@ -92,11 +92,10 @@ class SubscriptionManager final {
    // For a list of updated properties, returns a map that maps clients subscribing to
    // the updated properties to a list of updated values. This would only return on-change property
    // clients that should be informed for the given updated values.
    std::unordered_map<
            CallbackType,
            std::vector<const aidl::android::hardware::automotive::vehicle::VehiclePropValue*>>
    std::unordered_map<CallbackType,
                       std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>>
    getSubscribedClients(
            const std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&
            std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&&
                    updatedValues);

    // For a list of set property error events, returns a map that maps clients subscribing to the
+54 −12
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
#include <utils/Trace.h>

#include <inttypes.h>
#include <chrono>
#include <set>
#include <unordered_set>

@@ -101,12 +102,32 @@ DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> vehicleHa

    IVehicleHardware* vehicleHardwarePtr = mVehicleHardware.get();
    mSubscriptionManager = std::make_shared<SubscriptionManager>(vehicleHardwarePtr);
    mEventBatchingWindow = mVehicleHardware->getPropertyOnChangeEventBatchingWindow();
    if (mEventBatchingWindow != std::chrono::nanoseconds(0)) {
        mBatchedEventQueue = std::make_shared<ConcurrentQueue<VehiclePropValue>>();
        mPropertyChangeEventsBatchingConsumer =
                std::make_shared<BatchingConsumer<VehiclePropValue>>();
        mPropertyChangeEventsBatchingConsumer->run(
                mBatchedEventQueue.get(), mEventBatchingWindow,
                [this](std::vector<VehiclePropValue> batchedEvents) {
                    handleBatchedPropertyEvents(std::move(batchedEvents));
                });
    }

    std::weak_ptr<ConcurrentQueue<VehiclePropValue>> batchedEventQueueCopy = mBatchedEventQueue;
    std::chrono::nanoseconds eventBatchingWindow = mEventBatchingWindow;
    std::weak_ptr<SubscriptionManager> subscriptionManagerCopy = mSubscriptionManager;
    mVehicleHardware->registerOnPropertyChangeEvent(
            std::make_unique<IVehicleHardware::PropertyChangeCallback>(
                    [subscriptionManagerCopy](std::vector<VehiclePropValue> updatedValues) {
                        onPropertyChangeEvent(subscriptionManagerCopy, updatedValues);
                    [subscriptionManagerCopy, batchedEventQueueCopy,
                     eventBatchingWindow](std::vector<VehiclePropValue> updatedValues) {
                        if (eventBatchingWindow != std::chrono::nanoseconds(0)) {
                            batchPropertyChangeEvent(batchedEventQueueCopy,
                                                     std::move(updatedValues));
                        } else {
                            onPropertyChangeEvent(subscriptionManagerCopy,
                                                  std::move(updatedValues));
                        }
                    }));
    mVehicleHardware->registerOnPropertySetErrorEvent(
            std::make_unique<IVehicleHardware::PropertySetErrorCallback>(
@@ -139,26 +160,47 @@ DefaultVehicleHal::~DefaultVehicleHal() {
    // mRecurrentAction uses pointer to mVehicleHardware, so it has to be unregistered before
    // mVehicleHardware.
    mRecurrentTimer.unregisterTimerCallback(mRecurrentAction);

    if (mBatchedEventQueue) {
        // mPropertyChangeEventsBatchingConsumer uses mSubscriptionManager and mBatchedEventQueue.
        mBatchedEventQueue->deactivate();
        mPropertyChangeEventsBatchingConsumer->requestStop();
        mPropertyChangeEventsBatchingConsumer->waitStopped();
        mPropertyChangeEventsBatchingConsumer.reset();
        mBatchedEventQueue.reset();
    }

    // mSubscriptionManager uses pointer to mVehicleHardware, so it has to be destroyed before
    // mVehicleHardware.
    mSubscriptionManager.reset();
    mVehicleHardware.reset();
}

void DefaultVehicleHal::batchPropertyChangeEvent(
        const std::weak_ptr<ConcurrentQueue<VehiclePropValue>>& batchedEventQueue,
        std::vector<VehiclePropValue>&& updatedValues) {
    auto batchedEventQueueStrong = batchedEventQueue.lock();
    if (batchedEventQueueStrong == nullptr) {
        ALOGW("the batched property events queue is destroyed, DefaultVehicleHal is ending");
        return;
    }
    batchedEventQueueStrong->push(std::move(updatedValues));
}

void DefaultVehicleHal::handleBatchedPropertyEvents(std::vector<VehiclePropValue>&& batchedEvents) {
    onPropertyChangeEvent(mSubscriptionManager, std::move(batchedEvents));
}

void DefaultVehicleHal::onPropertyChangeEvent(
        const std::weak_ptr<SubscriptionManager>& subscriptionManager,
        const std::vector<VehiclePropValue>& updatedValues) {
        std::vector<VehiclePropValue>&& updatedValues) {
    auto manager = subscriptionManager.lock();
    if (manager == nullptr) {
        ALOGW("the SubscriptionManager is destroyed, DefaultVehicleHal is ending");
        return;
    }
    auto updatedValuesByClients = manager->getSubscribedClients(updatedValues);
    for (const auto& [callback, valuePtrs] : updatedValuesByClients) {
        std::vector<VehiclePropValue> values;
        for (const VehiclePropValue* valuePtr : valuePtrs) {
            values.push_back(*valuePtr);
        }
    auto updatedValuesByClients = manager->getSubscribedClients(std::move(updatedValues));
    for (auto& [callback, values] : updatedValuesByClients) {
        SubscriptionClient::sendUpdatedValues(callback, std::move(values));
    }
}
@@ -742,12 +784,12 @@ void DefaultVehicleHal::checkHealth(IVehicleHardware* vehicleHardware,
        return;
    }
    std::vector<VehiclePropValue> values = {{
            .prop = toInt(VehicleProperty::VHAL_HEARTBEAT),
            .areaId = 0,
            .prop = toInt(VehicleProperty::VHAL_HEARTBEAT),
            .status = VehiclePropertyStatus::AVAILABLE,
            .value.int64Values = {uptimeMillis()},
    }};
    onPropertyChangeEvent(subscriptionManager, values);
    onPropertyChangeEvent(subscriptionManager, std::move(values));
    return;
}

Loading