Loading automotive/vehicle/aidl/impl/hardware/include/IVehicleHardware.h +20 −1 Original line number Diff line number Diff line Loading @@ -118,7 +118,7 @@ class IVehicleHardware { virtual aidl::android::hardware::automotive::vehicle::StatusCode checkHealth() = 0; // Register a callback that would be called when there is a property change event from vehicle. // Must only be called once during initialization. // This function must only be called once during initialization. virtual void registerOnPropertyChangeEvent( std::unique_ptr<const PropertyChangeCallback> callback) = 0; Loading @@ -126,6 +126,25 @@ class IVehicleHardware { // vehicle. Must only be called once during initialization. virtual void registerOnPropertySetErrorEvent( std::unique_ptr<const PropertySetErrorCallback> callback) = 0; // Gets the batching window used by DefaultVehicleHal for property change events. // // In DefaultVehicleHal, all the property change events generated within the batching window // will be delivered through one callback to the VHAL client. This affects the maximum supported // subscription rate. For example, if this returns 10ms, then only one callback for property // change events will be called per 10ms, meaining that the max subscription rate for all // continuous properties would be 100hz. // // A higher batching window means less callbacks to the VHAL client, causing a better // performance. However, it also means a longer average latency for every property change // events. // // 0 means no batching should be enabled in DefaultVehicleHal. In this case, batching can // be optionally implemented in IVehicleHardware layer. virtual std::chrono::nanoseconds getPropertyOnChangeEventBatchingWindow() { // By default batching is disabled. return std::chrono::nanoseconds(0); } }; } // namespace vehicle Loading automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h +76 −0 Original line number Diff line number Diff line Loading @@ -69,6 +69,19 @@ class ConcurrentQueue { mCond.notify_one(); } void push(std::vector<T>&& items) { { std::scoped_lock<std::mutex> lockGuard(mLock); if (!mIsActive) { return; } for (T& item : items) { mQueue.push(std::move(item)); } } mCond.notify_one(); } // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread. // The items already in the queue could still be flushed even after the queue is deactivated. void deactivate() { Loading @@ -92,6 +105,69 @@ class ConcurrentQueue { std::queue<T> mQueue GUARDED_BY(mLock); }; template <typename T> class BatchingConsumer { private: enum class State { INIT = 0, RUNNING = 1, STOP_REQUESTED = 2, STOPPED = 3, }; public: BatchingConsumer() : mState(State::INIT) {} BatchingConsumer(const BatchingConsumer&) = delete; BatchingConsumer& operator=(const BatchingConsumer&) = delete; using OnBatchReceivedFunc = std::function<void(std::vector<T> vec)>; void run(ConcurrentQueue<T>* queue, std::chrono::nanoseconds batchInterval, const OnBatchReceivedFunc& func) { mQueue = queue; mBatchInterval = batchInterval; mWorkerThread = std::thread(&BatchingConsumer<T>::runInternal, this, func); } void requestStop() { mState = State::STOP_REQUESTED; } void waitStopped() { if (mWorkerThread.joinable()) { mWorkerThread.join(); } } private: void runInternal(const OnBatchReceivedFunc& onBatchReceived) { if (mState.exchange(State::RUNNING) == State::INIT) { while (State::RUNNING == mState) { mQueue->waitForItems(); if (State::STOP_REQUESTED == mState) break; std::this_thread::sleep_for(mBatchInterval); if (State::STOP_REQUESTED == mState) break; std::vector<T> items = mQueue->flush(); if (items.size() > 0) { onBatchReceived(std::move(items)); } } } mState = State::STOPPED; } private: std::thread mWorkerThread; std::atomic<State> mState; std::chrono::nanoseconds mBatchInterval; ConcurrentQueue<T>* mQueue; }; } // namespace vehicle } // namespace automotive } // namespace hardware Loading automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h +23 −1 Original line number Diff line number Diff line Loading @@ -144,6 +144,15 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi std::shared_ptr<PendingRequestPool> mPendingRequestPool; // SubscriptionManager is thread-safe. std::shared_ptr<SubscriptionManager> mSubscriptionManager; // ConcurrentQueue is thread-safe. std::shared_ptr<ConcurrentQueue<aidl::android::hardware::automotive::vehicle::VehiclePropValue>> mBatchedEventQueue; // BatchingConsumer is thread-safe. std::shared_ptr< BatchingConsumer<aidl::android::hardware::automotive::vehicle::VehiclePropValue>> mPropertyChangeEventsBatchingConsumer; // Only set once during initialization. std::chrono::nanoseconds mEventBatchingWindow; std::mutex mLock; std::unordered_map<const AIBinder*, std::unique_ptr<OnBinderDiedContext>> mOnBinderDiedContexts Loading Loading @@ -209,6 +218,19 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi size_t countSubscribeClients(); // Handles the property change events in batch. void handleBatchedPropertyEvents( std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& batchedEvents); // Puts the property change events into a queue so that they can handled in batch. static void batchPropertyChangeEvent( const std::weak_ptr<ConcurrentQueue< aidl::android::hardware::automotive::vehicle::VehiclePropValue>>& batchedEventQueue, std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); // Gets or creates a {@code T} object for the client to or from {@code clients}. template <class T> static std::shared_ptr<T> getOrCreateClient( Loading @@ -217,7 +239,7 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi static void onPropertyChangeEvent( const std::weak_ptr<SubscriptionManager>& subscriptionManager, const std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>& std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); static void onPropertySetErrorEvent( Loading automotive/vehicle/aidl/impl/vhal/include/SubscriptionManager.h +3 −4 Original line number Diff line number Diff line Loading @@ -92,11 +92,10 @@ class SubscriptionManager final { // For a list of updated properties, returns a map that maps clients subscribing to // the updated properties to a list of updated values. This would only return on-change property // clients that should be informed for the given updated values. std::unordered_map< CallbackType, std::vector<const aidl::android::hardware::automotive::vehicle::VehiclePropValue*>> std::unordered_map<CallbackType, std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>> getSubscribedClients( const std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>& std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); // For a list of set property error events, returns a map that maps clients subscribing to the Loading automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp +54 −12 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ #include <utils/Trace.h> #include <inttypes.h> #include <chrono> #include <set> #include <unordered_set> Loading Loading @@ -101,12 +102,32 @@ DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> vehicleHa IVehicleHardware* vehicleHardwarePtr = mVehicleHardware.get(); mSubscriptionManager = std::make_shared<SubscriptionManager>(vehicleHardwarePtr); mEventBatchingWindow = mVehicleHardware->getPropertyOnChangeEventBatchingWindow(); if (mEventBatchingWindow != std::chrono::nanoseconds(0)) { mBatchedEventQueue = std::make_shared<ConcurrentQueue<VehiclePropValue>>(); mPropertyChangeEventsBatchingConsumer = std::make_shared<BatchingConsumer<VehiclePropValue>>(); mPropertyChangeEventsBatchingConsumer->run( mBatchedEventQueue.get(), mEventBatchingWindow, [this](std::vector<VehiclePropValue> batchedEvents) { handleBatchedPropertyEvents(std::move(batchedEvents)); }); } std::weak_ptr<ConcurrentQueue<VehiclePropValue>> batchedEventQueueCopy = mBatchedEventQueue; std::chrono::nanoseconds eventBatchingWindow = mEventBatchingWindow; std::weak_ptr<SubscriptionManager> subscriptionManagerCopy = mSubscriptionManager; mVehicleHardware->registerOnPropertyChangeEvent( std::make_unique<IVehicleHardware::PropertyChangeCallback>( [subscriptionManagerCopy](std::vector<VehiclePropValue> updatedValues) { onPropertyChangeEvent(subscriptionManagerCopy, updatedValues); [subscriptionManagerCopy, batchedEventQueueCopy, eventBatchingWindow](std::vector<VehiclePropValue> updatedValues) { if (eventBatchingWindow != std::chrono::nanoseconds(0)) { batchPropertyChangeEvent(batchedEventQueueCopy, std::move(updatedValues)); } else { onPropertyChangeEvent(subscriptionManagerCopy, std::move(updatedValues)); } })); mVehicleHardware->registerOnPropertySetErrorEvent( std::make_unique<IVehicleHardware::PropertySetErrorCallback>( Loading Loading @@ -139,26 +160,47 @@ DefaultVehicleHal::~DefaultVehicleHal() { // mRecurrentAction uses pointer to mVehicleHardware, so it has to be unregistered before // mVehicleHardware. mRecurrentTimer.unregisterTimerCallback(mRecurrentAction); if (mBatchedEventQueue) { // mPropertyChangeEventsBatchingConsumer uses mSubscriptionManager and mBatchedEventQueue. mBatchedEventQueue->deactivate(); mPropertyChangeEventsBatchingConsumer->requestStop(); mPropertyChangeEventsBatchingConsumer->waitStopped(); mPropertyChangeEventsBatchingConsumer.reset(); mBatchedEventQueue.reset(); } // mSubscriptionManager uses pointer to mVehicleHardware, so it has to be destroyed before // mVehicleHardware. mSubscriptionManager.reset(); mVehicleHardware.reset(); } void DefaultVehicleHal::batchPropertyChangeEvent( const std::weak_ptr<ConcurrentQueue<VehiclePropValue>>& batchedEventQueue, std::vector<VehiclePropValue>&& updatedValues) { auto batchedEventQueueStrong = batchedEventQueue.lock(); if (batchedEventQueueStrong == nullptr) { ALOGW("the batched property events queue is destroyed, DefaultVehicleHal is ending"); return; } batchedEventQueueStrong->push(std::move(updatedValues)); } void DefaultVehicleHal::handleBatchedPropertyEvents(std::vector<VehiclePropValue>&& batchedEvents) { onPropertyChangeEvent(mSubscriptionManager, std::move(batchedEvents)); } void DefaultVehicleHal::onPropertyChangeEvent( const std::weak_ptr<SubscriptionManager>& subscriptionManager, const std::vector<VehiclePropValue>& updatedValues) { std::vector<VehiclePropValue>&& updatedValues) { auto manager = subscriptionManager.lock(); if (manager == nullptr) { ALOGW("the SubscriptionManager is destroyed, DefaultVehicleHal is ending"); return; } auto updatedValuesByClients = manager->getSubscribedClients(updatedValues); for (const auto& [callback, valuePtrs] : updatedValuesByClients) { std::vector<VehiclePropValue> values; for (const VehiclePropValue* valuePtr : valuePtrs) { values.push_back(*valuePtr); } auto updatedValuesByClients = manager->getSubscribedClients(std::move(updatedValues)); for (auto& [callback, values] : updatedValuesByClients) { SubscriptionClient::sendUpdatedValues(callback, std::move(values)); } } Loading Loading @@ -742,12 +784,12 @@ void DefaultVehicleHal::checkHealth(IVehicleHardware* vehicleHardware, return; } std::vector<VehiclePropValue> values = {{ .prop = toInt(VehicleProperty::VHAL_HEARTBEAT), .areaId = 0, .prop = toInt(VehicleProperty::VHAL_HEARTBEAT), .status = VehiclePropertyStatus::AVAILABLE, .value.int64Values = {uptimeMillis()}, }}; onPropertyChangeEvent(subscriptionManager, values); onPropertyChangeEvent(subscriptionManager, std::move(values)); return; } Loading Loading
automotive/vehicle/aidl/impl/hardware/include/IVehicleHardware.h +20 −1 Original line number Diff line number Diff line Loading @@ -118,7 +118,7 @@ class IVehicleHardware { virtual aidl::android::hardware::automotive::vehicle::StatusCode checkHealth() = 0; // Register a callback that would be called when there is a property change event from vehicle. // Must only be called once during initialization. // This function must only be called once during initialization. virtual void registerOnPropertyChangeEvent( std::unique_ptr<const PropertyChangeCallback> callback) = 0; Loading @@ -126,6 +126,25 @@ class IVehicleHardware { // vehicle. Must only be called once during initialization. virtual void registerOnPropertySetErrorEvent( std::unique_ptr<const PropertySetErrorCallback> callback) = 0; // Gets the batching window used by DefaultVehicleHal for property change events. // // In DefaultVehicleHal, all the property change events generated within the batching window // will be delivered through one callback to the VHAL client. This affects the maximum supported // subscription rate. For example, if this returns 10ms, then only one callback for property // change events will be called per 10ms, meaining that the max subscription rate for all // continuous properties would be 100hz. // // A higher batching window means less callbacks to the VHAL client, causing a better // performance. However, it also means a longer average latency for every property change // events. // // 0 means no batching should be enabled in DefaultVehicleHal. In this case, batching can // be optionally implemented in IVehicleHardware layer. virtual std::chrono::nanoseconds getPropertyOnChangeEventBatchingWindow() { // By default batching is disabled. return std::chrono::nanoseconds(0); } }; } // namespace vehicle Loading
automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h +76 −0 Original line number Diff line number Diff line Loading @@ -69,6 +69,19 @@ class ConcurrentQueue { mCond.notify_one(); } void push(std::vector<T>&& items) { { std::scoped_lock<std::mutex> lockGuard(mLock); if (!mIsActive) { return; } for (T& item : items) { mQueue.push(std::move(item)); } } mCond.notify_one(); } // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread. // The items already in the queue could still be flushed even after the queue is deactivated. void deactivate() { Loading @@ -92,6 +105,69 @@ class ConcurrentQueue { std::queue<T> mQueue GUARDED_BY(mLock); }; template <typename T> class BatchingConsumer { private: enum class State { INIT = 0, RUNNING = 1, STOP_REQUESTED = 2, STOPPED = 3, }; public: BatchingConsumer() : mState(State::INIT) {} BatchingConsumer(const BatchingConsumer&) = delete; BatchingConsumer& operator=(const BatchingConsumer&) = delete; using OnBatchReceivedFunc = std::function<void(std::vector<T> vec)>; void run(ConcurrentQueue<T>* queue, std::chrono::nanoseconds batchInterval, const OnBatchReceivedFunc& func) { mQueue = queue; mBatchInterval = batchInterval; mWorkerThread = std::thread(&BatchingConsumer<T>::runInternal, this, func); } void requestStop() { mState = State::STOP_REQUESTED; } void waitStopped() { if (mWorkerThread.joinable()) { mWorkerThread.join(); } } private: void runInternal(const OnBatchReceivedFunc& onBatchReceived) { if (mState.exchange(State::RUNNING) == State::INIT) { while (State::RUNNING == mState) { mQueue->waitForItems(); if (State::STOP_REQUESTED == mState) break; std::this_thread::sleep_for(mBatchInterval); if (State::STOP_REQUESTED == mState) break; std::vector<T> items = mQueue->flush(); if (items.size() > 0) { onBatchReceived(std::move(items)); } } } mState = State::STOPPED; } private: std::thread mWorkerThread; std::atomic<State> mState; std::chrono::nanoseconds mBatchInterval; ConcurrentQueue<T>* mQueue; }; } // namespace vehicle } // namespace automotive } // namespace hardware Loading
automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h +23 −1 Original line number Diff line number Diff line Loading @@ -144,6 +144,15 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi std::shared_ptr<PendingRequestPool> mPendingRequestPool; // SubscriptionManager is thread-safe. std::shared_ptr<SubscriptionManager> mSubscriptionManager; // ConcurrentQueue is thread-safe. std::shared_ptr<ConcurrentQueue<aidl::android::hardware::automotive::vehicle::VehiclePropValue>> mBatchedEventQueue; // BatchingConsumer is thread-safe. std::shared_ptr< BatchingConsumer<aidl::android::hardware::automotive::vehicle::VehiclePropValue>> mPropertyChangeEventsBatchingConsumer; // Only set once during initialization. std::chrono::nanoseconds mEventBatchingWindow; std::mutex mLock; std::unordered_map<const AIBinder*, std::unique_ptr<OnBinderDiedContext>> mOnBinderDiedContexts Loading Loading @@ -209,6 +218,19 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi size_t countSubscribeClients(); // Handles the property change events in batch. void handleBatchedPropertyEvents( std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& batchedEvents); // Puts the property change events into a queue so that they can handled in batch. static void batchPropertyChangeEvent( const std::weak_ptr<ConcurrentQueue< aidl::android::hardware::automotive::vehicle::VehiclePropValue>>& batchedEventQueue, std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); // Gets or creates a {@code T} object for the client to or from {@code clients}. template <class T> static std::shared_ptr<T> getOrCreateClient( Loading @@ -217,7 +239,7 @@ class DefaultVehicleHal final : public aidl::android::hardware::automotive::vehi static void onPropertyChangeEvent( const std::weak_ptr<SubscriptionManager>& subscriptionManager, const std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>& std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); static void onPropertySetErrorEvent( Loading
automotive/vehicle/aidl/impl/vhal/include/SubscriptionManager.h +3 −4 Original line number Diff line number Diff line Loading @@ -92,11 +92,10 @@ class SubscriptionManager final { // For a list of updated properties, returns a map that maps clients subscribing to // the updated properties to a list of updated values. This would only return on-change property // clients that should be informed for the given updated values. std::unordered_map< CallbackType, std::vector<const aidl::android::hardware::automotive::vehicle::VehiclePropValue*>> std::unordered_map<CallbackType, std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>> getSubscribedClients( const std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>& std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); // For a list of set property error events, returns a map that maps clients subscribing to the Loading
automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp +54 −12 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ #include <utils/Trace.h> #include <inttypes.h> #include <chrono> #include <set> #include <unordered_set> Loading Loading @@ -101,12 +102,32 @@ DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> vehicleHa IVehicleHardware* vehicleHardwarePtr = mVehicleHardware.get(); mSubscriptionManager = std::make_shared<SubscriptionManager>(vehicleHardwarePtr); mEventBatchingWindow = mVehicleHardware->getPropertyOnChangeEventBatchingWindow(); if (mEventBatchingWindow != std::chrono::nanoseconds(0)) { mBatchedEventQueue = std::make_shared<ConcurrentQueue<VehiclePropValue>>(); mPropertyChangeEventsBatchingConsumer = std::make_shared<BatchingConsumer<VehiclePropValue>>(); mPropertyChangeEventsBatchingConsumer->run( mBatchedEventQueue.get(), mEventBatchingWindow, [this](std::vector<VehiclePropValue> batchedEvents) { handleBatchedPropertyEvents(std::move(batchedEvents)); }); } std::weak_ptr<ConcurrentQueue<VehiclePropValue>> batchedEventQueueCopy = mBatchedEventQueue; std::chrono::nanoseconds eventBatchingWindow = mEventBatchingWindow; std::weak_ptr<SubscriptionManager> subscriptionManagerCopy = mSubscriptionManager; mVehicleHardware->registerOnPropertyChangeEvent( std::make_unique<IVehicleHardware::PropertyChangeCallback>( [subscriptionManagerCopy](std::vector<VehiclePropValue> updatedValues) { onPropertyChangeEvent(subscriptionManagerCopy, updatedValues); [subscriptionManagerCopy, batchedEventQueueCopy, eventBatchingWindow](std::vector<VehiclePropValue> updatedValues) { if (eventBatchingWindow != std::chrono::nanoseconds(0)) { batchPropertyChangeEvent(batchedEventQueueCopy, std::move(updatedValues)); } else { onPropertyChangeEvent(subscriptionManagerCopy, std::move(updatedValues)); } })); mVehicleHardware->registerOnPropertySetErrorEvent( std::make_unique<IVehicleHardware::PropertySetErrorCallback>( Loading Loading @@ -139,26 +160,47 @@ DefaultVehicleHal::~DefaultVehicleHal() { // mRecurrentAction uses pointer to mVehicleHardware, so it has to be unregistered before // mVehicleHardware. mRecurrentTimer.unregisterTimerCallback(mRecurrentAction); if (mBatchedEventQueue) { // mPropertyChangeEventsBatchingConsumer uses mSubscriptionManager and mBatchedEventQueue. mBatchedEventQueue->deactivate(); mPropertyChangeEventsBatchingConsumer->requestStop(); mPropertyChangeEventsBatchingConsumer->waitStopped(); mPropertyChangeEventsBatchingConsumer.reset(); mBatchedEventQueue.reset(); } // mSubscriptionManager uses pointer to mVehicleHardware, so it has to be destroyed before // mVehicleHardware. mSubscriptionManager.reset(); mVehicleHardware.reset(); } void DefaultVehicleHal::batchPropertyChangeEvent( const std::weak_ptr<ConcurrentQueue<VehiclePropValue>>& batchedEventQueue, std::vector<VehiclePropValue>&& updatedValues) { auto batchedEventQueueStrong = batchedEventQueue.lock(); if (batchedEventQueueStrong == nullptr) { ALOGW("the batched property events queue is destroyed, DefaultVehicleHal is ending"); return; } batchedEventQueueStrong->push(std::move(updatedValues)); } void DefaultVehicleHal::handleBatchedPropertyEvents(std::vector<VehiclePropValue>&& batchedEvents) { onPropertyChangeEvent(mSubscriptionManager, std::move(batchedEvents)); } void DefaultVehicleHal::onPropertyChangeEvent( const std::weak_ptr<SubscriptionManager>& subscriptionManager, const std::vector<VehiclePropValue>& updatedValues) { std::vector<VehiclePropValue>&& updatedValues) { auto manager = subscriptionManager.lock(); if (manager == nullptr) { ALOGW("the SubscriptionManager is destroyed, DefaultVehicleHal is ending"); return; } auto updatedValuesByClients = manager->getSubscribedClients(updatedValues); for (const auto& [callback, valuePtrs] : updatedValuesByClients) { std::vector<VehiclePropValue> values; for (const VehiclePropValue* valuePtr : valuePtrs) { values.push_back(*valuePtr); } auto updatedValuesByClients = manager->getSubscribedClients(std::move(updatedValues)); for (auto& [callback, values] : updatedValuesByClients) { SubscriptionClient::sendUpdatedValues(callback, std::move(values)); } } Loading Loading @@ -742,12 +784,12 @@ void DefaultVehicleHal::checkHealth(IVehicleHardware* vehicleHardware, return; } std::vector<VehiclePropValue> values = {{ .prop = toInt(VehicleProperty::VHAL_HEARTBEAT), .areaId = 0, .prop = toInt(VehicleProperty::VHAL_HEARTBEAT), .status = VehiclePropertyStatus::AVAILABLE, .value.int64Values = {uptimeMillis()}, }}; onPropertyChangeEvent(subscriptionManager, values); onPropertyChangeEvent(subscriptionManager, std::move(values)); return; } Loading