Loading automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h +9 −19 Original line number Diff line number Diff line Loading @@ -42,10 +42,10 @@ namespace vehicle { // This class is thread-safe. class ConnectedClient { public: ConnectedClient( std::shared_ptr<PendingRequestPool> requestPool, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback); using CallbackType = std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>; ConnectedClient(std::shared_ptr<PendingRequestPool> requestPool, CallbackType callback); virtual ~ConnectedClient() = default; Loading @@ -68,8 +68,7 @@ class ConnectedClient { virtual std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc> getTimeoutCallback() = 0; const std::shared_ptr<PendingRequestPool> mRequestPool; const std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> mCallback; const CallbackType mCallback; }; // A class to represent a client that calls {@code IVehicle.setValues} or {@code Loading @@ -77,10 +76,7 @@ class ConnectedClient { template <class ResultType, class ResultsType> class GetSetValuesClient final : public ConnectedClient { public: GetSetValuesClient( std::shared_ptr<PendingRequestPool> requestPool, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback); GetSetValuesClient(std::shared_ptr<PendingRequestPool> requestPool, CallbackType callback); // Sends the results to this client. void sendResults(const std::vector<ResultType>& results); Loading @@ -105,10 +101,7 @@ class GetSetValuesClient final : public ConnectedClient { // A class to represent a client that calls {@code IVehicle.subscribe}. class SubscriptionClient final : public ConnectedClient { public: SubscriptionClient( std::shared_ptr<PendingRequestPool> requestPool, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback); SubscriptionClient(std::shared_ptr<PendingRequestPool> requestPool, CallbackType callback); // Gets the callback to be called when the request for this client has finished. std::shared_ptr<const IVehicleHardware::GetValuesCallback> getResultCallback(); Loading @@ -116,8 +109,7 @@ class SubscriptionClient final : public ConnectedClient { // Marshals the updated values into largeParcelable and sents it through {@code onPropertyEvent} // callback. static void sendUpdatedValues( std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback, CallbackType callback, std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); Loading @@ -132,9 +124,7 @@ class SubscriptionClient final : public ConnectedClient { std::shared_ptr<const IVehicleHardware::PropertyChangeCallback> mPropertyChangeCallback; static void onGetValueResults( const void* clientId, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback, const void* clientId, CallbackType callback, std::shared_ptr<PendingRequestPool> requestPool, std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult> results); }; Loading automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h +52 −6 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve explicit DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware); ~DefaultVehicleHal() = default; ~DefaultVehicleHal(); ::ndk::ScopedAStatus getAllPropConfigs( ::aidl::android::hardware::automotive::vehicle::VehiclePropConfigs* returnConfigs) Loading Loading @@ -101,7 +101,7 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve private: std::mutex mLock; std::unordered_map<CallbackType, int64_t> mIds GUARDED_BY(mLock); std::unordered_map<const AIBinder*, int64_t> mIds GUARDED_BY(mLock); }; // A thread safe class to store all subscribe clients. This class is safe to pass to async Loading @@ -112,16 +112,42 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve std::shared_ptr<SubscriptionClient> getClient(const CallbackType& callback); void removeClient(const AIBinder* clientId); size_t countClients(); private: std::mutex mLock; std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>> mClients std::unordered_map<const AIBinder*, std::shared_ptr<SubscriptionClient>> mClients GUARDED_BY(mLock); // PendingRequestPool is thread-safe. std::shared_ptr<PendingRequestPool> mPendingRequestPool; }; // A wrapper for linkToDeath to enable stubbing for test. class ILinkToDeath { public: virtual ~ILinkToDeath() = default; virtual binder_status_t linkToDeath(AIBinder* binder, AIBinder_DeathRecipient* recipient, void* cookie) = 0; }; // A real implementation for ILinkToDeath. class AIBinderLinkToDeathImpl final : public ILinkToDeath { public: binder_status_t linkToDeath(AIBinder* binder, AIBinder_DeathRecipient* recipient, void* cookie) override; }; // OnBinderDiedContext is a type used as a cookie passed deathRecipient. The deathRecipient's // onBinderDied function takes only a cookie as input and we have to store all the contexts // as the cookie. struct OnBinderDiedContext { DefaultVehicleHal* vhal; const AIBinder* clientId; }; // The default timeout of get or set value requests is 30s. // TODO(b/214605968): define TIMEOUT_IN_NANO in IVehicle and allow getValues/setValues/subscribe // to specify custom timeouts. Loading @@ -142,15 +168,22 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve std::shared_ptr<SubscriptionManager> mSubscriptionManager; std::mutex mLock; std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>> mGetValuesClients std::unordered_map<const AIBinder*, std::unique_ptr<OnBinderDiedContext>> mOnBinderDiedContexts GUARDED_BY(mLock); std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>> mSetValuesClients std::unordered_map<const AIBinder*, std::shared_ptr<GetValuesClient>> mGetValuesClients GUARDED_BY(mLock); std::unordered_map<const AIBinder*, std::shared_ptr<SetValuesClient>> mSetValuesClients GUARDED_BY(mLock); // SubscriptionClients is thread-safe. std::shared_ptr<SubscriptionClients> mSubscriptionClients; // mLinkToDeathImpl is only going to be changed in test. std::unique_ptr<ILinkToDeath> mLinkToDeathImpl; // RecurrentTimer is thread-safe. RecurrentTimer mRecurrentTimer; ::ndk::ScopedAIBinder_DeathRecipient mDeathRecipient; ::android::base::Result<void> checkProperty( const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& propValue); Loading @@ -176,9 +209,15 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve const ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig*> getConfig(int32_t propId) const; void onBinderDiedWithContext(const AIBinder* clientId); void onBinderUnlinkedWithContext(const AIBinder* clientId); void monitorBinderLifeCycle(const CallbackType& callback); template <class T> static std::shared_ptr<T> getOrCreateClient( std::unordered_map<CallbackType, std::shared_ptr<T>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<T>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); static void getValueFromHardwareCallCallback( Loading @@ -195,9 +234,16 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve static void checkHealth(std::weak_ptr<IVehicleHardware> hardware, std::weak_ptr<SubscriptionManager> subscriptionManager); static void onBinderDied(void* cookie); static void onBinderUnlinked(void* cookie); // Test-only // Set the default timeout for pending requests. void setTimeout(int64_t timeoutInNano); // Test-only void setLinkToDeathImpl(std::unique_ptr<ILinkToDeath> impl); }; } // namespace vehicle Loading automotive/vehicle/aidl/impl/vhal/include/SubscriptionManager.h +19 −11 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ namespace vehicle { // A thread-safe subscription manager that manages all VHAL subscriptions. class SubscriptionManager final { public: using ClientIdType = const AIBinder*; using CallbackType = std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>; using GetValueFunc = std::function<void( Loading @@ -59,24 +60,24 @@ class SubscriptionManager final { options, bool isContinuousProperty); // Unsubscribes from the properties for the callback. // Returns error if the callback was not subscribed before or one of the given property was not // Unsubscribes from the properties for the client. // Returns error if the client was not subscribed before or one of the given property was not // subscribed. If error is returned, no property would be unsubscribed. // Returns ok if all the requested properties for the callback are unsubscribed. ::android::base::Result<void> unsubscribe(const CallbackType& callback, // Returns ok if all the requested properties for the client are unsubscribed. ::android::base::Result<void> unsubscribe(ClientIdType client, const std::vector<int32_t>& propIds); // Unsubscribes to all the properties for the callback. // Returns error if the callback was not subscribed before. If error is returned, no property // Unsubscribes from all the properties for the client. // Returns error if the client was not subscribed before. If error is returned, no property // would be unsubscribed. // Returns ok if all the properties for the callback are unsubscribed. ::android::base::Result<void> unsubscribe(const CallbackType& callback); // Returns ok if all the properties for the client are unsubscribed. ::android::base::Result<void> unsubscribe(ClientIdType client); // For a list of updated properties, returns a map that maps clients subscribing to // the updated properties to a list of updated values. This would only return on-change property // clients that should be informed for the given updated values. std::unordered_map< std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>, CallbackType, std::vector<const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue*>> getSubscribedClients( const std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>& Loading @@ -86,6 +87,9 @@ class SubscriptionManager final { static bool checkSampleRate(float sampleRate); private: // Friend class for testing. friend class DefaultVehicleHalTest; struct PropIdAreaId { int32_t propId; int32_t areaId; Loading Loading @@ -131,9 +135,10 @@ class SubscriptionManager final { }; mutable std::mutex mLock; std::unordered_map<PropIdAreaId, std::unordered_set<CallbackType>, PropIdAreaIdHash> std::unordered_map<PropIdAreaId, std::unordered_map<ClientIdType, CallbackType>, PropIdAreaIdHash> mClientsByPropIdArea GUARDED_BY(mLock); std::unordered_map<CallbackType, std::unordered_map<PropIdAreaId, std::unique_ptr<Subscription>, std::unordered_map<ClientIdType, std::unordered_map<PropIdAreaId, std::unique_ptr<Subscription>, PropIdAreaIdHash>> mSubscriptionsByClient GUARDED_BY(mLock); // RecurrentTimer is thread-safe. Loading @@ -141,6 +146,9 @@ class SubscriptionManager final { const GetValueFunc mGetValue; static ::android::base::Result<int64_t> getInterval(float sampleRate); // Checks whether the manager is empty. For testing purpose. bool isEmpty(); }; } // namespace vehicle Loading automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp +91 −12 Original line number Diff line number Diff line Loading @@ -42,7 +42,6 @@ using ::aidl::android::hardware::automotive::vehicle::GetValueRequest; using ::aidl::android::hardware::automotive::vehicle::GetValueRequests; using ::aidl::android::hardware::automotive::vehicle::GetValueResult; using ::aidl::android::hardware::automotive::vehicle::GetValueResults; using ::aidl::android::hardware::automotive::vehicle::IVehicleCallback; using ::aidl::android::hardware::automotive::vehicle::SetValueRequest; using ::aidl::android::hardware::automotive::vehicle::SetValueRequests; using ::aidl::android::hardware::automotive::vehicle::SetValueResult; Loading @@ -62,6 +61,8 @@ using ::android::base::Error; using ::android::base::expected; using ::android::base::Result; using ::android::base::StringPrintf; using ::ndk::ScopedAIBinder_DeathRecipient; using ::ndk::ScopedAStatus; std::string toString(const std::unordered_set<int64_t>& values) { Loading @@ -86,10 +87,15 @@ std::shared_ptr<SubscriptionClient> DefaultVehicleHal::SubscriptionClients::getC int64_t DefaultVehicleHal::SubscribeIdByClient::getId(const CallbackType& callback) { std::scoped_lock<std::mutex> lockGuard(mLock); // This would be initialized to 0 if callback does not exist in the map. int64_t subscribeId = (mIds[callback])++; int64_t subscribeId = (mIds[callback->asBinder().get()])++; return subscribeId; } void DefaultVehicleHal::SubscriptionClients::removeClient(const AIBinder* clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); mClients.erase(clientId); } size_t DefaultVehicleHal::SubscriptionClients::countClients() { std::scoped_lock<std::mutex> lockGuard(mLock); return mClients.size(); Loading Loading @@ -139,6 +145,17 @@ DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware) std::make_shared<std::function<void()>>([hardwareCopy, subscriptionManagerCopy]() { checkHealth(hardwareCopy, subscriptionManagerCopy); })); mLinkToDeathImpl = std::make_unique<AIBinderLinkToDeathImpl>(); mDeathRecipient = ScopedAIBinder_DeathRecipient( AIBinder_DeathRecipient_new(&DefaultVehicleHal::onBinderDied)); AIBinder_DeathRecipient_setOnUnlinked(mDeathRecipient.get(), &DefaultVehicleHal::onBinderUnlinked); } DefaultVehicleHal::~DefaultVehicleHal() { // Delete the deathRecipient so that onBinderDied would not be called to reference 'this'. mDeathRecipient = ScopedAIBinder_DeathRecipient(); } void DefaultVehicleHal::onPropertyChangeEvent( Loading @@ -161,26 +178,73 @@ void DefaultVehicleHal::onPropertyChangeEvent( template <class T> std::shared_ptr<T> DefaultVehicleHal::getOrCreateClient( std::unordered_map<CallbackType, std::shared_ptr<T>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool) { if (clients->find(callback) == clients->end()) { // TODO(b/204943359): Remove client from clients when linkToDeath is implemented. (*clients)[callback] = std::make_shared<T>(pendingRequestPool, callback); std::unordered_map<const AIBinder*, std::shared_ptr<T>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool) { const AIBinder* clientId = callback->asBinder().get(); if (clients->find(clientId) == clients->end()) { (*clients)[clientId] = std::make_shared<T>(pendingRequestPool, callback); } return (*clients)[clientId]; } void DefaultVehicleHal::monitorBinderLifeCycle(const CallbackType& callback) { AIBinder* clientId = callback->asBinder().get(); { std::scoped_lock<std::mutex> lockGuard(mLock); if (mOnBinderDiedContexts.find(clientId) != mOnBinderDiedContexts.end()) { // Already registered. return; } } std::unique_ptr<OnBinderDiedContext> context = std::make_unique<OnBinderDiedContext>( OnBinderDiedContext{.vhal = this, .clientId = clientId}); binder_status_t status = mLinkToDeathImpl->linkToDeath(clientId, mDeathRecipient.get(), static_cast<void*>(context.get())); if (status == STATUS_OK) { std::scoped_lock<std::mutex> lockGuard(mLock); // Insert into a map to keep the context object alive. mOnBinderDiedContexts[clientId] = std::move(context); } else { ALOGE("failed to call linkToDeath on client binder, status: %d", static_cast<int>(status)); } return (*clients)[callback]; } void DefaultVehicleHal::onBinderDied(void* cookie) { OnBinderDiedContext* context = reinterpret_cast<OnBinderDiedContext*>(cookie); context->vhal->onBinderDiedWithContext(context->clientId); } void DefaultVehicleHal::onBinderDiedWithContext(const AIBinder* clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); mSetValuesClients.erase(clientId); mGetValuesClients.erase(clientId); mSubscriptionClients->removeClient(clientId); mSubscriptionManager->unsubscribe(clientId); } void DefaultVehicleHal::onBinderUnlinked(void* cookie) { // Delete the context associated with this cookie. OnBinderDiedContext* context = reinterpret_cast<OnBinderDiedContext*>(cookie); context->vhal->onBinderUnlinkedWithContext(context->clientId); } void DefaultVehicleHal::onBinderUnlinkedWithContext(const AIBinder* clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); mOnBinderDiedContexts.erase(clientId); } template std::shared_ptr<DefaultVehicleHal::GetValuesClient> DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::GetValuesClient>( std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<GetValuesClient>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); template std::shared_ptr<DefaultVehicleHal::SetValuesClient> DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::SetValuesClient>( std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<SetValuesClient>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); template std::shared_ptr<SubscriptionClient> DefaultVehicleHal::getOrCreateClient<SubscriptionClient>( std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<SubscriptionClient>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); void DefaultVehicleHal::getValueFromHardwareCallCallback( Loading Loading @@ -268,6 +332,8 @@ Result<void> DefaultVehicleHal::checkProperty(const VehiclePropValue& propValue) ScopedAStatus DefaultVehicleHal::getValues(const CallbackType& callback, const GetValueRequests& requests) { monitorBinderLifeCycle(callback); expected<LargeParcelableBase::BorrowedOwnedObject<GetValueRequests>, ScopedAStatus> deserializedResults = fromStableLargeParcelable(requests); if (!deserializedResults.ok()) { Loading Loading @@ -344,6 +410,8 @@ ScopedAStatus DefaultVehicleHal::getValues(const CallbackType& callback, ScopedAStatus DefaultVehicleHal::setValues(const CallbackType& callback, const SetValueRequests& requests) { monitorBinderLifeCycle(callback); expected<LargeParcelableBase::BorrowedOwnedObject<SetValueRequests>, ScopedAStatus> deserializedResults = fromStableLargeParcelable(requests); if (!deserializedResults.ok()) { Loading Loading @@ -526,6 +594,8 @@ Result<void> DefaultVehicleHal::checkSubscribeOptions( ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType& callback, const std::vector<SubscribeOptions>& options, [[maybe_unused]] int32_t maxSharedMemoryFileCount) { monitorBinderLifeCycle(callback); // TODO(b/205189110): Use shared memory file count. if (auto result = checkSubscribeOptions(options); !result.ok()) { ALOGE("subscribe: invalid subscribe options: %s", getErrorMsg(result).c_str()); Loading Loading @@ -571,7 +641,7 @@ ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType& callback, ScopedAStatus DefaultVehicleHal::unsubscribe(const CallbackType& callback, const std::vector<int32_t>& propIds) { return toScopedAStatus(mSubscriptionManager->unsubscribe(callback, propIds), return toScopedAStatus(mSubscriptionManager->unsubscribe(callback->asBinder().get(), propIds), StatusCode::INVALID_ARG); } Loading Loading @@ -639,6 +709,15 @@ void DefaultVehicleHal::checkHealth(std::weak_ptr<IVehicleHardware> hardware, return; } binder_status_t DefaultVehicleHal::AIBinderLinkToDeathImpl::linkToDeath( AIBinder* binder, AIBinder_DeathRecipient* recipient, void* cookie) { return AIBinder_linkToDeath(binder, recipient, cookie); } void DefaultVehicleHal::setLinkToDeathImpl(std::unique_ptr<ILinkToDeath> impl) { mLinkToDeathImpl = std::move(impl); } } // namespace vehicle } // namespace automotive } // namespace hardware Loading automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp +24 −18 Original line number Diff line number Diff line Loading @@ -26,7 +26,7 @@ namespace vehicle { namespace { constexpr float ONE_SECOND_IN_NANO = 1000000000.; constexpr float ONE_SECOND_IN_NANO = 1'000'000'000.; } // namespace Loading Loading @@ -100,6 +100,7 @@ Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallba } size_t intervalIndex = 0; ClientIdType clientId = callback->asBinder().get(); for (const auto& option : options) { int32_t propId = option.propId; const std::vector<int32_t>& areaIds = option.areaIds; Loading @@ -118,7 +119,7 @@ Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallba .prop = propId, .areaId = areaId, }; mSubscriptionsByClient[callback][propIdAreaId] = mSubscriptionsByClient[clientId][propIdAreaId] = std::make_unique<RecurrentSubscription>( mTimer, [this, callback, propValueRequest] { Loading @@ -126,24 +127,24 @@ Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallba }, interval); } else { mSubscriptionsByClient[callback][propIdAreaId] = mSubscriptionsByClient[clientId][propIdAreaId] = std::make_unique<OnChangeSubscription>(); } mClientsByPropIdArea[propIdAreaId].insert(callback); mClientsByPropIdArea[propIdAreaId][clientId] = callback; } } return {}; } Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback, Result<void> SubscriptionManager::unsubscribe(SubscriptionManager::ClientIdType clientId, const std::vector<int32_t>& propIds) { std::scoped_lock<std::mutex> lockGuard(mLock); if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) { if (mSubscriptionsByClient.find(clientId) == mSubscriptionsByClient.end()) { return Error() << "No property was subscribed for the callback"; } std::unordered_set<int32_t> subscribedPropIds; for (auto const& [propIdAreaId, _] : mSubscriptionsByClient[callback]) { for (auto const& [propIdAreaId, _] : mSubscriptionsByClient[clientId]) { subscribedPropIds.insert(propIdAreaId.propId); } Loading @@ -153,13 +154,13 @@ Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCall } } auto& subscriptions = mSubscriptionsByClient[callback]; auto& subscriptions = mSubscriptionsByClient[clientId]; auto it = subscriptions.begin(); while (it != subscriptions.end()) { int32_t propId = it->first.propId; if (std::find(propIds.begin(), propIds.end(), propId) != propIds.end()) { auto& clients = mClientsByPropIdArea[it->first]; clients.erase(callback); clients.erase(clientId); if (clients.empty()) { mClientsByPropIdArea.erase(it->first); } Loading @@ -169,27 +170,27 @@ Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCall } } if (subscriptions.empty()) { mSubscriptionsByClient.erase(callback); mSubscriptionsByClient.erase(clientId); } return {}; } Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback) { Result<void> SubscriptionManager::unsubscribe(SubscriptionManager::ClientIdType clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) { return Error() << "No property was subscribed for the callback"; if (mSubscriptionsByClient.find(clientId) == mSubscriptionsByClient.end()) { return Error() << "No property was subscribed for this client"; } auto& subscriptions = mSubscriptionsByClient[callback]; auto& subscriptions = mSubscriptionsByClient[clientId]; for (auto const& [propIdAreaId, _] : subscriptions) { auto& clients = mClientsByPropIdArea[propIdAreaId]; clients.erase(callback); clients.erase(clientId); if (clients.empty()) { mClientsByPropIdArea.erase(propIdAreaId); } } mSubscriptionsByClient.erase(callback); mSubscriptionsByClient.erase(clientId); return {}; } Loading @@ -207,8 +208,8 @@ SubscriptionManager::getSubscribedClients(const std::vector<VehiclePropValue>& u if (mClientsByPropIdArea.find(propIdAreaId) == mClientsByPropIdArea.end()) { continue; } for (const auto& client : mClientsByPropIdArea[propIdAreaId]) { if (!mSubscriptionsByClient[client][propIdAreaId]->isOnChange()) { for (const auto& [clientId, client] : mClientsByPropIdArea[propIdAreaId]) { if (!mSubscriptionsByClient[clientId][propIdAreaId]->isOnChange()) { continue; } clients[client].push_back(&value); Loading @@ -217,6 +218,11 @@ SubscriptionManager::getSubscribedClients(const std::vector<VehiclePropValue>& u return clients; } bool SubscriptionManager::isEmpty() { std::scoped_lock<std::mutex> lockGuard(mLock); return mSubscriptionsByClient.empty() && mClientsByPropIdArea.empty(); } SubscriptionManager::RecurrentSubscription::RecurrentSubscription( std::shared_ptr<RecurrentTimer> timer, std::function<void()>&& action, int64_t interval) : mAction(std::make_shared<std::function<void()>>(action)), mTimer(timer) { Loading Loading
automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h +9 −19 Original line number Diff line number Diff line Loading @@ -42,10 +42,10 @@ namespace vehicle { // This class is thread-safe. class ConnectedClient { public: ConnectedClient( std::shared_ptr<PendingRequestPool> requestPool, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback); using CallbackType = std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>; ConnectedClient(std::shared_ptr<PendingRequestPool> requestPool, CallbackType callback); virtual ~ConnectedClient() = default; Loading @@ -68,8 +68,7 @@ class ConnectedClient { virtual std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc> getTimeoutCallback() = 0; const std::shared_ptr<PendingRequestPool> mRequestPool; const std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> mCallback; const CallbackType mCallback; }; // A class to represent a client that calls {@code IVehicle.setValues} or {@code Loading @@ -77,10 +76,7 @@ class ConnectedClient { template <class ResultType, class ResultsType> class GetSetValuesClient final : public ConnectedClient { public: GetSetValuesClient( std::shared_ptr<PendingRequestPool> requestPool, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback); GetSetValuesClient(std::shared_ptr<PendingRequestPool> requestPool, CallbackType callback); // Sends the results to this client. void sendResults(const std::vector<ResultType>& results); Loading @@ -105,10 +101,7 @@ class GetSetValuesClient final : public ConnectedClient { // A class to represent a client that calls {@code IVehicle.subscribe}. class SubscriptionClient final : public ConnectedClient { public: SubscriptionClient( std::shared_ptr<PendingRequestPool> requestPool, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback); SubscriptionClient(std::shared_ptr<PendingRequestPool> requestPool, CallbackType callback); // Gets the callback to be called when the request for this client has finished. std::shared_ptr<const IVehicleHardware::GetValuesCallback> getResultCallback(); Loading @@ -116,8 +109,7 @@ class SubscriptionClient final : public ConnectedClient { // Marshals the updated values into largeParcelable and sents it through {@code onPropertyEvent} // callback. static void sendUpdatedValues( std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback, CallbackType callback, std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>&& updatedValues); Loading @@ -132,9 +124,7 @@ class SubscriptionClient final : public ConnectedClient { std::shared_ptr<const IVehicleHardware::PropertyChangeCallback> mPropertyChangeCallback; static void onGetValueResults( const void* clientId, std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback> callback, const void* clientId, CallbackType callback, std::shared_ptr<PendingRequestPool> requestPool, std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult> results); }; Loading
automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h +52 −6 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve explicit DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware); ~DefaultVehicleHal() = default; ~DefaultVehicleHal(); ::ndk::ScopedAStatus getAllPropConfigs( ::aidl::android::hardware::automotive::vehicle::VehiclePropConfigs* returnConfigs) Loading Loading @@ -101,7 +101,7 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve private: std::mutex mLock; std::unordered_map<CallbackType, int64_t> mIds GUARDED_BY(mLock); std::unordered_map<const AIBinder*, int64_t> mIds GUARDED_BY(mLock); }; // A thread safe class to store all subscribe clients. This class is safe to pass to async Loading @@ -112,16 +112,42 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve std::shared_ptr<SubscriptionClient> getClient(const CallbackType& callback); void removeClient(const AIBinder* clientId); size_t countClients(); private: std::mutex mLock; std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>> mClients std::unordered_map<const AIBinder*, std::shared_ptr<SubscriptionClient>> mClients GUARDED_BY(mLock); // PendingRequestPool is thread-safe. std::shared_ptr<PendingRequestPool> mPendingRequestPool; }; // A wrapper for linkToDeath to enable stubbing for test. class ILinkToDeath { public: virtual ~ILinkToDeath() = default; virtual binder_status_t linkToDeath(AIBinder* binder, AIBinder_DeathRecipient* recipient, void* cookie) = 0; }; // A real implementation for ILinkToDeath. class AIBinderLinkToDeathImpl final : public ILinkToDeath { public: binder_status_t linkToDeath(AIBinder* binder, AIBinder_DeathRecipient* recipient, void* cookie) override; }; // OnBinderDiedContext is a type used as a cookie passed deathRecipient. The deathRecipient's // onBinderDied function takes only a cookie as input and we have to store all the contexts // as the cookie. struct OnBinderDiedContext { DefaultVehicleHal* vhal; const AIBinder* clientId; }; // The default timeout of get or set value requests is 30s. // TODO(b/214605968): define TIMEOUT_IN_NANO in IVehicle and allow getValues/setValues/subscribe // to specify custom timeouts. Loading @@ -142,15 +168,22 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve std::shared_ptr<SubscriptionManager> mSubscriptionManager; std::mutex mLock; std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>> mGetValuesClients std::unordered_map<const AIBinder*, std::unique_ptr<OnBinderDiedContext>> mOnBinderDiedContexts GUARDED_BY(mLock); std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>> mSetValuesClients std::unordered_map<const AIBinder*, std::shared_ptr<GetValuesClient>> mGetValuesClients GUARDED_BY(mLock); std::unordered_map<const AIBinder*, std::shared_ptr<SetValuesClient>> mSetValuesClients GUARDED_BY(mLock); // SubscriptionClients is thread-safe. std::shared_ptr<SubscriptionClients> mSubscriptionClients; // mLinkToDeathImpl is only going to be changed in test. std::unique_ptr<ILinkToDeath> mLinkToDeathImpl; // RecurrentTimer is thread-safe. RecurrentTimer mRecurrentTimer; ::ndk::ScopedAIBinder_DeathRecipient mDeathRecipient; ::android::base::Result<void> checkProperty( const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& propValue); Loading @@ -176,9 +209,15 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve const ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig*> getConfig(int32_t propId) const; void onBinderDiedWithContext(const AIBinder* clientId); void onBinderUnlinkedWithContext(const AIBinder* clientId); void monitorBinderLifeCycle(const CallbackType& callback); template <class T> static std::shared_ptr<T> getOrCreateClient( std::unordered_map<CallbackType, std::shared_ptr<T>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<T>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); static void getValueFromHardwareCallCallback( Loading @@ -195,9 +234,16 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve static void checkHealth(std::weak_ptr<IVehicleHardware> hardware, std::weak_ptr<SubscriptionManager> subscriptionManager); static void onBinderDied(void* cookie); static void onBinderUnlinked(void* cookie); // Test-only // Set the default timeout for pending requests. void setTimeout(int64_t timeoutInNano); // Test-only void setLinkToDeathImpl(std::unique_ptr<ILinkToDeath> impl); }; } // namespace vehicle Loading
automotive/vehicle/aidl/impl/vhal/include/SubscriptionManager.h +19 −11 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ namespace vehicle { // A thread-safe subscription manager that manages all VHAL subscriptions. class SubscriptionManager final { public: using ClientIdType = const AIBinder*; using CallbackType = std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>; using GetValueFunc = std::function<void( Loading @@ -59,24 +60,24 @@ class SubscriptionManager final { options, bool isContinuousProperty); // Unsubscribes from the properties for the callback. // Returns error if the callback was not subscribed before or one of the given property was not // Unsubscribes from the properties for the client. // Returns error if the client was not subscribed before or one of the given property was not // subscribed. If error is returned, no property would be unsubscribed. // Returns ok if all the requested properties for the callback are unsubscribed. ::android::base::Result<void> unsubscribe(const CallbackType& callback, // Returns ok if all the requested properties for the client are unsubscribed. ::android::base::Result<void> unsubscribe(ClientIdType client, const std::vector<int32_t>& propIds); // Unsubscribes to all the properties for the callback. // Returns error if the callback was not subscribed before. If error is returned, no property // Unsubscribes from all the properties for the client. // Returns error if the client was not subscribed before. If error is returned, no property // would be unsubscribed. // Returns ok if all the properties for the callback are unsubscribed. ::android::base::Result<void> unsubscribe(const CallbackType& callback); // Returns ok if all the properties for the client are unsubscribed. ::android::base::Result<void> unsubscribe(ClientIdType client); // For a list of updated properties, returns a map that maps clients subscribing to // the updated properties to a list of updated values. This would only return on-change property // clients that should be informed for the given updated values. std::unordered_map< std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>, CallbackType, std::vector<const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue*>> getSubscribedClients( const std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>& Loading @@ -86,6 +87,9 @@ class SubscriptionManager final { static bool checkSampleRate(float sampleRate); private: // Friend class for testing. friend class DefaultVehicleHalTest; struct PropIdAreaId { int32_t propId; int32_t areaId; Loading Loading @@ -131,9 +135,10 @@ class SubscriptionManager final { }; mutable std::mutex mLock; std::unordered_map<PropIdAreaId, std::unordered_set<CallbackType>, PropIdAreaIdHash> std::unordered_map<PropIdAreaId, std::unordered_map<ClientIdType, CallbackType>, PropIdAreaIdHash> mClientsByPropIdArea GUARDED_BY(mLock); std::unordered_map<CallbackType, std::unordered_map<PropIdAreaId, std::unique_ptr<Subscription>, std::unordered_map<ClientIdType, std::unordered_map<PropIdAreaId, std::unique_ptr<Subscription>, PropIdAreaIdHash>> mSubscriptionsByClient GUARDED_BY(mLock); // RecurrentTimer is thread-safe. Loading @@ -141,6 +146,9 @@ class SubscriptionManager final { const GetValueFunc mGetValue; static ::android::base::Result<int64_t> getInterval(float sampleRate); // Checks whether the manager is empty. For testing purpose. bool isEmpty(); }; } // namespace vehicle Loading
automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp +91 −12 Original line number Diff line number Diff line Loading @@ -42,7 +42,6 @@ using ::aidl::android::hardware::automotive::vehicle::GetValueRequest; using ::aidl::android::hardware::automotive::vehicle::GetValueRequests; using ::aidl::android::hardware::automotive::vehicle::GetValueResult; using ::aidl::android::hardware::automotive::vehicle::GetValueResults; using ::aidl::android::hardware::automotive::vehicle::IVehicleCallback; using ::aidl::android::hardware::automotive::vehicle::SetValueRequest; using ::aidl::android::hardware::automotive::vehicle::SetValueRequests; using ::aidl::android::hardware::automotive::vehicle::SetValueResult; Loading @@ -62,6 +61,8 @@ using ::android::base::Error; using ::android::base::expected; using ::android::base::Result; using ::android::base::StringPrintf; using ::ndk::ScopedAIBinder_DeathRecipient; using ::ndk::ScopedAStatus; std::string toString(const std::unordered_set<int64_t>& values) { Loading @@ -86,10 +87,15 @@ std::shared_ptr<SubscriptionClient> DefaultVehicleHal::SubscriptionClients::getC int64_t DefaultVehicleHal::SubscribeIdByClient::getId(const CallbackType& callback) { std::scoped_lock<std::mutex> lockGuard(mLock); // This would be initialized to 0 if callback does not exist in the map. int64_t subscribeId = (mIds[callback])++; int64_t subscribeId = (mIds[callback->asBinder().get()])++; return subscribeId; } void DefaultVehicleHal::SubscriptionClients::removeClient(const AIBinder* clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); mClients.erase(clientId); } size_t DefaultVehicleHal::SubscriptionClients::countClients() { std::scoped_lock<std::mutex> lockGuard(mLock); return mClients.size(); Loading Loading @@ -139,6 +145,17 @@ DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware) std::make_shared<std::function<void()>>([hardwareCopy, subscriptionManagerCopy]() { checkHealth(hardwareCopy, subscriptionManagerCopy); })); mLinkToDeathImpl = std::make_unique<AIBinderLinkToDeathImpl>(); mDeathRecipient = ScopedAIBinder_DeathRecipient( AIBinder_DeathRecipient_new(&DefaultVehicleHal::onBinderDied)); AIBinder_DeathRecipient_setOnUnlinked(mDeathRecipient.get(), &DefaultVehicleHal::onBinderUnlinked); } DefaultVehicleHal::~DefaultVehicleHal() { // Delete the deathRecipient so that onBinderDied would not be called to reference 'this'. mDeathRecipient = ScopedAIBinder_DeathRecipient(); } void DefaultVehicleHal::onPropertyChangeEvent( Loading @@ -161,26 +178,73 @@ void DefaultVehicleHal::onPropertyChangeEvent( template <class T> std::shared_ptr<T> DefaultVehicleHal::getOrCreateClient( std::unordered_map<CallbackType, std::shared_ptr<T>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool) { if (clients->find(callback) == clients->end()) { // TODO(b/204943359): Remove client from clients when linkToDeath is implemented. (*clients)[callback] = std::make_shared<T>(pendingRequestPool, callback); std::unordered_map<const AIBinder*, std::shared_ptr<T>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool) { const AIBinder* clientId = callback->asBinder().get(); if (clients->find(clientId) == clients->end()) { (*clients)[clientId] = std::make_shared<T>(pendingRequestPool, callback); } return (*clients)[clientId]; } void DefaultVehicleHal::monitorBinderLifeCycle(const CallbackType& callback) { AIBinder* clientId = callback->asBinder().get(); { std::scoped_lock<std::mutex> lockGuard(mLock); if (mOnBinderDiedContexts.find(clientId) != mOnBinderDiedContexts.end()) { // Already registered. return; } } std::unique_ptr<OnBinderDiedContext> context = std::make_unique<OnBinderDiedContext>( OnBinderDiedContext{.vhal = this, .clientId = clientId}); binder_status_t status = mLinkToDeathImpl->linkToDeath(clientId, mDeathRecipient.get(), static_cast<void*>(context.get())); if (status == STATUS_OK) { std::scoped_lock<std::mutex> lockGuard(mLock); // Insert into a map to keep the context object alive. mOnBinderDiedContexts[clientId] = std::move(context); } else { ALOGE("failed to call linkToDeath on client binder, status: %d", static_cast<int>(status)); } return (*clients)[callback]; } void DefaultVehicleHal::onBinderDied(void* cookie) { OnBinderDiedContext* context = reinterpret_cast<OnBinderDiedContext*>(cookie); context->vhal->onBinderDiedWithContext(context->clientId); } void DefaultVehicleHal::onBinderDiedWithContext(const AIBinder* clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); mSetValuesClients.erase(clientId); mGetValuesClients.erase(clientId); mSubscriptionClients->removeClient(clientId); mSubscriptionManager->unsubscribe(clientId); } void DefaultVehicleHal::onBinderUnlinked(void* cookie) { // Delete the context associated with this cookie. OnBinderDiedContext* context = reinterpret_cast<OnBinderDiedContext*>(cookie); context->vhal->onBinderUnlinkedWithContext(context->clientId); } void DefaultVehicleHal::onBinderUnlinkedWithContext(const AIBinder* clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); mOnBinderDiedContexts.erase(clientId); } template std::shared_ptr<DefaultVehicleHal::GetValuesClient> DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::GetValuesClient>( std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<GetValuesClient>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); template std::shared_ptr<DefaultVehicleHal::SetValuesClient> DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::SetValuesClient>( std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<SetValuesClient>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); template std::shared_ptr<SubscriptionClient> DefaultVehicleHal::getOrCreateClient<SubscriptionClient>( std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>>* clients, std::unordered_map<const AIBinder*, std::shared_ptr<SubscriptionClient>>* clients, const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool); void DefaultVehicleHal::getValueFromHardwareCallCallback( Loading Loading @@ -268,6 +332,8 @@ Result<void> DefaultVehicleHal::checkProperty(const VehiclePropValue& propValue) ScopedAStatus DefaultVehicleHal::getValues(const CallbackType& callback, const GetValueRequests& requests) { monitorBinderLifeCycle(callback); expected<LargeParcelableBase::BorrowedOwnedObject<GetValueRequests>, ScopedAStatus> deserializedResults = fromStableLargeParcelable(requests); if (!deserializedResults.ok()) { Loading Loading @@ -344,6 +410,8 @@ ScopedAStatus DefaultVehicleHal::getValues(const CallbackType& callback, ScopedAStatus DefaultVehicleHal::setValues(const CallbackType& callback, const SetValueRequests& requests) { monitorBinderLifeCycle(callback); expected<LargeParcelableBase::BorrowedOwnedObject<SetValueRequests>, ScopedAStatus> deserializedResults = fromStableLargeParcelable(requests); if (!deserializedResults.ok()) { Loading Loading @@ -526,6 +594,8 @@ Result<void> DefaultVehicleHal::checkSubscribeOptions( ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType& callback, const std::vector<SubscribeOptions>& options, [[maybe_unused]] int32_t maxSharedMemoryFileCount) { monitorBinderLifeCycle(callback); // TODO(b/205189110): Use shared memory file count. if (auto result = checkSubscribeOptions(options); !result.ok()) { ALOGE("subscribe: invalid subscribe options: %s", getErrorMsg(result).c_str()); Loading Loading @@ -571,7 +641,7 @@ ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType& callback, ScopedAStatus DefaultVehicleHal::unsubscribe(const CallbackType& callback, const std::vector<int32_t>& propIds) { return toScopedAStatus(mSubscriptionManager->unsubscribe(callback, propIds), return toScopedAStatus(mSubscriptionManager->unsubscribe(callback->asBinder().get(), propIds), StatusCode::INVALID_ARG); } Loading Loading @@ -639,6 +709,15 @@ void DefaultVehicleHal::checkHealth(std::weak_ptr<IVehicleHardware> hardware, return; } binder_status_t DefaultVehicleHal::AIBinderLinkToDeathImpl::linkToDeath( AIBinder* binder, AIBinder_DeathRecipient* recipient, void* cookie) { return AIBinder_linkToDeath(binder, recipient, cookie); } void DefaultVehicleHal::setLinkToDeathImpl(std::unique_ptr<ILinkToDeath> impl) { mLinkToDeathImpl = std::move(impl); } } // namespace vehicle } // namespace automotive } // namespace hardware Loading
automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp +24 −18 Original line number Diff line number Diff line Loading @@ -26,7 +26,7 @@ namespace vehicle { namespace { constexpr float ONE_SECOND_IN_NANO = 1000000000.; constexpr float ONE_SECOND_IN_NANO = 1'000'000'000.; } // namespace Loading Loading @@ -100,6 +100,7 @@ Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallba } size_t intervalIndex = 0; ClientIdType clientId = callback->asBinder().get(); for (const auto& option : options) { int32_t propId = option.propId; const std::vector<int32_t>& areaIds = option.areaIds; Loading @@ -118,7 +119,7 @@ Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallba .prop = propId, .areaId = areaId, }; mSubscriptionsByClient[callback][propIdAreaId] = mSubscriptionsByClient[clientId][propIdAreaId] = std::make_unique<RecurrentSubscription>( mTimer, [this, callback, propValueRequest] { Loading @@ -126,24 +127,24 @@ Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallba }, interval); } else { mSubscriptionsByClient[callback][propIdAreaId] = mSubscriptionsByClient[clientId][propIdAreaId] = std::make_unique<OnChangeSubscription>(); } mClientsByPropIdArea[propIdAreaId].insert(callback); mClientsByPropIdArea[propIdAreaId][clientId] = callback; } } return {}; } Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback, Result<void> SubscriptionManager::unsubscribe(SubscriptionManager::ClientIdType clientId, const std::vector<int32_t>& propIds) { std::scoped_lock<std::mutex> lockGuard(mLock); if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) { if (mSubscriptionsByClient.find(clientId) == mSubscriptionsByClient.end()) { return Error() << "No property was subscribed for the callback"; } std::unordered_set<int32_t> subscribedPropIds; for (auto const& [propIdAreaId, _] : mSubscriptionsByClient[callback]) { for (auto const& [propIdAreaId, _] : mSubscriptionsByClient[clientId]) { subscribedPropIds.insert(propIdAreaId.propId); } Loading @@ -153,13 +154,13 @@ Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCall } } auto& subscriptions = mSubscriptionsByClient[callback]; auto& subscriptions = mSubscriptionsByClient[clientId]; auto it = subscriptions.begin(); while (it != subscriptions.end()) { int32_t propId = it->first.propId; if (std::find(propIds.begin(), propIds.end(), propId) != propIds.end()) { auto& clients = mClientsByPropIdArea[it->first]; clients.erase(callback); clients.erase(clientId); if (clients.empty()) { mClientsByPropIdArea.erase(it->first); } Loading @@ -169,27 +170,27 @@ Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCall } } if (subscriptions.empty()) { mSubscriptionsByClient.erase(callback); mSubscriptionsByClient.erase(clientId); } return {}; } Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback) { Result<void> SubscriptionManager::unsubscribe(SubscriptionManager::ClientIdType clientId) { std::scoped_lock<std::mutex> lockGuard(mLock); if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) { return Error() << "No property was subscribed for the callback"; if (mSubscriptionsByClient.find(clientId) == mSubscriptionsByClient.end()) { return Error() << "No property was subscribed for this client"; } auto& subscriptions = mSubscriptionsByClient[callback]; auto& subscriptions = mSubscriptionsByClient[clientId]; for (auto const& [propIdAreaId, _] : subscriptions) { auto& clients = mClientsByPropIdArea[propIdAreaId]; clients.erase(callback); clients.erase(clientId); if (clients.empty()) { mClientsByPropIdArea.erase(propIdAreaId); } } mSubscriptionsByClient.erase(callback); mSubscriptionsByClient.erase(clientId); return {}; } Loading @@ -207,8 +208,8 @@ SubscriptionManager::getSubscribedClients(const std::vector<VehiclePropValue>& u if (mClientsByPropIdArea.find(propIdAreaId) == mClientsByPropIdArea.end()) { continue; } for (const auto& client : mClientsByPropIdArea[propIdAreaId]) { if (!mSubscriptionsByClient[client][propIdAreaId]->isOnChange()) { for (const auto& [clientId, client] : mClientsByPropIdArea[propIdAreaId]) { if (!mSubscriptionsByClient[clientId][propIdAreaId]->isOnChange()) { continue; } clients[client].push_back(&value); Loading @@ -217,6 +218,11 @@ SubscriptionManager::getSubscribedClients(const std::vector<VehiclePropValue>& u return clients; } bool SubscriptionManager::isEmpty() { std::scoped_lock<std::mutex> lockGuard(mLock); return mSubscriptionsByClient.empty() && mClientsByPropIdArea.empty(); } SubscriptionManager::RecurrentSubscription::RecurrentSubscription( std::shared_ptr<RecurrentTimer> timer, std::function<void()>&& action, int64_t interval) : mAction(std::make_shared<std::function<void()>>(action)), mTimer(timer) { Loading