Loading libs/binder/RpcConnection.cpp +44 −7 Original line number Diff line number Diff line Loading @@ -139,8 +139,8 @@ bool RpcConnection::setupUnixDomainServer(const char* path) { return setupSocketServer(UnixSocketAddress(path)); } bool RpcConnection::addUnixDomainClient(const char* path) { return addSocketClient(UnixSocketAddress(path)); bool RpcConnection::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); } #ifdef __BIONIC__ Loading Loading @@ -171,8 +171,8 @@ bool RpcConnection::setupVsockServer(unsigned int port) { return setupSocketServer(VsockSocketAddress(kAnyCid, port)); } bool RpcConnection::addVsockClient(unsigned int cid, unsigned int port) { return addSocketClient(VsockSocketAddress(cid, port)); bool RpcConnection::setupVsockClient(unsigned int cid, unsigned int port) { return setupSocketClient(VsockSocketAddress(cid, port)); } #endif // __BIONIC__ Loading Loading @@ -240,12 +240,12 @@ bool RpcConnection::setupInetServer(unsigned int port, unsigned int* assignedPor return false; } bool RpcConnection::addInetClient(const char* addr, unsigned int port) { bool RpcConnection::setupInetClient(const char* addr, unsigned int port) { auto aiStart = GetAddrInfo(addr, port); if (aiStart == nullptr) return false; for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) { InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port); if (addSocketClient(socketAddress)) return true; if (setupSocketClient(socketAddress)) return true; } ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port); return false; Loading @@ -268,6 +268,11 @@ sp<IBinder> RpcConnection::getRootObject() { return state()->getRootObject(socket.fd(), sp<RpcConnection>::fromExisting(this)); } status_t RpcConnection::getMaxThreads(size_t* maxThreads) { ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::CLIENT); return state()->getMaxThreads(socket.fd(), sp<RpcConnection>::fromExisting(this), maxThreads); } status_t RpcConnection::transact(const RpcAddress& address, uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags) { ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), Loading Loading @@ -348,7 +353,39 @@ bool RpcConnection::setupSocketServer(const SocketAddress& addr) { return true; } bool RpcConnection::addSocketClient(const SocketAddress& addr) { bool RpcConnection::setupSocketClient(const SocketAddress& addr) { { std::lock_guard<std::mutex> _l(mSocketMutex); LOG_ALWAYS_FATAL_IF(mClients.size() != 0, "Must only setup connection once, but already has %zu clients", mClients.size()); } if (!setupOneSocketClient(addr)) return false; // TODO(b/185167543): we should add additional connections dynamically // instead of all at once. // TODO(b/186470974): first risk of blocking size_t numThreadsAvailable; if (status_t status = getMaxThreads(&numThreadsAvailable); status != OK) { ALOGE("Could not get max threads after initial connection to %s: %s", addr.toString().c_str(), statusToString(status).c_str()); return false; } // we've already setup one client for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/185167543): avoid race w/ accept4 not being called on server for (size_t tries = 0; tries < 5; tries++) { if (setupOneSocketClient(addr)) break; usleep(10000); } } return true; } bool RpcConnection::setupOneSocketClient(const SocketAddress& addr) { unique_fd serverFd( TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); if (serverFd == -1) { Loading libs/binder/RpcServer.cpp +42 −7 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <sys/socket.h> #include <sys/un.h> #include <thread> #include <vector> #include <binder/Parcel.h> Loading @@ -41,6 +42,31 @@ void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() mAgreedExperimental = true; } void RpcServer::setMaxThreads(size_t threads) { LOG_ALWAYS_FATAL_IF(threads <= 0, "RpcServer is useless without threads"); { // this lock should only ever be needed in the error case std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mConnections.size() > 0, "Must specify max threads before creating a connection"); } mMaxThreads = threads; } size_t RpcServer::getMaxThreads() { return mMaxThreads; } void RpcServer::setRootObject(const sp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); mRootObject = binder; } sp<IBinder> RpcServer::getRootObject() { std::lock_guard<std::mutex> _l(mLock); return mRootObject; } sp<RpcConnection> RpcServer::addClientConnection() { LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); Loading @@ -48,19 +74,28 @@ sp<RpcConnection> RpcServer::addClientConnection() { connection->setForServer(sp<RpcServer>::fromExisting(this)); { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mStarted, "currently only supports adding client connections at creation time"); mConnections.push_back(connection); } return connection; } void RpcServer::setRootObject(const sp<IBinder>& binder) { void RpcServer::join() { std::vector<std::thread> pool; { std::lock_guard<std::mutex> _l(mLock); mRootObject = binder; mStarted = true; for (const sp<RpcConnection>& connection : mConnections) { for (size_t i = 0; i < mMaxThreads; i++) { pool.push_back(std::thread([=] { connection->join(); })); } } } sp<IBinder> RpcServer::getRootObject() { std::lock_guard<std::mutex> _l(mLock); return mRootObject; // TODO(b/185167543): don't waste extra thread for join, and combine threads // between clients for (auto& t : pool) t.join(); } } // namespace android libs/binder/RpcState.cpp +42 −15 Original line number Diff line number Diff line Loading @@ -248,6 +248,31 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, return reply.readStrongBinder(); } status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection, size_t* maxThreads) { Parcel data; data.markForRpc(connection); Parcel reply; status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, data, connection, &reply, 0); if (status != OK) { ALOGE("Error getting max threads: %s", statusToString(status).c_str()); return status; } int32_t threads; status = reply.readInt32(&threads); if (status != OK) return status; if (threads <= 0) { ALOGE("Error invalid max threads: %d", threads); return BAD_VALUE; } *maxThreads = threads; return OK; } status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code, const Parcel& data, const sp<RpcConnection>& connection, Parcel* reply, uint32_t flags) { Loading Loading @@ -516,24 +541,26 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, replyStatus = target->transact(transaction->code, data, &reply, transaction->flags); } else { LOG_RPC_DETAIL("Got special transaction %u", transaction->code); sp<RpcServer> server = connection->server().promote(); if (server) { // special case for 'zero' address (special server commands) switch (transaction->code) { case RPC_SPECIAL_TRANSACT_GET_ROOT: { sp<IBinder> root; sp<RpcServer> server = connection->server().promote(); if (server) { root = server->getRootObject(); } else { ALOGE("Root object requested, but no server attached."); replyStatus = reply.writeStrongBinder(server->getRootObject()); break; } replyStatus = reply.writeStrongBinder(root); case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: { replyStatus = reply.writeInt32(server->getMaxThreads()); break; } default: { replyStatus = UNKNOWN_TRANSACTION; } } } else { ALOGE("Special command sent, but no server object attached."); } } } Loading libs/binder/RpcState.h +2 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,8 @@ public: ~RpcState(); sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcConnection>& connection); status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection, size_t* maxThreadsOut); [[nodiscard]] status_t transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code, const Parcel& data, Loading libs/binder/RpcWireFormat.h +1 −0 Original line number Diff line number Diff line Loading @@ -47,6 +47,7 @@ enum : uint32_t { */ enum : uint32_t { RPC_SPECIAL_TRANSACT_GET_ROOT = 0, RPC_SPECIAL_TRANSACT_GET_MAX_THREADS = 1, }; // serialization is like: Loading Loading
libs/binder/RpcConnection.cpp +44 −7 Original line number Diff line number Diff line Loading @@ -139,8 +139,8 @@ bool RpcConnection::setupUnixDomainServer(const char* path) { return setupSocketServer(UnixSocketAddress(path)); } bool RpcConnection::addUnixDomainClient(const char* path) { return addSocketClient(UnixSocketAddress(path)); bool RpcConnection::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); } #ifdef __BIONIC__ Loading Loading @@ -171,8 +171,8 @@ bool RpcConnection::setupVsockServer(unsigned int port) { return setupSocketServer(VsockSocketAddress(kAnyCid, port)); } bool RpcConnection::addVsockClient(unsigned int cid, unsigned int port) { return addSocketClient(VsockSocketAddress(cid, port)); bool RpcConnection::setupVsockClient(unsigned int cid, unsigned int port) { return setupSocketClient(VsockSocketAddress(cid, port)); } #endif // __BIONIC__ Loading Loading @@ -240,12 +240,12 @@ bool RpcConnection::setupInetServer(unsigned int port, unsigned int* assignedPor return false; } bool RpcConnection::addInetClient(const char* addr, unsigned int port) { bool RpcConnection::setupInetClient(const char* addr, unsigned int port) { auto aiStart = GetAddrInfo(addr, port); if (aiStart == nullptr) return false; for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) { InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port); if (addSocketClient(socketAddress)) return true; if (setupSocketClient(socketAddress)) return true; } ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port); return false; Loading @@ -268,6 +268,11 @@ sp<IBinder> RpcConnection::getRootObject() { return state()->getRootObject(socket.fd(), sp<RpcConnection>::fromExisting(this)); } status_t RpcConnection::getMaxThreads(size_t* maxThreads) { ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::CLIENT); return state()->getMaxThreads(socket.fd(), sp<RpcConnection>::fromExisting(this), maxThreads); } status_t RpcConnection::transact(const RpcAddress& address, uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags) { ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), Loading Loading @@ -348,7 +353,39 @@ bool RpcConnection::setupSocketServer(const SocketAddress& addr) { return true; } bool RpcConnection::addSocketClient(const SocketAddress& addr) { bool RpcConnection::setupSocketClient(const SocketAddress& addr) { { std::lock_guard<std::mutex> _l(mSocketMutex); LOG_ALWAYS_FATAL_IF(mClients.size() != 0, "Must only setup connection once, but already has %zu clients", mClients.size()); } if (!setupOneSocketClient(addr)) return false; // TODO(b/185167543): we should add additional connections dynamically // instead of all at once. // TODO(b/186470974): first risk of blocking size_t numThreadsAvailable; if (status_t status = getMaxThreads(&numThreadsAvailable); status != OK) { ALOGE("Could not get max threads after initial connection to %s: %s", addr.toString().c_str(), statusToString(status).c_str()); return false; } // we've already setup one client for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/185167543): avoid race w/ accept4 not being called on server for (size_t tries = 0; tries < 5; tries++) { if (setupOneSocketClient(addr)) break; usleep(10000); } } return true; } bool RpcConnection::setupOneSocketClient(const SocketAddress& addr) { unique_fd serverFd( TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); if (serverFd == -1) { Loading
libs/binder/RpcServer.cpp +42 −7 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <sys/socket.h> #include <sys/un.h> #include <thread> #include <vector> #include <binder/Parcel.h> Loading @@ -41,6 +42,31 @@ void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() mAgreedExperimental = true; } void RpcServer::setMaxThreads(size_t threads) { LOG_ALWAYS_FATAL_IF(threads <= 0, "RpcServer is useless without threads"); { // this lock should only ever be needed in the error case std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mConnections.size() > 0, "Must specify max threads before creating a connection"); } mMaxThreads = threads; } size_t RpcServer::getMaxThreads() { return mMaxThreads; } void RpcServer::setRootObject(const sp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); mRootObject = binder; } sp<IBinder> RpcServer::getRootObject() { std::lock_guard<std::mutex> _l(mLock); return mRootObject; } sp<RpcConnection> RpcServer::addClientConnection() { LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); Loading @@ -48,19 +74,28 @@ sp<RpcConnection> RpcServer::addClientConnection() { connection->setForServer(sp<RpcServer>::fromExisting(this)); { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mStarted, "currently only supports adding client connections at creation time"); mConnections.push_back(connection); } return connection; } void RpcServer::setRootObject(const sp<IBinder>& binder) { void RpcServer::join() { std::vector<std::thread> pool; { std::lock_guard<std::mutex> _l(mLock); mRootObject = binder; mStarted = true; for (const sp<RpcConnection>& connection : mConnections) { for (size_t i = 0; i < mMaxThreads; i++) { pool.push_back(std::thread([=] { connection->join(); })); } } } sp<IBinder> RpcServer::getRootObject() { std::lock_guard<std::mutex> _l(mLock); return mRootObject; // TODO(b/185167543): don't waste extra thread for join, and combine threads // between clients for (auto& t : pool) t.join(); } } // namespace android
libs/binder/RpcState.cpp +42 −15 Original line number Diff line number Diff line Loading @@ -248,6 +248,31 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, return reply.readStrongBinder(); } status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection, size_t* maxThreads) { Parcel data; data.markForRpc(connection); Parcel reply; status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, data, connection, &reply, 0); if (status != OK) { ALOGE("Error getting max threads: %s", statusToString(status).c_str()); return status; } int32_t threads; status = reply.readInt32(&threads); if (status != OK) return status; if (threads <= 0) { ALOGE("Error invalid max threads: %d", threads); return BAD_VALUE; } *maxThreads = threads; return OK; } status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code, const Parcel& data, const sp<RpcConnection>& connection, Parcel* reply, uint32_t flags) { Loading Loading @@ -516,24 +541,26 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, replyStatus = target->transact(transaction->code, data, &reply, transaction->flags); } else { LOG_RPC_DETAIL("Got special transaction %u", transaction->code); sp<RpcServer> server = connection->server().promote(); if (server) { // special case for 'zero' address (special server commands) switch (transaction->code) { case RPC_SPECIAL_TRANSACT_GET_ROOT: { sp<IBinder> root; sp<RpcServer> server = connection->server().promote(); if (server) { root = server->getRootObject(); } else { ALOGE("Root object requested, but no server attached."); replyStatus = reply.writeStrongBinder(server->getRootObject()); break; } replyStatus = reply.writeStrongBinder(root); case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: { replyStatus = reply.writeInt32(server->getMaxThreads()); break; } default: { replyStatus = UNKNOWN_TRANSACTION; } } } else { ALOGE("Special command sent, but no server object attached."); } } } Loading
libs/binder/RpcState.h +2 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,8 @@ public: ~RpcState(); sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcConnection>& connection); status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection, size_t* maxThreadsOut); [[nodiscard]] status_t transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code, const Parcel& data, Loading
libs/binder/RpcWireFormat.h +1 −0 Original line number Diff line number Diff line Loading @@ -47,6 +47,7 @@ enum : uint32_t { */ enum : uint32_t { RPC_SPECIAL_TRANSACT_GET_ROOT = 0, RPC_SPECIAL_TRANSACT_GET_MAX_THREADS = 1, }; // serialization is like: Loading