Loading automotive/vehicle/aidl/impl/grpc/Android.bp +24 −0 Original line number Diff line number Diff line Loading @@ -100,3 +100,27 @@ cc_library_static { "-Wno-unused-parameter", ], } cc_library_static { name: "android.hardware.automotive.vehicle@default-grpc-server-lib", defaults: ["VehicleHalDefaults"], vendor: true, srcs: [ "GRPCVehicleProxyServer.cpp", ], whole_static_libs: [ "android.hardware.automotive.vehicle@default-grpc-libgrpc", "VehicleHalProtoMessageConverter", ], header_libs: [ "IVehicleHardware", ], shared_libs: [ "libgrpc++", "libprotobuf-cpp-full", ], export_include_dirs: ["."], cflags: [ "-Wno-unused-parameter", ], } automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp +38 −10 Original line number Diff line number Diff line Loading @@ -152,21 +152,49 @@ void GRPCVehicleHardware::registerOnPropertySetErrorEvent( mOnSetErr = std::move(callback); } DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& /* options */) { // TODO(chenhaosjtuacm): To be implemented. DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) { ::grpc::ClientContext context; proto::DumpOptions protoDumpOptions; proto::DumpResult protoDumpResult; for (const auto& option : options) { protoDumpOptions.add_options(option); } auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message(); return {}; } return { .callerShouldDumpState = protoDumpResult.caller_should_dump_state(), .buffer = protoDumpResult.buffer(), }; } aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() { // TODO(chenhaosjtuacm): To be implemented. return aidlvhal::StatusCode::OK; ::grpc::ClientContext context; proto::VehicleHalCallStatus protoStatus; auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); } aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t /* propId */, int32_t /* areaId */, float /* sampleRate */) { // TODO(chenhaosjtuacm): To be implemented. return aidlvhal::StatusCode::OK; aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId, float sampleRate) { ::grpc::ClientContext context; proto::UpdateSampleRateRequest request; proto::VehicleHalCallStatus protoStatus; request.set_prop(propId); request.set_area_id(areaId); request.set_sample_rate(sampleRate); auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); } bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) { Loading automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.cpp 0 → 100644 +296 −0 Original line number Diff line number Diff line /* * Copyright (C) 2023 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GRPCVehicleProxyServer.h" #include "ProtoMessageConverter.h" #include <grpc++/grpc++.h> #include <android-base/logging.h> #include <algorithm> #include <condition_variable> #include <mutex> #include <unordered_set> #include <utility> #include <vector> namespace android::hardware::automotive::vehicle::virtualization { std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0}; static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureServerCredentials(); } GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr, std::unique_ptr<IVehicleHardware>&& hardware) : mServiceAddr(std::move(serverAddr)), mHardware(std::move(hardware)) { mHardware->registerOnPropertyChangeEvent( std::make_unique<const IVehicleHardware::PropertyChangeCallback>( [this](std::vector<aidlvhal::VehiclePropValue> values) { OnVehiclePropChange(values); })); } ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) { for (const auto& config : mHardware->getAllPropertyConfigs()) { proto::VehiclePropConfig protoConfig; proto_msg_converter::aidlToProto(config, &protoConfig); if (!stream->Write(protoConfig)) { return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::SetValueResults* results) { std::vector<aidlvhal::SetValueRequest> aidlRequests; for (const auto& protoRequest : requests->requests()) { auto& aidlRequest = aidlRequests.emplace_back(); aidlRequest.requestId = protoRequest.request_id(); proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value); } auto waitMtx = std::make_shared<std::mutex>(); auto waitCV = std::make_shared<std::condition_variable>(); auto complete = std::make_shared<bool>(false); auto tmpResults = std::make_shared<proto::SetValueResults>(); auto aidlStatus = mHardware->setValues( std::make_shared<const IVehicleHardware::SetValuesCallback>( [waitMtx, waitCV, complete, tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) { for (const auto& aidlResult : setValueResults) { auto& protoResult = *tmpResults->add_results(); protoResult.set_request_id(aidlResult.requestId); protoResult.set_status( static_cast<proto::StatusCode>(aidlResult.status)); } { std::lock_guard lck(*waitMtx); *complete = true; } waitCV->notify_all(); }), aidlRequests); if (aidlStatus != aidlvhal::StatusCode::OK) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware fails to set values, VHAL status: " + toString(aidlStatus)); } std::unique_lock lck(*waitMtx); bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; }); if (!success) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware set values timeout."); } *results = std::move(*tmpResults); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::GetValueResults* results) { std::vector<aidlvhal::GetValueRequest> aidlRequests; for (const auto& protoRequest : requests->requests()) { auto& aidlRequest = aidlRequests.emplace_back(); aidlRequest.requestId = protoRequest.request_id(); proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop); } auto waitMtx = std::make_shared<std::mutex>(); auto waitCV = std::make_shared<std::condition_variable>(); auto complete = std::make_shared<bool>(false); auto tmpResults = std::make_shared<proto::GetValueResults>(); auto aidlStatus = mHardware->getValues( std::make_shared<const IVehicleHardware::GetValuesCallback>( [waitMtx, waitCV, complete, tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) { for (const auto& aidlResult : getValueResults) { auto& protoResult = *tmpResults->add_results(); protoResult.set_request_id(aidlResult.requestId); protoResult.set_status( static_cast<proto::StatusCode>(aidlResult.status)); if (aidlResult.prop) { auto* valuePtr = protoResult.mutable_value(); proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr); } } { std::lock_guard lck(*waitMtx); *complete = true; } waitCV->notify_all(); }), aidlRequests); if (aidlStatus != aidlvhal::StatusCode::OK) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware fails to get values, VHAL status: " + toString(aidlStatus)); } std::unique_lock lck(*waitMtx); bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; }); if (!success) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware get values timeout."); } *results = std::move(*tmpResults); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate( ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request, proto::VehicleHalCallStatus* status) { const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(), request->sample_rate()); status->set_status_code(static_cast<proto::StatusCode>(status_code)); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context, const ::google::protobuf::Empty*, proto::VehicleHalCallStatus* status) { status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth())); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context, const proto::DumpOptions* options, proto::DumpResult* result) { std::vector<std::string> dumpOptionStrings(options->options().begin(), options->options().end()); auto dumpResult = mHardware->dump(dumpOptionStrings); result->set_caller_should_dump_state(dumpResult.callerShouldDumpState); result->set_buffer(dumpResult.buffer); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropValues>* stream) { auto conn = std::make_shared<ConnectionDescriptor>(stream); { std::lock_guard lck(mConnectionMutex); mValueStreamingConnections.push_back(conn); } conn->Wait(); LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID(); return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } void GrpcVehicleProxyServer::OnVehiclePropChange( const std::vector<aidlvhal::VehiclePropValue>& values) { std::unordered_set<uint64_t> brokenConn; proto::VehiclePropValues protoValues; for (const auto& value : values) { auto* protoValuePtr = protoValues.add_values(); proto_msg_converter::aidlToProto(value, protoValuePtr); } { std::shared_lock read_lock(mConnectionMutex); for (auto& connection : mValueStreamingConnections) { auto writeOK = connection->Write(protoValues); if (!writeOK) { LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << connection->ID(); brokenConn.insert(connection->ID()); } } } if (brokenConn.empty()) { return; } std::unique_lock write_lock(mConnectionMutex); mValueStreamingConnections.erase( std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(), [&brokenConn](const auto& conn) { return brokenConn.find(conn->ID()) != brokenConn.end(); }), mValueStreamingConnections.end()); } GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() { if (mServer) { LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started."; return *this; } ::grpc::ServerBuilder builder; builder.RegisterService(this); builder.AddListeningPort(mServiceAddr, getServerCredentials()); mServer = builder.BuildAndStart(); CHECK(mServer) << __func__ << ": failed to create the GRPC server, " << "please make sure the configuration and permissions are correct"; return *this; } GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() { std::shared_lock read_lock(mConnectionMutex); for (auto& conn : mValueStreamingConnections) { conn->Shutdown(); } if (mServer) { mServer->Shutdown(); } return *this; } void GrpcVehicleProxyServer::Wait() { if (mServer) { mServer->Wait(); } mServer.reset(); } GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() { Shutdown(); } bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) { if (!mStream) { LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID(); Shutdown(); return false; } { std::lock_guard lck(*mMtx); if (!mShutdownFlag && mStream->Write(values)) { return true; } else { LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID(); } } Shutdown(); return false; } void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() { std::unique_lock lck(*mMtx); mCV->wait(lck, [this] { return mShutdownFlag; }); } void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() { { std::lock_guard lck(*mMtx); mShutdownFlag = true; } mCV->notify_all(); } } // namespace android::hardware::automotive::vehicle::virtualization automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.h 0 → 100644 +122 −0 Original line number Diff line number Diff line /* * Copyright (C) 2023 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "IVehicleHardware.h" #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include <grpc++/grpc++.h> #include <atomic> #include <chrono> #include <cstdint> #include <functional> #include <memory> #include <shared_mutex> #include <string> #include <utility> namespace android::hardware::automotive::vehicle::virtualization { namespace aidlvhal = ::aidl::android::hardware::automotive::vehicle; // Connect other GRPC vehicle hardware(s) to the underlying vehicle hardware. class GrpcVehicleProxyServer : public proto::VehicleServer::Service { public: GrpcVehicleProxyServer(std::string serverAddr, std::unique_ptr<IVehicleHardware>&& hardware); ::grpc::Status GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) override; ::grpc::Status SetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::SetValueResults* results) override; ::grpc::Status GetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::GetValueResults* results) override; ::grpc::Status UpdateSampleRate(::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request, proto::VehicleHalCallStatus* status) override; ::grpc::Status CheckHealth(::grpc::ServerContext* context, const ::google::protobuf::Empty*, proto::VehicleHalCallStatus* status) override; ::grpc::Status Dump(::grpc::ServerContext* context, const proto::DumpOptions* options, proto::DumpResult* result) override; ::grpc::Status StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropValues>* stream) override; GrpcVehicleProxyServer& Start(); GrpcVehicleProxyServer& Shutdown(); void Wait(); private: void OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue>& values); // We keep long-lasting connection for streaming the prop values. struct ConnectionDescriptor { explicit ConnectionDescriptor(::grpc::ServerWriter<proto::VehiclePropValues>* stream) : mStream(stream), mConnectionID(connection_id_counter_.fetch_add(1) + 1), mMtx(std::make_unique<std::mutex>()), mCV(std::make_unique<std::condition_variable>()) {} ConnectionDescriptor(const ConnectionDescriptor&) = delete; ConnectionDescriptor(ConnectionDescriptor&& cd) = default; ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete; ConnectionDescriptor& operator=(ConnectionDescriptor&& cd) = default; ~ConnectionDescriptor(); uint64_t ID() const { return mConnectionID; } bool Write(const proto::VehiclePropValues& values); void Wait(); void Shutdown(); private: ::grpc::ServerWriter<proto::VehiclePropValues>* mStream; uint64_t mConnectionID{0}; std::unique_ptr<std::mutex> mMtx; std::unique_ptr<std::condition_variable> mCV; bool mShutdownFlag{false}; static std::atomic<uint64_t> connection_id_counter_; }; std::string mServiceAddr; std::unique_ptr<::grpc::Server> mServer{nullptr}; std::unique_ptr<IVehicleHardware> mHardware; std::shared_mutex mConnectionMutex; std::vector<std::shared_ptr<ConnectionDescriptor>> mValueStreamingConnections; static constexpr auto kHardwareOpTimeout = std::chrono::seconds(1); }; } // namespace android::hardware::automotive::vehicle::virtualization automotive/vehicle/aidl/impl/grpc/proto/VehicleServer.proto +8 −8 Original line number Diff line number Diff line Loading @@ -18,20 +18,14 @@ syntax = "proto3"; package android.hardware.automotive.vehicle.proto; import "android/hardware/automotive/vehicle/DumpOptions.proto"; import "android/hardware/automotive/vehicle/DumpResult.proto"; import "android/hardware/automotive/vehicle/StatusCode.proto"; import "android/hardware/automotive/vehicle/VehiclePropConfig.proto"; import "android/hardware/automotive/vehicle/VehiclePropValue.proto"; import "android/hardware/automotive/vehicle/VehiclePropValueRequest.proto"; import "google/protobuf/empty.proto"; message VehicleHalCallStatus { StatusCode status_code = 1; } message VehiclePropValues { repeated VehiclePropValue values = 1; } service VehicleServer { rpc GetAllPropertyConfig(google.protobuf.Empty) returns (stream VehiclePropConfig) {} Loading @@ -39,5 +33,11 @@ service VehicleServer { rpc GetValues(VehiclePropValueRequests) returns (GetValueResults) {} rpc UpdateSampleRate(UpdateSampleRateRequest) returns (VehicleHalCallStatus) {} rpc CheckHealth(google.protobuf.Empty) returns (VehicleHalCallStatus) {} rpc Dump(DumpOptions) returns (DumpResult) {} rpc StartPropertyValuesStream(google.protobuf.Empty) returns (stream VehiclePropValues) {} } Loading
automotive/vehicle/aidl/impl/grpc/Android.bp +24 −0 Original line number Diff line number Diff line Loading @@ -100,3 +100,27 @@ cc_library_static { "-Wno-unused-parameter", ], } cc_library_static { name: "android.hardware.automotive.vehicle@default-grpc-server-lib", defaults: ["VehicleHalDefaults"], vendor: true, srcs: [ "GRPCVehicleProxyServer.cpp", ], whole_static_libs: [ "android.hardware.automotive.vehicle@default-grpc-libgrpc", "VehicleHalProtoMessageConverter", ], header_libs: [ "IVehicleHardware", ], shared_libs: [ "libgrpc++", "libprotobuf-cpp-full", ], export_include_dirs: ["."], cflags: [ "-Wno-unused-parameter", ], }
automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp +38 −10 Original line number Diff line number Diff line Loading @@ -152,21 +152,49 @@ void GRPCVehicleHardware::registerOnPropertySetErrorEvent( mOnSetErr = std::move(callback); } DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& /* options */) { // TODO(chenhaosjtuacm): To be implemented. DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) { ::grpc::ClientContext context; proto::DumpOptions protoDumpOptions; proto::DumpResult protoDumpResult; for (const auto& option : options) { protoDumpOptions.add_options(option); } auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message(); return {}; } return { .callerShouldDumpState = protoDumpResult.caller_should_dump_state(), .buffer = protoDumpResult.buffer(), }; } aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() { // TODO(chenhaosjtuacm): To be implemented. return aidlvhal::StatusCode::OK; ::grpc::ClientContext context; proto::VehicleHalCallStatus protoStatus; auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); } aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t /* propId */, int32_t /* areaId */, float /* sampleRate */) { // TODO(chenhaosjtuacm): To be implemented. return aidlvhal::StatusCode::OK; aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId, float sampleRate) { ::grpc::ClientContext context; proto::UpdateSampleRateRequest request; proto::VehicleHalCallStatus protoStatus; request.set_prop(propId); request.set_area_id(areaId); request.set_sample_rate(sampleRate); auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); } bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) { Loading
automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.cpp 0 → 100644 +296 −0 Original line number Diff line number Diff line /* * Copyright (C) 2023 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GRPCVehicleProxyServer.h" #include "ProtoMessageConverter.h" #include <grpc++/grpc++.h> #include <android-base/logging.h> #include <algorithm> #include <condition_variable> #include <mutex> #include <unordered_set> #include <utility> #include <vector> namespace android::hardware::automotive::vehicle::virtualization { std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0}; static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureServerCredentials(); } GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr, std::unique_ptr<IVehicleHardware>&& hardware) : mServiceAddr(std::move(serverAddr)), mHardware(std::move(hardware)) { mHardware->registerOnPropertyChangeEvent( std::make_unique<const IVehicleHardware::PropertyChangeCallback>( [this](std::vector<aidlvhal::VehiclePropValue> values) { OnVehiclePropChange(values); })); } ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) { for (const auto& config : mHardware->getAllPropertyConfigs()) { proto::VehiclePropConfig protoConfig; proto_msg_converter::aidlToProto(config, &protoConfig); if (!stream->Write(protoConfig)) { return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::SetValueResults* results) { std::vector<aidlvhal::SetValueRequest> aidlRequests; for (const auto& protoRequest : requests->requests()) { auto& aidlRequest = aidlRequests.emplace_back(); aidlRequest.requestId = protoRequest.request_id(); proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value); } auto waitMtx = std::make_shared<std::mutex>(); auto waitCV = std::make_shared<std::condition_variable>(); auto complete = std::make_shared<bool>(false); auto tmpResults = std::make_shared<proto::SetValueResults>(); auto aidlStatus = mHardware->setValues( std::make_shared<const IVehicleHardware::SetValuesCallback>( [waitMtx, waitCV, complete, tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) { for (const auto& aidlResult : setValueResults) { auto& protoResult = *tmpResults->add_results(); protoResult.set_request_id(aidlResult.requestId); protoResult.set_status( static_cast<proto::StatusCode>(aidlResult.status)); } { std::lock_guard lck(*waitMtx); *complete = true; } waitCV->notify_all(); }), aidlRequests); if (aidlStatus != aidlvhal::StatusCode::OK) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware fails to set values, VHAL status: " + toString(aidlStatus)); } std::unique_lock lck(*waitMtx); bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; }); if (!success) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware set values timeout."); } *results = std::move(*tmpResults); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::GetValueResults* results) { std::vector<aidlvhal::GetValueRequest> aidlRequests; for (const auto& protoRequest : requests->requests()) { auto& aidlRequest = aidlRequests.emplace_back(); aidlRequest.requestId = protoRequest.request_id(); proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop); } auto waitMtx = std::make_shared<std::mutex>(); auto waitCV = std::make_shared<std::condition_variable>(); auto complete = std::make_shared<bool>(false); auto tmpResults = std::make_shared<proto::GetValueResults>(); auto aidlStatus = mHardware->getValues( std::make_shared<const IVehicleHardware::GetValuesCallback>( [waitMtx, waitCV, complete, tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) { for (const auto& aidlResult : getValueResults) { auto& protoResult = *tmpResults->add_results(); protoResult.set_request_id(aidlResult.requestId); protoResult.set_status( static_cast<proto::StatusCode>(aidlResult.status)); if (aidlResult.prop) { auto* valuePtr = protoResult.mutable_value(); proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr); } } { std::lock_guard lck(*waitMtx); *complete = true; } waitCV->notify_all(); }), aidlRequests); if (aidlStatus != aidlvhal::StatusCode::OK) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware fails to get values, VHAL status: " + toString(aidlStatus)); } std::unique_lock lck(*waitMtx); bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; }); if (!success) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "The underlying hardware get values timeout."); } *results = std::move(*tmpResults); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate( ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request, proto::VehicleHalCallStatus* status) { const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(), request->sample_rate()); status->set_status_code(static_cast<proto::StatusCode>(status_code)); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context, const ::google::protobuf::Empty*, proto::VehicleHalCallStatus* status) { status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth())); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context, const proto::DumpOptions* options, proto::DumpResult* result) { std::vector<std::string> dumpOptionStrings(options->options().begin(), options->options().end()); auto dumpResult = mHardware->dump(dumpOptionStrings); result->set_caller_should_dump_state(dumpResult.callerShouldDumpState); result->set_buffer(dumpResult.buffer); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropValues>* stream) { auto conn = std::make_shared<ConnectionDescriptor>(stream); { std::lock_guard lck(mConnectionMutex); mValueStreamingConnections.push_back(conn); } conn->Wait(); LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID(); return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } void GrpcVehicleProxyServer::OnVehiclePropChange( const std::vector<aidlvhal::VehiclePropValue>& values) { std::unordered_set<uint64_t> brokenConn; proto::VehiclePropValues protoValues; for (const auto& value : values) { auto* protoValuePtr = protoValues.add_values(); proto_msg_converter::aidlToProto(value, protoValuePtr); } { std::shared_lock read_lock(mConnectionMutex); for (auto& connection : mValueStreamingConnections) { auto writeOK = connection->Write(protoValues); if (!writeOK) { LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << connection->ID(); brokenConn.insert(connection->ID()); } } } if (brokenConn.empty()) { return; } std::unique_lock write_lock(mConnectionMutex); mValueStreamingConnections.erase( std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(), [&brokenConn](const auto& conn) { return brokenConn.find(conn->ID()) != brokenConn.end(); }), mValueStreamingConnections.end()); } GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() { if (mServer) { LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started."; return *this; } ::grpc::ServerBuilder builder; builder.RegisterService(this); builder.AddListeningPort(mServiceAddr, getServerCredentials()); mServer = builder.BuildAndStart(); CHECK(mServer) << __func__ << ": failed to create the GRPC server, " << "please make sure the configuration and permissions are correct"; return *this; } GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() { std::shared_lock read_lock(mConnectionMutex); for (auto& conn : mValueStreamingConnections) { conn->Shutdown(); } if (mServer) { mServer->Shutdown(); } return *this; } void GrpcVehicleProxyServer::Wait() { if (mServer) { mServer->Wait(); } mServer.reset(); } GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() { Shutdown(); } bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) { if (!mStream) { LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID(); Shutdown(); return false; } { std::lock_guard lck(*mMtx); if (!mShutdownFlag && mStream->Write(values)) { return true; } else { LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID(); } } Shutdown(); return false; } void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() { std::unique_lock lck(*mMtx); mCV->wait(lck, [this] { return mShutdownFlag; }); } void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() { { std::lock_guard lck(*mMtx); mShutdownFlag = true; } mCV->notify_all(); } } // namespace android::hardware::automotive::vehicle::virtualization
automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.h 0 → 100644 +122 −0 Original line number Diff line number Diff line /* * Copyright (C) 2023 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "IVehicleHardware.h" #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include <grpc++/grpc++.h> #include <atomic> #include <chrono> #include <cstdint> #include <functional> #include <memory> #include <shared_mutex> #include <string> #include <utility> namespace android::hardware::automotive::vehicle::virtualization { namespace aidlvhal = ::aidl::android::hardware::automotive::vehicle; // Connect other GRPC vehicle hardware(s) to the underlying vehicle hardware. class GrpcVehicleProxyServer : public proto::VehicleServer::Service { public: GrpcVehicleProxyServer(std::string serverAddr, std::unique_ptr<IVehicleHardware>&& hardware); ::grpc::Status GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) override; ::grpc::Status SetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::SetValueResults* results) override; ::grpc::Status GetValues(::grpc::ServerContext* context, const proto::VehiclePropValueRequests* requests, proto::GetValueResults* results) override; ::grpc::Status UpdateSampleRate(::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request, proto::VehicleHalCallStatus* status) override; ::grpc::Status CheckHealth(::grpc::ServerContext* context, const ::google::protobuf::Empty*, proto::VehicleHalCallStatus* status) override; ::grpc::Status Dump(::grpc::ServerContext* context, const proto::DumpOptions* options, proto::DumpResult* result) override; ::grpc::Status StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<proto::VehiclePropValues>* stream) override; GrpcVehicleProxyServer& Start(); GrpcVehicleProxyServer& Shutdown(); void Wait(); private: void OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue>& values); // We keep long-lasting connection for streaming the prop values. struct ConnectionDescriptor { explicit ConnectionDescriptor(::grpc::ServerWriter<proto::VehiclePropValues>* stream) : mStream(stream), mConnectionID(connection_id_counter_.fetch_add(1) + 1), mMtx(std::make_unique<std::mutex>()), mCV(std::make_unique<std::condition_variable>()) {} ConnectionDescriptor(const ConnectionDescriptor&) = delete; ConnectionDescriptor(ConnectionDescriptor&& cd) = default; ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete; ConnectionDescriptor& operator=(ConnectionDescriptor&& cd) = default; ~ConnectionDescriptor(); uint64_t ID() const { return mConnectionID; } bool Write(const proto::VehiclePropValues& values); void Wait(); void Shutdown(); private: ::grpc::ServerWriter<proto::VehiclePropValues>* mStream; uint64_t mConnectionID{0}; std::unique_ptr<std::mutex> mMtx; std::unique_ptr<std::condition_variable> mCV; bool mShutdownFlag{false}; static std::atomic<uint64_t> connection_id_counter_; }; std::string mServiceAddr; std::unique_ptr<::grpc::Server> mServer{nullptr}; std::unique_ptr<IVehicleHardware> mHardware; std::shared_mutex mConnectionMutex; std::vector<std::shared_ptr<ConnectionDescriptor>> mValueStreamingConnections; static constexpr auto kHardwareOpTimeout = std::chrono::seconds(1); }; } // namespace android::hardware::automotive::vehicle::virtualization
automotive/vehicle/aidl/impl/grpc/proto/VehicleServer.proto +8 −8 Original line number Diff line number Diff line Loading @@ -18,20 +18,14 @@ syntax = "proto3"; package android.hardware.automotive.vehicle.proto; import "android/hardware/automotive/vehicle/DumpOptions.proto"; import "android/hardware/automotive/vehicle/DumpResult.proto"; import "android/hardware/automotive/vehicle/StatusCode.proto"; import "android/hardware/automotive/vehicle/VehiclePropConfig.proto"; import "android/hardware/automotive/vehicle/VehiclePropValue.proto"; import "android/hardware/automotive/vehicle/VehiclePropValueRequest.proto"; import "google/protobuf/empty.proto"; message VehicleHalCallStatus { StatusCode status_code = 1; } message VehiclePropValues { repeated VehiclePropValue values = 1; } service VehicleServer { rpc GetAllPropertyConfig(google.protobuf.Empty) returns (stream VehiclePropConfig) {} Loading @@ -39,5 +33,11 @@ service VehicleServer { rpc GetValues(VehiclePropValueRequests) returns (GetValueResults) {} rpc UpdateSampleRate(UpdateSampleRateRequest) returns (VehicleHalCallStatus) {} rpc CheckHealth(google.protobuf.Empty) returns (VehicleHalCallStatus) {} rpc Dump(DumpOptions) returns (DumpResult) {} rpc StartPropertyValuesStream(google.protobuf.Empty) returns (stream VehiclePropValues) {} }