Loading automotive/vehicle/2.0/default/impl/vhal_v2_0/proto/VehicleServer.proto +12 −2 Original line number Diff line number Diff line Loading @@ -35,13 +35,23 @@ message VehicleHalCallStatus { VehicleHalStatusCode status_code = 1; } message WrappedVehiclePropValue { VehiclePropValue value = 1; // An indicator on whether we should update the status of the property // - true: if the value is generated by (emulated/real) car, or; // if the value is injected to 'fake' a on car event (for debugging purpose) // - false: if the value is set by VHal (public interface), since Android // cannot change status of property on a real car bool update_status = 2; } service VehicleServer { rpc GetAllPropertyConfig(google.protobuf.Empty) returns (stream VehiclePropConfig) {} // Change the property value of the vehicle rpc SetProperty(VehiclePropValue) returns (VehicleHalCallStatus) {} rpc SetProperty(WrappedVehiclePropValue) returns (VehicleHalCallStatus) {} // Start a vehicle property value stream rpc StartPropertyValuesStream(google.protobuf.Empty) returns (stream VehiclePropValue) {} rpc StartPropertyValuesStream(google.protobuf.Empty) returns (stream WrappedVehiclePropValue) {} } automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleClient.cpp 0 → 100644 +162 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GrpcVehicleClient.h" #include <condition_variable> #include <mutex> #include <android-base/logging.h> #include <grpc++/grpc++.h> #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include "vhal_v2_0/ProtoMessageConverter.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureChannelCredentials(); } class GrpcVehicleClientImpl : public EmulatedVehicleClient { public: GrpcVehicleClientImpl(const std::string& addr) : mServiceAddr(addr), mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), mGrpcStub(vhal_proto::VehicleServer::NewStub(mGrpcChannel)) { StartValuePollingThread(); } ~GrpcVehicleClientImpl() { mShuttingDownFlag.store(true); mShutdownCV.notify_all(); if (mPollingThread.joinable()) { mPollingThread.join(); } } // methods from IVehicleClient std::vector<VehiclePropConfig> getAllPropertyConfig() const override; StatusCode setProperty(const VehiclePropValue& value, bool updateStatus) override; private: void StartValuePollingThread(); // private data members std::string mServiceAddr; std::shared_ptr<::grpc::Channel> mGrpcChannel; std::unique_ptr<vhal_proto::VehicleServer::Stub> mGrpcStub; std::thread mPollingThread; std::mutex mShutdownMutex; std::condition_variable mShutdownCV; std::atomic<bool> mShuttingDownFlag{false}; }; std::unique_ptr<EmulatedVehicleClient> makeGrpcVehicleClient(const std::string& addr) { return std::make_unique<GrpcVehicleClientImpl>(addr); } std::vector<VehiclePropConfig> GrpcVehicleClientImpl::getAllPropertyConfig() const { std::vector<VehiclePropConfig> configs; ::grpc::ClientContext context; auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty()); vhal_proto::VehiclePropConfig protoConfig; while (config_stream->Read(&protoConfig)) { VehiclePropConfig config; proto_msg_converter::fromProto(&config, protoConfig); configs.emplace_back(std::move(config)); } auto grpc_status = config_stream->Finish(); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message(); configs.clear(); } return configs; } StatusCode GrpcVehicleClientImpl::setProperty(const VehiclePropValue& value, bool updateStatus) { ::grpc::ClientContext context; vhal_proto::WrappedVehiclePropValue wrappedProtoValue; vhal_proto::VehicleHalCallStatus vhal_status; proto_msg_converter::toProto(wrappedProtoValue.mutable_value(), value); wrappedProtoValue.set_update_status(updateStatus); auto grpc_status = mGrpcStub->SetProperty(&context, wrappedProtoValue, &vhal_status); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC SetProperty Failed: " << grpc_status.error_message(); return StatusCode::INTERNAL_ERROR; } return static_cast<StatusCode>(vhal_status.status_code()); } void GrpcVehicleClientImpl::StartValuePollingThread() { mPollingThread = std::thread([this]() { while (!mShuttingDownFlag.load()) { ::grpc::ClientContext context; std::atomic<bool> rpc_ok{true}; std::thread shuttingdown_watcher([this, &rpc_ok, &context]() { std::unique_lock<std::mutex> shutdownLock(mShutdownMutex); mShutdownCV.wait(shutdownLock, [this, &rpc_ok]() { return !rpc_ok.load() || mShuttingDownFlag.load(); }); context.TryCancel(); }); auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); vhal_proto::WrappedVehiclePropValue wrappedProtoValue; while (!mShuttingDownFlag.load() && value_stream->Read(&wrappedProtoValue)) { VehiclePropValue value; proto_msg_converter::fromProto(&value, wrappedProtoValue.value()); onPropertyValue(value, wrappedProtoValue.update_status()); } rpc_ok.store(false); mShutdownCV.notify_all(); shuttingdown_watcher.join(); auto grpc_status = value_stream->Finish(); // never reach here until connection lost LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); // try to reconnect } }); } } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleClient.h 0 → 100644 +40 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleClient_H_ #define android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleClient_H_ #include "vhal_v2_0/EmulatedVehicleConnector.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { std::unique_ptr<EmulatedVehicleClient> makeGrpcVehicleClient(const std::string& addr); } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android #endif // android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleClient_H_ automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleServer.cpp 0 → 100644 +229 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GrpcVehicleServer.h" #include <condition_variable> #include <mutex> #include <shared_mutex> #include <android-base/logging.h> #include <grpc++/grpc++.h> #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include "vhal_v2_0/ProtoMessageConverter.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service { public: GrpcVehicleServerImpl(const std::string& addr) : mServiceAddr(addr) { setValuePool(&mValueObjectPool); } // method from GrpcVehicleServer void Start() override; // method from IVehicleServer void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override; // methods from vhal_proto::VehicleServer::Service ::grpc::Status GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override; ::grpc::Status SetProperty(::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, vhal_proto::VehicleHalCallStatus* status) override; ::grpc::Status StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override; private: // We keep long-lasting connection for streaming the prop values. // For us, each connection can be represented as a function to send the new value, and // an ID to identify this connection struct ConnectionDescriptor { using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>; ConnectionDescriptor(ValueWriterType&& value_writer) : mValueWriter(std::move(value_writer)), mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {} ConnectionDescriptor(const ConnectionDescriptor&) = delete; ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete; // This move constructor is NOT THREAD-SAFE, which means it cannot be moved // while using. Since the connection descriptors are pretected by mConnectionMutex // then we are fine here ConnectionDescriptor(ConnectionDescriptor&& cd) : mValueWriter(std::move(cd.mValueWriter)), mConnectionID(cd.mConnectionID), mIsAlive(cd.mIsAlive.load()) { cd.mIsAlive.store(false); } ValueWriterType mValueWriter; uint64_t mConnectionID; std::atomic<bool> mIsAlive{true}; static std::atomic<uint64_t> CONNECTION_ID_COUNTER; }; std::string mServiceAddr; VehiclePropValuePool mValueObjectPool; mutable std::shared_mutex mConnectionMutex; mutable std::shared_mutex mWriterMutex; std::list<ConnectionDescriptor> mValueStreamingConnections; }; std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0; static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureServerCredentials(); } GrpcVehicleServerPtr makeGrpcVehicleServer(const std::string& addr) { return std::make_unique<GrpcVehicleServerImpl>(addr); } void GrpcVehicleServerImpl::Start() { ::grpc::ServerBuilder builder; builder.RegisterService(this); builder.AddListeningPort(mServiceAddr, getServerCredentials()); std::unique_ptr<::grpc::Server> server(builder.BuildAndStart()); server->Wait(); } void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) { vhal_proto::WrappedVehiclePropValue wrappedPropValue; proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value); wrappedPropValue.set_update_status(updateStatus); std::shared_lock read_lock(mConnectionMutex); bool has_terminated_connections = 0; for (auto& connection : mValueStreamingConnections) { auto writeOK = connection.mValueWriter(wrappedPropValue); if (!writeOK) { LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << connection.mConnectionID; has_terminated_connections = true; connection.mIsAlive.store(false); } } if (!has_terminated_connections) { return; } read_lock.unlock(); std::unique_lock write_lock(mConnectionMutex); for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) { if (!itr->mIsAlive.load()) { itr = mValueStreamingConnections.erase(itr); } else { ++itr; } } } ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) { auto configs = onGetAllPropertyConfig(); for (auto& config : configs) { vhal_proto::VehiclePropConfig protoConfig; proto_msg_converter::toProto(&protoConfig, config); if (!stream->Write(protoConfig)) { return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleServerImpl::SetProperty( ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, vhal_proto::VehicleHalCallStatus* status) { VehiclePropValue value; proto_msg_converter::fromProto(&value, wrappedPropValue->value()); auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status())); if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code"); } status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status)); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) { std::mutex terminateMutex; std::condition_variable terminateCV; std::unique_lock<std::mutex> terminateLock(terminateMutex); bool terminated{false}; auto callBack = [stream, &terminateMutex, &terminateCV, &terminated, this](const vhal_proto::WrappedVehiclePropValue& value) { std::unique_lock lock(mWriterMutex); if (!stream->Write(value)) { std::unique_lock<std::mutex> terminateLock(terminateMutex); terminated = true; terminateLock.unlock(); terminateCV.notify_all(); return false; } return true; }; // Register connection std::unique_lock lock(mConnectionMutex); auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack)); lock.unlock(); // Never stop until connection lost terminateCV.wait(terminateLock, [&terminated]() { return terminated; }); LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID; return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleServer.h 0 → 100644 +49 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleServer_H_ #define android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleServer_H_ #include "vhal_v2_0/EmulatedVehicleConnector.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { // Connect to the Vehicle Client via GRPC class GrpcVehicleServer : public EmulatedVehicleServer { public: // Start listening incoming calls, should never return if working normally virtual void Start() = 0; }; using GrpcVehicleServerPtr = std::unique_ptr<GrpcVehicleServer>; GrpcVehicleServerPtr makeGrpcVehicleServer(const std::string& addr); } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android #endif // android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleServer_H_ Loading
automotive/vehicle/2.0/default/impl/vhal_v2_0/proto/VehicleServer.proto +12 −2 Original line number Diff line number Diff line Loading @@ -35,13 +35,23 @@ message VehicleHalCallStatus { VehicleHalStatusCode status_code = 1; } message WrappedVehiclePropValue { VehiclePropValue value = 1; // An indicator on whether we should update the status of the property // - true: if the value is generated by (emulated/real) car, or; // if the value is injected to 'fake' a on car event (for debugging purpose) // - false: if the value is set by VHal (public interface), since Android // cannot change status of property on a real car bool update_status = 2; } service VehicleServer { rpc GetAllPropertyConfig(google.protobuf.Empty) returns (stream VehiclePropConfig) {} // Change the property value of the vehicle rpc SetProperty(VehiclePropValue) returns (VehicleHalCallStatus) {} rpc SetProperty(WrappedVehiclePropValue) returns (VehicleHalCallStatus) {} // Start a vehicle property value stream rpc StartPropertyValuesStream(google.protobuf.Empty) returns (stream VehiclePropValue) {} rpc StartPropertyValuesStream(google.protobuf.Empty) returns (stream WrappedVehiclePropValue) {} }
automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleClient.cpp 0 → 100644 +162 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GrpcVehicleClient.h" #include <condition_variable> #include <mutex> #include <android-base/logging.h> #include <grpc++/grpc++.h> #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include "vhal_v2_0/ProtoMessageConverter.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureChannelCredentials(); } class GrpcVehicleClientImpl : public EmulatedVehicleClient { public: GrpcVehicleClientImpl(const std::string& addr) : mServiceAddr(addr), mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), mGrpcStub(vhal_proto::VehicleServer::NewStub(mGrpcChannel)) { StartValuePollingThread(); } ~GrpcVehicleClientImpl() { mShuttingDownFlag.store(true); mShutdownCV.notify_all(); if (mPollingThread.joinable()) { mPollingThread.join(); } } // methods from IVehicleClient std::vector<VehiclePropConfig> getAllPropertyConfig() const override; StatusCode setProperty(const VehiclePropValue& value, bool updateStatus) override; private: void StartValuePollingThread(); // private data members std::string mServiceAddr; std::shared_ptr<::grpc::Channel> mGrpcChannel; std::unique_ptr<vhal_proto::VehicleServer::Stub> mGrpcStub; std::thread mPollingThread; std::mutex mShutdownMutex; std::condition_variable mShutdownCV; std::atomic<bool> mShuttingDownFlag{false}; }; std::unique_ptr<EmulatedVehicleClient> makeGrpcVehicleClient(const std::string& addr) { return std::make_unique<GrpcVehicleClientImpl>(addr); } std::vector<VehiclePropConfig> GrpcVehicleClientImpl::getAllPropertyConfig() const { std::vector<VehiclePropConfig> configs; ::grpc::ClientContext context; auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty()); vhal_proto::VehiclePropConfig protoConfig; while (config_stream->Read(&protoConfig)) { VehiclePropConfig config; proto_msg_converter::fromProto(&config, protoConfig); configs.emplace_back(std::move(config)); } auto grpc_status = config_stream->Finish(); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message(); configs.clear(); } return configs; } StatusCode GrpcVehicleClientImpl::setProperty(const VehiclePropValue& value, bool updateStatus) { ::grpc::ClientContext context; vhal_proto::WrappedVehiclePropValue wrappedProtoValue; vhal_proto::VehicleHalCallStatus vhal_status; proto_msg_converter::toProto(wrappedProtoValue.mutable_value(), value); wrappedProtoValue.set_update_status(updateStatus); auto grpc_status = mGrpcStub->SetProperty(&context, wrappedProtoValue, &vhal_status); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC SetProperty Failed: " << grpc_status.error_message(); return StatusCode::INTERNAL_ERROR; } return static_cast<StatusCode>(vhal_status.status_code()); } void GrpcVehicleClientImpl::StartValuePollingThread() { mPollingThread = std::thread([this]() { while (!mShuttingDownFlag.load()) { ::grpc::ClientContext context; std::atomic<bool> rpc_ok{true}; std::thread shuttingdown_watcher([this, &rpc_ok, &context]() { std::unique_lock<std::mutex> shutdownLock(mShutdownMutex); mShutdownCV.wait(shutdownLock, [this, &rpc_ok]() { return !rpc_ok.load() || mShuttingDownFlag.load(); }); context.TryCancel(); }); auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); vhal_proto::WrappedVehiclePropValue wrappedProtoValue; while (!mShuttingDownFlag.load() && value_stream->Read(&wrappedProtoValue)) { VehiclePropValue value; proto_msg_converter::fromProto(&value, wrappedProtoValue.value()); onPropertyValue(value, wrappedProtoValue.update_status()); } rpc_ok.store(false); mShutdownCV.notify_all(); shuttingdown_watcher.join(); auto grpc_status = value_stream->Finish(); // never reach here until connection lost LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); // try to reconnect } }); } } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android
automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleClient.h 0 → 100644 +40 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleClient_H_ #define android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleClient_H_ #include "vhal_v2_0/EmulatedVehicleConnector.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { std::unique_ptr<EmulatedVehicleClient> makeGrpcVehicleClient(const std::string& addr); } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android #endif // android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleClient_H_
automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleServer.cpp 0 → 100644 +229 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GrpcVehicleServer.h" #include <condition_variable> #include <mutex> #include <shared_mutex> #include <android-base/logging.h> #include <grpc++/grpc++.h> #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include "vhal_v2_0/ProtoMessageConverter.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service { public: GrpcVehicleServerImpl(const std::string& addr) : mServiceAddr(addr) { setValuePool(&mValueObjectPool); } // method from GrpcVehicleServer void Start() override; // method from IVehicleServer void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override; // methods from vhal_proto::VehicleServer::Service ::grpc::Status GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override; ::grpc::Status SetProperty(::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, vhal_proto::VehicleHalCallStatus* status) override; ::grpc::Status StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override; private: // We keep long-lasting connection for streaming the prop values. // For us, each connection can be represented as a function to send the new value, and // an ID to identify this connection struct ConnectionDescriptor { using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>; ConnectionDescriptor(ValueWriterType&& value_writer) : mValueWriter(std::move(value_writer)), mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {} ConnectionDescriptor(const ConnectionDescriptor&) = delete; ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete; // This move constructor is NOT THREAD-SAFE, which means it cannot be moved // while using. Since the connection descriptors are pretected by mConnectionMutex // then we are fine here ConnectionDescriptor(ConnectionDescriptor&& cd) : mValueWriter(std::move(cd.mValueWriter)), mConnectionID(cd.mConnectionID), mIsAlive(cd.mIsAlive.load()) { cd.mIsAlive.store(false); } ValueWriterType mValueWriter; uint64_t mConnectionID; std::atomic<bool> mIsAlive{true}; static std::atomic<uint64_t> CONNECTION_ID_COUNTER; }; std::string mServiceAddr; VehiclePropValuePool mValueObjectPool; mutable std::shared_mutex mConnectionMutex; mutable std::shared_mutex mWriterMutex; std::list<ConnectionDescriptor> mValueStreamingConnections; }; std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0; static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureServerCredentials(); } GrpcVehicleServerPtr makeGrpcVehicleServer(const std::string& addr) { return std::make_unique<GrpcVehicleServerImpl>(addr); } void GrpcVehicleServerImpl::Start() { ::grpc::ServerBuilder builder; builder.RegisterService(this); builder.AddListeningPort(mServiceAddr, getServerCredentials()); std::unique_ptr<::grpc::Server> server(builder.BuildAndStart()); server->Wait(); } void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) { vhal_proto::WrappedVehiclePropValue wrappedPropValue; proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value); wrappedPropValue.set_update_status(updateStatus); std::shared_lock read_lock(mConnectionMutex); bool has_terminated_connections = 0; for (auto& connection : mValueStreamingConnections) { auto writeOK = connection.mValueWriter(wrappedPropValue); if (!writeOK) { LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << connection.mConnectionID; has_terminated_connections = true; connection.mIsAlive.store(false); } } if (!has_terminated_connections) { return; } read_lock.unlock(); std::unique_lock write_lock(mConnectionMutex); for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) { if (!itr->mIsAlive.load()) { itr = mValueStreamingConnections.erase(itr); } else { ++itr; } } } ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) { auto configs = onGetAllPropertyConfig(); for (auto& config : configs) { vhal_proto::VehiclePropConfig protoConfig; proto_msg_converter::toProto(&protoConfig, config); if (!stream->Write(protoConfig)) { return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleServerImpl::SetProperty( ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, vhal_proto::VehicleHalCallStatus* status) { VehiclePropValue value; proto_msg_converter::fromProto(&value, wrappedPropValue->value()); auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status())); if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code"); } status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status)); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) { std::mutex terminateMutex; std::condition_variable terminateCV; std::unique_lock<std::mutex> terminateLock(terminateMutex); bool terminated{false}; auto callBack = [stream, &terminateMutex, &terminateCV, &terminated, this](const vhal_proto::WrappedVehiclePropValue& value) { std::unique_lock lock(mWriterMutex); if (!stream->Write(value)) { std::unique_lock<std::mutex> terminateLock(terminateMutex); terminated = true; terminateLock.unlock(); terminateCV.notify_all(); return false; } return true; }; // Register connection std::unique_lock lock(mConnectionMutex); auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack)); lock.unlock(); // Never stop until connection lost terminateCV.wait(terminateLock, [&terminated]() { return terminated; }); LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID; return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android
automotive/vehicle/2.0/default/impl/vhal_v2_0/virtualization/GrpcVehicleServer.h 0 → 100644 +49 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleServer_H_ #define android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleServer_H_ #include "vhal_v2_0/EmulatedVehicleConnector.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { // Connect to the Vehicle Client via GRPC class GrpcVehicleServer : public EmulatedVehicleServer { public: // Start listening incoming calls, should never return if working normally virtual void Start() = 0; }; using GrpcVehicleServerPtr = std::unique_ptr<GrpcVehicleServer>; GrpcVehicleServerPtr makeGrpcVehicleServer(const std::string& addr); } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android #endif // android_hardware_automotive_vehicle_V2_0_impl_virtialization_GrpcVehicleServer_H_