Loading libs/binder/RpcServer.cpp +34 −8 Original line number Diff line number Diff line Loading @@ -110,6 +110,10 @@ size_t RpcServer::getMaxThreads() { return mMaxThreads; } void RpcServer::setProtocolVersion(uint32_t version) { mProtocolVersion = version; } void RpcServer::setRootObject(const sp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); mRootObjectWeak = mRootObject = binder; Loading Loading @@ -245,13 +249,37 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header, sizeof(header)); bool idValid = status == OK; if (!idValid) { if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); // still need to cleanup before we can return } bool incoming = header.options & RPC_CONNECTION_OPTION_INCOMING; bool incoming = false; uint32_t protocolVersion = 0; RpcAddress sessionId = RpcAddress::zero(); bool requestingNewSession = false; if (status == OK) { incoming = header.options & RPC_CONNECTION_OPTION_INCOMING; protocolVersion = std::min(header.version, server->mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION)); sessionId = RpcAddress::fromRawEmbedded(&header.sessionId); requestingNewSession = sessionId.isZero(); if (requestingNewSession) { RpcNewSessionResponse response{ .version = protocolVersion, }; status = server->mShutdownTrigger->interruptableWriteFully(clientFd.get(), &response, sizeof(response)); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return } } } std::thread thisThread; sp<RpcSession> session; Loading @@ -269,19 +297,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie }; server->mConnectingThreads.erase(threadId); if (!idValid || server->mShutdownTrigger->isTriggered()) { if (status != OK || server->mShutdownTrigger->isTriggered()) { return; } RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId); if (sessionId.isZero()) { if (requestingNewSession) { if (incoming) { ALOGE("Cannot create a new session with an incoming connection, would leak"); return; } sessionId = RpcAddress::zero(); size_t tries = 0; do { // don't block if there is some entropy issue Loading @@ -295,6 +320,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie session = RpcSession::make(); session->setMaxThreads(server->mMaxThreads); if (!session->setProtocolVersion(protocolVersion)) return; if (!session->setForServer(server, sp<RpcServer::EventListener>::fromExisting( static_cast<RpcServer::EventListener*>( Loading libs/binder/RpcSession.cpp +35 −1 Original line number Diff line number Diff line Loading @@ -77,6 +77,25 @@ size_t RpcSession::getMaxThreads() { return mMaxThreads; } bool RpcSession::setProtocolVersion(uint32_t version) { if (version >= RPC_WIRE_PROTOCOL_VERSION_NEXT && version != RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL) { ALOGE("Cannot start RPC session with version %u which is unknown (current protocol version " "is %u).", version, RPC_WIRE_PROTOCOL_VERSION); return false; } std::lock_guard<std::mutex> _l(mMutex); mProtocolVersion = version; return true; } std::optional<uint32_t> RpcSession::getProtocolVersion() { std::lock_guard<std::mutex> _l(mMutex); return mProtocolVersion; } bool RpcSession::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); } Loading Loading @@ -424,6 +443,18 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false; { ExclusiveConnection connection; status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return false; uint32_t version; status = state()->readNewSessionResponse(connection.get(), sp<RpcSession>::fromExisting(this), &version); if (!setProtocolVersion(version)) return false; } // TODO(b/189955605): we should add additional sessions dynamically // instead of all at once. // TODO(b/186470974): first risk of blocking Loading Loading @@ -484,7 +515,10 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp return false; } RpcConnectionHeader header{.options = 0}; RpcConnectionHeader header{ .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION), .options = 0, }; memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress)); if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING; Loading libs/binder/RpcState.cpp +12 −0 Original line number Diff line number Diff line Loading @@ -315,6 +315,18 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, return OK; } status_t RpcState::readNewSessionResponse(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, uint32_t* version) { RpcNewSessionResponse response; if (status_t status = rpcRec(connection, session, "new session response", &response, sizeof(response)); status != OK) { return status; } *version = response.version; return OK; } status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session) { RpcOutgoingConnectionInit init{ Loading libs/binder/RpcState.h +2 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,8 @@ public: RpcState(); ~RpcState(); status_t readNewSessionResponse(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, uint32_t* version); status_t sendConnectionInit(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session); status_t readConnectionInit(const sp<RpcSession::RpcConnection>& connection, Loading libs/binder/RpcWireFormat.h +12 −1 Original line number Diff line number Diff line Loading @@ -37,9 +37,20 @@ struct RpcWireAddress { * either as part of a new session or an existing session */ struct RpcConnectionHeader { uint32_t version; // maximum supported by caller uint8_t reserver0[4]; RpcWireAddress sessionId; uint8_t options; uint8_t reserved[7]; uint8_t reserved1[7]; }; /** * In response to an RpcConnectionHeader which corresponds to a new session, * this returns information to the server. */ struct RpcNewSessionResponse { uint32_t version; // maximum supported by callee <= maximum supported by caller uint8_t reserved[4]; }; #define RPC_CONNECTION_INIT_OKAY "cci" Loading Loading
libs/binder/RpcServer.cpp +34 −8 Original line number Diff line number Diff line Loading @@ -110,6 +110,10 @@ size_t RpcServer::getMaxThreads() { return mMaxThreads; } void RpcServer::setProtocolVersion(uint32_t version) { mProtocolVersion = version; } void RpcServer::setRootObject(const sp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); mRootObjectWeak = mRootObject = binder; Loading Loading @@ -245,13 +249,37 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header, sizeof(header)); bool idValid = status == OK; if (!idValid) { if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); // still need to cleanup before we can return } bool incoming = header.options & RPC_CONNECTION_OPTION_INCOMING; bool incoming = false; uint32_t protocolVersion = 0; RpcAddress sessionId = RpcAddress::zero(); bool requestingNewSession = false; if (status == OK) { incoming = header.options & RPC_CONNECTION_OPTION_INCOMING; protocolVersion = std::min(header.version, server->mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION)); sessionId = RpcAddress::fromRawEmbedded(&header.sessionId); requestingNewSession = sessionId.isZero(); if (requestingNewSession) { RpcNewSessionResponse response{ .version = protocolVersion, }; status = server->mShutdownTrigger->interruptableWriteFully(clientFd.get(), &response, sizeof(response)); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return } } } std::thread thisThread; sp<RpcSession> session; Loading @@ -269,19 +297,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie }; server->mConnectingThreads.erase(threadId); if (!idValid || server->mShutdownTrigger->isTriggered()) { if (status != OK || server->mShutdownTrigger->isTriggered()) { return; } RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId); if (sessionId.isZero()) { if (requestingNewSession) { if (incoming) { ALOGE("Cannot create a new session with an incoming connection, would leak"); return; } sessionId = RpcAddress::zero(); size_t tries = 0; do { // don't block if there is some entropy issue Loading @@ -295,6 +320,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie session = RpcSession::make(); session->setMaxThreads(server->mMaxThreads); if (!session->setProtocolVersion(protocolVersion)) return; if (!session->setForServer(server, sp<RpcServer::EventListener>::fromExisting( static_cast<RpcServer::EventListener*>( Loading
libs/binder/RpcSession.cpp +35 −1 Original line number Diff line number Diff line Loading @@ -77,6 +77,25 @@ size_t RpcSession::getMaxThreads() { return mMaxThreads; } bool RpcSession::setProtocolVersion(uint32_t version) { if (version >= RPC_WIRE_PROTOCOL_VERSION_NEXT && version != RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL) { ALOGE("Cannot start RPC session with version %u which is unknown (current protocol version " "is %u).", version, RPC_WIRE_PROTOCOL_VERSION); return false; } std::lock_guard<std::mutex> _l(mMutex); mProtocolVersion = version; return true; } std::optional<uint32_t> RpcSession::getProtocolVersion() { std::lock_guard<std::mutex> _l(mMutex); return mProtocolVersion; } bool RpcSession::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); } Loading Loading @@ -424,6 +443,18 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false; { ExclusiveConnection connection; status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return false; uint32_t version; status = state()->readNewSessionResponse(connection.get(), sp<RpcSession>::fromExisting(this), &version); if (!setProtocolVersion(version)) return false; } // TODO(b/189955605): we should add additional sessions dynamically // instead of all at once. // TODO(b/186470974): first risk of blocking Loading Loading @@ -484,7 +515,10 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp return false; } RpcConnectionHeader header{.options = 0}; RpcConnectionHeader header{ .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION), .options = 0, }; memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress)); if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING; Loading
libs/binder/RpcState.cpp +12 −0 Original line number Diff line number Diff line Loading @@ -315,6 +315,18 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, return OK; } status_t RpcState::readNewSessionResponse(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, uint32_t* version) { RpcNewSessionResponse response; if (status_t status = rpcRec(connection, session, "new session response", &response, sizeof(response)); status != OK) { return status; } *version = response.version; return OK; } status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session) { RpcOutgoingConnectionInit init{ Loading
libs/binder/RpcState.h +2 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,8 @@ public: RpcState(); ~RpcState(); status_t readNewSessionResponse(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, uint32_t* version); status_t sendConnectionInit(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session); status_t readConnectionInit(const sp<RpcSession::RpcConnection>& connection, Loading
libs/binder/RpcWireFormat.h +12 −1 Original line number Diff line number Diff line Loading @@ -37,9 +37,20 @@ struct RpcWireAddress { * either as part of a new session or an existing session */ struct RpcConnectionHeader { uint32_t version; // maximum supported by caller uint8_t reserver0[4]; RpcWireAddress sessionId; uint8_t options; uint8_t reserved[7]; uint8_t reserved1[7]; }; /** * In response to an RpcConnectionHeader which corresponds to a new session, * this returns information to the server. */ struct RpcNewSessionResponse { uint32_t version; // maximum supported by callee <= maximum supported by caller uint8_t reserved[4]; }; #define RPC_CONNECTION_INIT_OKAY "cci" Loading