Loading libs/binder/RpcServer.cpp +1 −0 Original line number Original line Diff line number Diff line Loading @@ -280,6 +280,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie server->mSessionIdCounter++; server->mSessionIdCounter++; session = RpcSession::make(); session = RpcSession::make(); session->setMaxThreads(server->mMaxThreads); session->setForServer(server, session->setForServer(server, sp<RpcServer::EventListener>::fromExisting( sp<RpcServer::EventListener>::fromExisting( static_cast<RpcServer::EventListener*>(server.get())), static_cast<RpcServer::EventListener*>(server.get())), Loading libs/binder/RpcSession.cpp +13 −10 Original line number Original line Diff line number Diff line Loading @@ -59,15 +59,18 @@ sp<RpcSession> RpcSession::make() { return sp<RpcSession>::make(); return sp<RpcSession>::make(); } } void RpcSession::setMaxReverseConnections(size_t connections) { void RpcSession::setMaxThreads(size_t threads) { { std::lock_guard<std::mutex> _l(mMutex); std::lock_guard<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0, LOG_ALWAYS_FATAL_IF(!mClientConnections.empty() || !mServerConnections.empty(), "Must setup reverse connections before setting up client connections, " "Must set max threads before setting up connections, but has %zu client(s) " "but already has %zu clients", "and %zu server(s)", mClientConnections.size()); mClientConnections.size(), mServerConnections.size()); mMaxThreads = threads; } } mMaxReverseConnections = connections; size_t RpcSession::getMaxThreads() { std::lock_guard<std::mutex> _l(mMutex); return mMaxThreads; } } bool RpcSession::setupUnixDomainClient(const char* path) { bool RpcSession::setupUnixDomainClient(const char* path) { Loading Loading @@ -309,7 +312,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { // requested to be set) in order to allow the other side to reliably make // requested to be set) in order to allow the other side to reliably make // any requests at all. // any requests at all. for (size_t i = 0; i < mMaxReverseConnections; i++) { for (size_t i = 0; i < mMaxThreads; i++) { if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false; if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false; } } Loading libs/binder/RpcState.cpp +26 −26 Original line number Original line Diff line number Diff line Loading @@ -625,28 +625,26 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R } else { } else { LOG_RPC_DETAIL("Got special transaction %u", transaction->code); LOG_RPC_DETAIL("Got special transaction %u", transaction->code); sp<RpcServer> server = session->server().promote(); if (server) { // special case for 'zero' address (special server commands) switch (transaction->code) { switch (transaction->code) { case RPC_SPECIAL_TRANSACT_GET_ROOT: { replyStatus = reply.writeStrongBinder(server->getRootObject()); break; } case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: { case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: { replyStatus = reply.writeInt32(server->getMaxThreads()); replyStatus = reply.writeInt32(session->getMaxThreads()); break; break; } } case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: { case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: { // only sessions w/ services can be the source of a // for client connections, this should always report the value // session ID (so still guarded by non-null server) // originally returned from the server // // sessions associated with servers must have an ID // (hence abort) int32_t id = session->mId.value(); int32_t id = session->mId.value(); replyStatus = reply.writeInt32(id); replyStatus = reply.writeInt32(id); break; break; } } default: { sp<RpcServer> server = session->server().promote(); if (server) { switch (transaction->code) { case RPC_SPECIAL_TRANSACT_GET_ROOT: { replyStatus = reply.writeStrongBinder(server->getRootObject()); break; } default: { default: { replyStatus = UNKNOWN_TRANSACTION; replyStatus = UNKNOWN_TRANSACTION; } } Loading @@ -656,6 +654,8 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R } } } } } } } } if (transaction->flags & IBinder::FLAG_ONEWAY) { if (transaction->flags & IBinder::FLAG_ONEWAY) { if (replyStatus != OK) { if (replyStatus != OK) { Loading libs/binder/include/binder/RpcSession.h +6 −5 Original line number Original line Diff line number Diff line Loading @@ -47,16 +47,17 @@ public: static sp<RpcSession> make(); static sp<RpcSession> make(); /** /** * Set the maximum number of reverse connections allowed to be made (for * Set the maximum number of threads allowed to be made (for things like callbacks). * things like callbacks). By default, this is 0. This must be called before * By default, this is 0. This must be called before setting up this connection as a client. * setting up this connection as a client. * Server sessions will inherits this value from RpcServer. * * * If this is called, 'shutdown' on this session must also be called. * If this is called, 'shutdown' on this session must also be called. * Otherwise, a threadpool will leak. * Otherwise, a threadpool will leak. * * * TODO(b/185167543): start these dynamically * TODO(b/185167543): start these dynamically */ */ void setMaxReverseConnections(size_t connections); void setMaxThreads(size_t threads); size_t getMaxThreads(); /** /** * This should be called once per thread, matching 'join' in the remote * This should be called once per thread, matching 'join' in the remote Loading Loading @@ -257,7 +258,7 @@ private: std::mutex mMutex; // for all below std::mutex mMutex; // for all below size_t mMaxReverseConnections = 0; size_t mMaxThreads = 0; std::condition_variable mAvailableConnectionCv; // for mWaitingThreads std::condition_variable mAvailableConnectionCv; // for mWaitingThreads size_t mWaitingThreads = 0; size_t mWaitingThreads = 0; Loading libs/binder/tests/binderRpcTest.cpp +1 −1 Original line number Original line Diff line number Diff line Loading @@ -446,7 +446,7 @@ public: for (size_t i = 0; i < numSessions; i++) { for (size_t i = 0; i < numSessions; i++) { sp<RpcSession> session = RpcSession::make(); sp<RpcSession> session = RpcSession::make(); session->setMaxReverseConnections(numReverseConnections); session->setMaxThreads(numReverseConnections); switch (socketType) { switch (socketType) { case SocketType::UNIX: case SocketType::UNIX: Loading Loading
libs/binder/RpcServer.cpp +1 −0 Original line number Original line Diff line number Diff line Loading @@ -280,6 +280,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie server->mSessionIdCounter++; server->mSessionIdCounter++; session = RpcSession::make(); session = RpcSession::make(); session->setMaxThreads(server->mMaxThreads); session->setForServer(server, session->setForServer(server, sp<RpcServer::EventListener>::fromExisting( sp<RpcServer::EventListener>::fromExisting( static_cast<RpcServer::EventListener*>(server.get())), static_cast<RpcServer::EventListener*>(server.get())), Loading
libs/binder/RpcSession.cpp +13 −10 Original line number Original line Diff line number Diff line Loading @@ -59,15 +59,18 @@ sp<RpcSession> RpcSession::make() { return sp<RpcSession>::make(); return sp<RpcSession>::make(); } } void RpcSession::setMaxReverseConnections(size_t connections) { void RpcSession::setMaxThreads(size_t threads) { { std::lock_guard<std::mutex> _l(mMutex); std::lock_guard<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0, LOG_ALWAYS_FATAL_IF(!mClientConnections.empty() || !mServerConnections.empty(), "Must setup reverse connections before setting up client connections, " "Must set max threads before setting up connections, but has %zu client(s) " "but already has %zu clients", "and %zu server(s)", mClientConnections.size()); mClientConnections.size(), mServerConnections.size()); mMaxThreads = threads; } } mMaxReverseConnections = connections; size_t RpcSession::getMaxThreads() { std::lock_guard<std::mutex> _l(mMutex); return mMaxThreads; } } bool RpcSession::setupUnixDomainClient(const char* path) { bool RpcSession::setupUnixDomainClient(const char* path) { Loading Loading @@ -309,7 +312,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { // requested to be set) in order to allow the other side to reliably make // requested to be set) in order to allow the other side to reliably make // any requests at all. // any requests at all. for (size_t i = 0; i < mMaxReverseConnections; i++) { for (size_t i = 0; i < mMaxThreads; i++) { if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false; if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false; } } Loading
libs/binder/RpcState.cpp +26 −26 Original line number Original line Diff line number Diff line Loading @@ -625,28 +625,26 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R } else { } else { LOG_RPC_DETAIL("Got special transaction %u", transaction->code); LOG_RPC_DETAIL("Got special transaction %u", transaction->code); sp<RpcServer> server = session->server().promote(); if (server) { // special case for 'zero' address (special server commands) switch (transaction->code) { switch (transaction->code) { case RPC_SPECIAL_TRANSACT_GET_ROOT: { replyStatus = reply.writeStrongBinder(server->getRootObject()); break; } case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: { case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: { replyStatus = reply.writeInt32(server->getMaxThreads()); replyStatus = reply.writeInt32(session->getMaxThreads()); break; break; } } case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: { case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: { // only sessions w/ services can be the source of a // for client connections, this should always report the value // session ID (so still guarded by non-null server) // originally returned from the server // // sessions associated with servers must have an ID // (hence abort) int32_t id = session->mId.value(); int32_t id = session->mId.value(); replyStatus = reply.writeInt32(id); replyStatus = reply.writeInt32(id); break; break; } } default: { sp<RpcServer> server = session->server().promote(); if (server) { switch (transaction->code) { case RPC_SPECIAL_TRANSACT_GET_ROOT: { replyStatus = reply.writeStrongBinder(server->getRootObject()); break; } default: { default: { replyStatus = UNKNOWN_TRANSACTION; replyStatus = UNKNOWN_TRANSACTION; } } Loading @@ -656,6 +654,8 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R } } } } } } } } if (transaction->flags & IBinder::FLAG_ONEWAY) { if (transaction->flags & IBinder::FLAG_ONEWAY) { if (replyStatus != OK) { if (replyStatus != OK) { Loading
libs/binder/include/binder/RpcSession.h +6 −5 Original line number Original line Diff line number Diff line Loading @@ -47,16 +47,17 @@ public: static sp<RpcSession> make(); static sp<RpcSession> make(); /** /** * Set the maximum number of reverse connections allowed to be made (for * Set the maximum number of threads allowed to be made (for things like callbacks). * things like callbacks). By default, this is 0. This must be called before * By default, this is 0. This must be called before setting up this connection as a client. * setting up this connection as a client. * Server sessions will inherits this value from RpcServer. * * * If this is called, 'shutdown' on this session must also be called. * If this is called, 'shutdown' on this session must also be called. * Otherwise, a threadpool will leak. * Otherwise, a threadpool will leak. * * * TODO(b/185167543): start these dynamically * TODO(b/185167543): start these dynamically */ */ void setMaxReverseConnections(size_t connections); void setMaxThreads(size_t threads); size_t getMaxThreads(); /** /** * This should be called once per thread, matching 'join' in the remote * This should be called once per thread, matching 'join' in the remote Loading Loading @@ -257,7 +258,7 @@ private: std::mutex mMutex; // for all below std::mutex mMutex; // for all below size_t mMaxReverseConnections = 0; size_t mMaxThreads = 0; std::condition_variable mAvailableConnectionCv; // for mWaitingThreads std::condition_variable mAvailableConnectionCv; // for mWaitingThreads size_t mWaitingThreads = 0; size_t mWaitingThreads = 0; Loading
libs/binder/tests/binderRpcTest.cpp +1 −1 Original line number Original line Diff line number Diff line Loading @@ -446,7 +446,7 @@ public: for (size_t i = 0; i < numSessions; i++) { for (size_t i = 0; i < numSessions; i++) { sp<RpcSession> session = RpcSession::make(); sp<RpcSession> session = RpcSession::make(); session->setMaxReverseConnections(numReverseConnections); session->setMaxThreads(numReverseConnections); switch (socketType) { switch (socketType) { case SocketType::UNIX: case SocketType::UNIX: Loading