Loading system/gd/grpc/async_grpc.hdeleted 100644 → 0 +0 −165 Original line number Diff line number Diff line /* * Copyright 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <functional> #include <future> #include <memory> #include <mutex> #include <grpc++/grpc++.h> #include "os/log.h" namespace bluetooth { namespace grpc { // To be passed to gRPC async invocations as tag. // Function is called when the CompletionQueue.Next() returns this tag. // Then, user needs to delete this object. using GrpcAsyncEventCallback = std::function<void(bool)>; template <typename REQ, typename RES> class GrpcAsyncServerStreamingHandler { public: virtual ~GrpcAsyncServerStreamingHandler() = default; // Implementation for requesting the next specific type RPC, using provided parameters. virtual void OnReadyForNextRequest(::grpc::ServerContext*, REQ* req, ::grpc::ServerAsyncWriter<RES>* res, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void* tag) = 0; virtual void OnRpcRequestReceived(REQ req) = 0; virtual void OnRpcRequestFailed() {} virtual void OnRpcFinished() {} virtual void OnWriteSuccess() {} }; // Provides API to upper layer users to control (request, write, finish) a server-streaming asynchronous RPC. // When each API is done, callback will be sent to the given GrpcAsyncServerStreamingHandler. // Each control box can take one active RPC at one time. // TODO: problems with this control box: // 1. RequestNewRpc is async, but Write and Stop is blocking users. Do we want to do this? // 2. Callback to user is done in the gRPC thread. Let's create a pool thread to give it to user? // 3. Currently it uses promise to synchronize between events. If we use os/handler it should be easier. template <typename REQ, typename RES> class GrpcAsyncServerStreamingControlBox { public: GrpcAsyncServerStreamingControlBox(GrpcAsyncServerStreamingHandler<REQ, RES>* async_handler, ::grpc::ServerCompletionQueue* cq) : async_handler_(async_handler), cq_(cq) {} void RequestNewRpc() { ASSERT(my_state_ == MyState::IDLE); context_ = std::make_unique<::grpc::ServerContext>(); req_ = std::make_unique<REQ>(); res_ = std::make_unique<::grpc::ServerAsyncWriter<RES>>(context_.get()); request_done_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->RequestDone(ok); }); async_handler_->OnReadyForNextRequest(context_.get(), req_.get(), res_.get(), cq_, cq_, request_done_.get()); my_state_ = MyState::REQUESTING; } void Write(const RES& res) { std::unique_lock<std::mutex> lock(mutex_); if (my_state_ == MyState::IDLE || my_state_ == MyState::REQUESTING) { LOG_INFO("stream already stopped"); return; } ASSERT(my_state_ == MyState::OPEN); write_done_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->WriteDone(ok); }); my_state_ = MyState::WRITING; res_->Write(res, write_done_.get()); promise_ = new std::promise<void>(); auto future = promise_->get_future(); future.wait(); } void StopStreaming() { std::unique_lock<std::mutex> lock(mutex_); ASSERT(my_state_ == MyState::OPEN); rpc_finish_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->RpcFinish(ok); }); my_state_ = MyState::FINISHING; res_->Finish(::grpc::Status::OK, rpc_finish_.get()); promise_ = new std::promise<void>(); auto future = promise_->get_future(); future.wait(); } private: void RequestDone(bool ok) { ASSERT(my_state_ == MyState::REQUESTING); if (ok) { async_handler_->OnRpcRequestReceived(*req_); my_state_ = MyState::OPEN; } else { clean_up(); async_handler_->OnRpcRequestFailed(); my_state_ = MyState::IDLE; } } void WriteDone(bool ok) { ASSERT(my_state_ == MyState::WRITING); if (ok) { my_state_ = MyState::OPEN; async_handler_->OnWriteSuccess(); } else { clean_up(); my_state_ = MyState::IDLE; async_handler_->OnRpcFinished(); } promise_->set_value(); } void RpcFinish(bool ok) { ASSERT(ok); ASSERT(my_state_ == MyState::FINISHING); clean_up(); my_state_ = MyState::IDLE; async_handler_->OnRpcFinished(); promise_->set_value(); } void clean_up() { context_ = nullptr; req_ = nullptr; res_ = nullptr; } mutable std::mutex mutex_; std::promise<void>* promise_ = nullptr; GrpcAsyncServerStreamingHandler<REQ, RES>* async_handler_; ::grpc::ServerCompletionQueue* cq_; std::unique_ptr<::grpc::ServerContext> context_ = nullptr; std::unique_ptr<REQ> req_ = nullptr; std::unique_ptr<::grpc::ServerAsyncWriter<RES>> res_ = nullptr; std::unique_ptr<GrpcAsyncEventCallback> request_done_ = nullptr; std::unique_ptr<GrpcAsyncEventCallback> write_done_ = nullptr; std::unique_ptr<GrpcAsyncEventCallback> rpc_finish_ = nullptr; enum class MyState { IDLE, REQUESTING, OPEN, WRITING, FINISHING } my_state_ = MyState::IDLE; }; } // namespace grpc } // namespace bluetooth system/gd/grpc/grpc_module.cc +1 −4 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ #include "grpc/grpc_module.h" #include "os/log.h" #include "grpc/async_grpc.h" using ::grpc::Server; using ::grpc::ServerBuilder; Loading Loading @@ -53,7 +52,7 @@ void GrpcModule::StartServer(const std::string& address, int port) { ASSERT(server_ != nullptr); for (const auto& facade : facades_) { facade->OnServerStarted(completion_queue_.get()); facade->OnServerStarted(); } } Loading Loading @@ -97,8 +96,6 @@ void GrpcModule::RunGrpcLoop() { LOG_INFO("gRPC is shutdown"); break; } auto* data = static_cast<GrpcAsyncEventCallback*>(tag); (*data)(ok); } } Loading system/gd/grpc/grpc_module.h +1 −1 Original line number Diff line number Diff line Loading @@ -69,7 +69,7 @@ class GrpcFacadeModule : public ::bluetooth::Module { virtual ::grpc::Service* GetService() const = 0; virtual void OnServerStarted(::grpc::ServerCompletionQueue* cq) {} virtual void OnServerStarted() {} virtual void OnServerStopped() {} Loading Loading
system/gd/grpc/async_grpc.hdeleted 100644 → 0 +0 −165 Original line number Diff line number Diff line /* * Copyright 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <functional> #include <future> #include <memory> #include <mutex> #include <grpc++/grpc++.h> #include "os/log.h" namespace bluetooth { namespace grpc { // To be passed to gRPC async invocations as tag. // Function is called when the CompletionQueue.Next() returns this tag. // Then, user needs to delete this object. using GrpcAsyncEventCallback = std::function<void(bool)>; template <typename REQ, typename RES> class GrpcAsyncServerStreamingHandler { public: virtual ~GrpcAsyncServerStreamingHandler() = default; // Implementation for requesting the next specific type RPC, using provided parameters. virtual void OnReadyForNextRequest(::grpc::ServerContext*, REQ* req, ::grpc::ServerAsyncWriter<RES>* res, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void* tag) = 0; virtual void OnRpcRequestReceived(REQ req) = 0; virtual void OnRpcRequestFailed() {} virtual void OnRpcFinished() {} virtual void OnWriteSuccess() {} }; // Provides API to upper layer users to control (request, write, finish) a server-streaming asynchronous RPC. // When each API is done, callback will be sent to the given GrpcAsyncServerStreamingHandler. // Each control box can take one active RPC at one time. // TODO: problems with this control box: // 1. RequestNewRpc is async, but Write and Stop is blocking users. Do we want to do this? // 2. Callback to user is done in the gRPC thread. Let's create a pool thread to give it to user? // 3. Currently it uses promise to synchronize between events. If we use os/handler it should be easier. template <typename REQ, typename RES> class GrpcAsyncServerStreamingControlBox { public: GrpcAsyncServerStreamingControlBox(GrpcAsyncServerStreamingHandler<REQ, RES>* async_handler, ::grpc::ServerCompletionQueue* cq) : async_handler_(async_handler), cq_(cq) {} void RequestNewRpc() { ASSERT(my_state_ == MyState::IDLE); context_ = std::make_unique<::grpc::ServerContext>(); req_ = std::make_unique<REQ>(); res_ = std::make_unique<::grpc::ServerAsyncWriter<RES>>(context_.get()); request_done_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->RequestDone(ok); }); async_handler_->OnReadyForNextRequest(context_.get(), req_.get(), res_.get(), cq_, cq_, request_done_.get()); my_state_ = MyState::REQUESTING; } void Write(const RES& res) { std::unique_lock<std::mutex> lock(mutex_); if (my_state_ == MyState::IDLE || my_state_ == MyState::REQUESTING) { LOG_INFO("stream already stopped"); return; } ASSERT(my_state_ == MyState::OPEN); write_done_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->WriteDone(ok); }); my_state_ = MyState::WRITING; res_->Write(res, write_done_.get()); promise_ = new std::promise<void>(); auto future = promise_->get_future(); future.wait(); } void StopStreaming() { std::unique_lock<std::mutex> lock(mutex_); ASSERT(my_state_ == MyState::OPEN); rpc_finish_ = std::make_unique<GrpcAsyncEventCallback>([this](bool ok) { this->RpcFinish(ok); }); my_state_ = MyState::FINISHING; res_->Finish(::grpc::Status::OK, rpc_finish_.get()); promise_ = new std::promise<void>(); auto future = promise_->get_future(); future.wait(); } private: void RequestDone(bool ok) { ASSERT(my_state_ == MyState::REQUESTING); if (ok) { async_handler_->OnRpcRequestReceived(*req_); my_state_ = MyState::OPEN; } else { clean_up(); async_handler_->OnRpcRequestFailed(); my_state_ = MyState::IDLE; } } void WriteDone(bool ok) { ASSERT(my_state_ == MyState::WRITING); if (ok) { my_state_ = MyState::OPEN; async_handler_->OnWriteSuccess(); } else { clean_up(); my_state_ = MyState::IDLE; async_handler_->OnRpcFinished(); } promise_->set_value(); } void RpcFinish(bool ok) { ASSERT(ok); ASSERT(my_state_ == MyState::FINISHING); clean_up(); my_state_ = MyState::IDLE; async_handler_->OnRpcFinished(); promise_->set_value(); } void clean_up() { context_ = nullptr; req_ = nullptr; res_ = nullptr; } mutable std::mutex mutex_; std::promise<void>* promise_ = nullptr; GrpcAsyncServerStreamingHandler<REQ, RES>* async_handler_; ::grpc::ServerCompletionQueue* cq_; std::unique_ptr<::grpc::ServerContext> context_ = nullptr; std::unique_ptr<REQ> req_ = nullptr; std::unique_ptr<::grpc::ServerAsyncWriter<RES>> res_ = nullptr; std::unique_ptr<GrpcAsyncEventCallback> request_done_ = nullptr; std::unique_ptr<GrpcAsyncEventCallback> write_done_ = nullptr; std::unique_ptr<GrpcAsyncEventCallback> rpc_finish_ = nullptr; enum class MyState { IDLE, REQUESTING, OPEN, WRITING, FINISHING } my_state_ = MyState::IDLE; }; } // namespace grpc } // namespace bluetooth
system/gd/grpc/grpc_module.cc +1 −4 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ #include "grpc/grpc_module.h" #include "os/log.h" #include "grpc/async_grpc.h" using ::grpc::Server; using ::grpc::ServerBuilder; Loading Loading @@ -53,7 +52,7 @@ void GrpcModule::StartServer(const std::string& address, int port) { ASSERT(server_ != nullptr); for (const auto& facade : facades_) { facade->OnServerStarted(completion_queue_.get()); facade->OnServerStarted(); } } Loading Loading @@ -97,8 +96,6 @@ void GrpcModule::RunGrpcLoop() { LOG_INFO("gRPC is shutdown"); break; } auto* data = static_cast<GrpcAsyncEventCallback*>(tag); (*data)(ok); } } Loading
system/gd/grpc/grpc_module.h +1 −1 Original line number Diff line number Diff line Loading @@ -69,7 +69,7 @@ class GrpcFacadeModule : public ::bluetooth::Module { virtual ::grpc::Service* GetService() const = 0; virtual void OnServerStarted(::grpc::ServerCompletionQueue* cq) {} virtual void OnServerStarted() {} virtual void OnServerStopped() {} Loading