Loading libs/binder/Binder.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -555,7 +555,9 @@ status_t BBinder::setRpcClientDebug(android::base::unique_fd socketFd, return status; } rpcServer->setRootObjectWeak(weakThis); rpcServer->setupExternalServer(std::move(socketFd)); if (auto status = rpcServer->setupExternalServer(std::move(socketFd)); status != OK) { return status; } rpcServer->setMaxThreads(binderThreadPoolMaxCount); rpcServer->start(); e->mRpcServerLinks.emplace(link); Loading libs/binder/RpcServer.cpp +22 −21 Original line number Diff line number Diff line Loading @@ -55,25 +55,25 @@ void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() mAgreedExperimental = true; } bool RpcServer::setupUnixDomainServer(const char* path) { status_t RpcServer::setupUnixDomainServer(const char* path) { return setupSocketServer(UnixSocketAddress(path)); } bool RpcServer::setupVsockServer(unsigned int port) { status_t RpcServer::setupVsockServer(unsigned int port) { // realizing value w/ this type at compile time to avoid ubsan abort constexpr unsigned int kAnyCid = VMADDR_CID_ANY; return setupSocketServer(VsockSocketAddress(kAnyCid, port)); } bool RpcServer::setupInetServer(const char* address, unsigned int port, status_t RpcServer::setupInetServer(const char* address, unsigned int port, unsigned int* assignedPort) { if (assignedPort != nullptr) *assignedPort = 0; auto aiStart = InetSocketAddress::getAddrInfo(address, port); if (aiStart == nullptr) return false; if (aiStart == nullptr) return UNKNOWN_ERROR; for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) { InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, address, port); if (!setupSocketServer(socketAddress)) { if (status_t status = setupSocketServer(socketAddress); status != OK) { continue; } Loading @@ -84,7 +84,7 @@ bool RpcServer::setupInetServer(const char* address, unsigned int port, int savedErrno = errno; ALOGE("Could not getsockname at %s: %s", socketAddress.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } LOG_ALWAYS_FATAL_IF(len != sizeof(addr), "Wrong socket type: len %zu vs len %zu", static_cast<size_t>(len), sizeof(addr)); Loading @@ -97,11 +97,11 @@ bool RpcServer::setupInetServer(const char* address, unsigned int port, *assignedPort = realPort; } return true; return OK; } ALOGE("None of the socket address resolved for %s:%u can be set up as inet server.", address, port); return false; return UNKNOWN_ERROR; } void RpcServer::setMaxThreads(size_t threads) { Loading Loading @@ -366,7 +366,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } if (incoming) { LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(client), true), LOG_ALWAYS_FATAL_IF(OK != session->addOutgoingConnection(std::move(client), true), "server state must already be initialized"); return; } Loading @@ -383,21 +383,22 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcSession::join(std::move(session), std::move(setupResult)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str()); LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server."); unique_fd serverFd( TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); if (serverFd == -1) { ALOGE("Could not create socket: %s", strerror(errno)); return false; int savedErrno = errno; ALOGE("Could not create socket: %s", strerror(savedErrno)); return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(bind(serverFd.get(), addr.addr(), addr.addrSize()))) { int savedErrno = errno; ALOGE("Could not bind socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } // Right now, we create all threads at once, making accept4 slow. To avoid hanging the client, Loading @@ -407,16 +408,16 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { if (0 != TEMP_FAILURE_RETRY(listen(serverFd.get(), 50 /*backlog*/))) { int savedErrno = errno; ALOGE("Could not listen socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } LOG_RPC_DETAIL("Successfully setup socket server %s", addr.toString().c_str()); if (!setupExternalServer(std::move(serverFd))) { if (status_t status = setupExternalServer(std::move(serverFd)); status != OK) { ALOGE("Another thread has set up server while calling setupSocketServer. Race?"); return false; return status; } return true; return OK; } void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) { Loading Loading @@ -449,15 +450,15 @@ unique_fd RpcServer::releaseServer() { return std::move(mServer); } bool RpcServer::setupExternalServer(base::unique_fd serverFd) { status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); std::lock_guard<std::mutex> _l(mLock); if (mServer.ok()) { ALOGE("Each RpcServer can only have one server."); return false; return INVALID_OPERATION; } mServer = std::move(serverFd); return true; return OK; } } // namespace android libs/binder/RpcSession.cpp +53 −44 Original line number Diff line number Diff line Loading @@ -107,54 +107,55 @@ std::optional<uint32_t> RpcSession::getProtocolVersion() { return mProtocolVersion; } bool RpcSession::setupUnixDomainClient(const char* path) { status_t RpcSession::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); } bool RpcSession::setupVsockClient(unsigned int cid, unsigned int port) { status_t RpcSession::setupVsockClient(unsigned int cid, unsigned int port) { return setupSocketClient(VsockSocketAddress(cid, port)); } bool RpcSession::setupInetClient(const char* addr, unsigned int port) { status_t RpcSession::setupInetClient(const char* addr, unsigned int port) { auto aiStart = InetSocketAddress::getAddrInfo(addr, port); if (aiStart == nullptr) return false; if (aiStart == nullptr) return UNKNOWN_ERROR; for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) { InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port); if (setupSocketClient(socketAddress)) return true; if (status_t status = setupSocketClient(socketAddress); status == OK) return OK; } ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port); return false; return NAME_NOT_FOUND; } bool RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) { return setupClient([&](const RpcAddress& sessionId, bool incoming) { status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) { return setupClient([&](const RpcAddress& sessionId, bool incoming) -> status_t { // std::move'd from fd becomes -1 (!ok()) if (!fd.ok()) { fd = request(); if (!fd.ok()) return false; if (!fd.ok()) return BAD_VALUE; } return initAndAddConnection(std::move(fd), sessionId, incoming); }); } bool RpcSession::addNullDebuggingClient() { status_t RpcSession::addNullDebuggingClient() { // Note: only works on raw sockets. unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC))); if (serverFd == -1) { ALOGE("Could not connect to /dev/null: %s", strerror(errno)); return false; int savedErrno = errno; ALOGE("Could not connect to /dev/null: %s", strerror(savedErrno)); return -savedErrno; } auto ctx = mRpcTransportCtxFactory->newClientCtx(); if (ctx == nullptr) { ALOGE("Unable to create RpcTransportCtx for null debugging client"); return false; return NO_MEMORY; } auto server = ctx->newTransport(std::move(serverFd)); if (server == nullptr) { ALOGE("Unable to set up RpcTransport"); return false; return UNKNOWN_ERROR; } return addOutgoingConnection(std::move(server), false); } Loading Loading @@ -475,8 +476,8 @@ sp<RpcServer> RpcSession::server() { return server; } bool RpcSession::setupClient( const std::function<bool(const RpcAddress& sessionId, bool incoming)>& connectAndInit) { status_t RpcSession::setupClient( const std::function<status_t(const RpcAddress& sessionId, bool incoming)>& connectAndInit) { { std::lock_guard<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0, Loading @@ -484,18 +485,23 @@ bool RpcSession::setupClient( mOutgoingConnections.size()); } if (!connectAndInit(RpcAddress::zero(), false /*incoming*/)) return false; if (status_t status = connectAndInit(RpcAddress::zero(), false /*incoming*/); status != OK) return status; { ExclusiveConnection connection; status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), if (status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return false; status != OK) return status; uint32_t version; status = state()->readNewSessionResponse(connection.get(), if (status_t status = state()->readNewSessionResponse(connection.get(), sp<RpcSession>::fromExisting(this), &version); if (!setProtocolVersion(version)) return false; status != OK) return status; if (!setProtocolVersion(version)) return BAD_VALUE; } // TODO(b/189955605): we should add additional sessions dynamically Loading @@ -504,13 +510,13 @@ bool RpcSession::setupClient( if (status_t status = getRemoteMaxThreads(&numThreadsAvailable); status != OK) { ALOGE("Could not get max threads after initial session setup: %s", statusToString(status).c_str()); return false; return status; } if (status_t status = readId(); status != OK) { ALOGE("Could not get session id after initial session setup: %s", statusToString(status).c_str()); return false; return status; } // TODO(b/189955605): we should add additional sessions dynamically Loading @@ -521,24 +527,26 @@ bool RpcSession::setupClient( // we've already setup one client for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { if (!connectAndInit(mId.value(), false /*incoming*/)) return false; if (status_t status = connectAndInit(mId.value(), false /*incoming*/); status != OK) return status; } for (size_t i = 0; i < mMaxThreads; i++) { if (!connectAndInit(mId.value(), true /*incoming*/)) return false; if (status_t status = connectAndInit(mId.value(), true /*incoming*/); status != OK) return status; } return true; return OK; } bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { status_t RpcSession::setupSocketClient(const RpcSocketAddress& addr) { return setupClient([&](const RpcAddress& sessionId, bool incoming) { return setupOneSocketConnection(addr, sessionId, incoming); }); } bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& sessionId, bool incoming) { status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& sessionId, bool incoming) { for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); Loading @@ -548,7 +556,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp int savedErrno = errno; ALOGE("Could not create socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) { Loading @@ -559,7 +567,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp int savedErrno = errno; ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); Loading @@ -567,20 +575,21 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp } ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); return false; return UNKNOWN_ERROR; } bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) { status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) { auto ctx = mRpcTransportCtxFactory->newClientCtx(); if (ctx == nullptr) { ALOGE("Unable to create client RpcTransportCtx with %s sockets", mRpcTransportCtxFactory->toCString()); return false; return NO_MEMORY; } auto server = ctx->newTransport(std::move(fd)); if (server == nullptr) { ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString()); return false; return UNKNOWN_ERROR; } LOG_RPC_DETAIL("Socket at client with RpcTransport %p", server.get()); Loading @@ -597,12 +606,12 @@ bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, if (!sentHeader.ok()) { ALOGE("Could not write connection header to socket: %s", sentHeader.error().message().c_str()); return false; return -sentHeader.error().code(); } if (*sentHeader != sizeof(header)) { ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd", *sentHeader, sizeof(header)); return false; return UNKNOWN_ERROR; } LOG_RPC_DETAIL("Socket at client: header sent"); Loading @@ -614,7 +623,7 @@ bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, } } bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { std::mutex mutex; std::condition_variable joinCv; std::unique_lock<std::mutex> lock(mutex); Loading @@ -640,10 +649,10 @@ bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTranspor }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return true; return OK; } bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { std::lock_guard<std::mutex> _l(mMutex); Loading @@ -653,7 +662,7 @@ bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTranspor if (mShutdownTrigger == nullptr) { mShutdownTrigger = FdTrigger::make(); mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); if (mShutdownTrigger == nullptr) return false; if (mShutdownTrigger == nullptr) return INVALID_OPERATION; } connection->rpcTransport = std::move(rpcTransport); Loading @@ -671,7 +680,7 @@ bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTranspor connection->exclusiveTid = std::nullopt; } return status == OK; return status; } bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, Loading libs/binder/ServiceManagerHost.cpp +4 −2 Original line number Diff line number Diff line Loading @@ -158,8 +158,10 @@ sp<IBinder> getDeviceService(std::vector<std::string>&& serviceDispatcherArgs) { LOG_ALWAYS_FATAL_IF(!forwardResult->hostPort().has_value()); auto rpcSession = RpcSession::make(); if (!rpcSession->setupInetClient("127.0.0.1", *forwardResult->hostPort())) { ALOGE("Unable to set up inet client on host port %u", *forwardResult->hostPort()); if (status_t status = rpcSession->setupInetClient("127.0.0.1", *forwardResult->hostPort()); status != OK) { ALOGE("Unable to set up inet client on host port %u: %s", *forwardResult->hostPort(), statusToString(status).c_str()); return nullptr; } auto binder = rpcSession->getRootObject(); Loading libs/binder/include/binder/RpcServer.h +6 −6 Original line number Diff line number Diff line Loading @@ -59,12 +59,12 @@ public: * process B makes binder b and sends it to A * A uses this 'back session' to send things back to B */ [[nodiscard]] bool setupUnixDomainServer(const char* path); [[nodiscard]] status_t setupUnixDomainServer(const char* path); /** * Creates an RPC server at the current port. */ [[nodiscard]] bool setupVsockServer(unsigned int port); [[nodiscard]] status_t setupVsockServer(unsigned int port); /** * Creates an RPC server at the current port using IPv4. Loading @@ -80,7 +80,7 @@ public: * "0.0.0.0" allows for connections on any IP address that the device may * have */ [[nodiscard]] bool setupInetServer(const char* address, unsigned int port, [[nodiscard]] status_t setupInetServer(const char* address, unsigned int port, unsigned int* assignedPort); /** Loading @@ -97,7 +97,7 @@ public: * Set up server using an external FD previously set up by releaseServer(). * Return false if there's already a server. */ bool setupExternalServer(base::unique_fd serverFd); [[nodiscard]] status_t setupExternalServer(base::unique_fd serverFd); void iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction(); Loading Loading @@ -175,7 +175,7 @@ private: void onSessionIncomingThreadEnded() override; static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd); bool setupSocketServer(const RpcSocketAddress& address); status_t setupSocketServer(const RpcSocketAddress& address); const std::unique_ptr<RpcTransportCtxFactory> mRpcTransportCtxFactory; bool mAgreedExperimental = false; Loading Loading
libs/binder/Binder.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -555,7 +555,9 @@ status_t BBinder::setRpcClientDebug(android::base::unique_fd socketFd, return status; } rpcServer->setRootObjectWeak(weakThis); rpcServer->setupExternalServer(std::move(socketFd)); if (auto status = rpcServer->setupExternalServer(std::move(socketFd)); status != OK) { return status; } rpcServer->setMaxThreads(binderThreadPoolMaxCount); rpcServer->start(); e->mRpcServerLinks.emplace(link); Loading
libs/binder/RpcServer.cpp +22 −21 Original line number Diff line number Diff line Loading @@ -55,25 +55,25 @@ void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() mAgreedExperimental = true; } bool RpcServer::setupUnixDomainServer(const char* path) { status_t RpcServer::setupUnixDomainServer(const char* path) { return setupSocketServer(UnixSocketAddress(path)); } bool RpcServer::setupVsockServer(unsigned int port) { status_t RpcServer::setupVsockServer(unsigned int port) { // realizing value w/ this type at compile time to avoid ubsan abort constexpr unsigned int kAnyCid = VMADDR_CID_ANY; return setupSocketServer(VsockSocketAddress(kAnyCid, port)); } bool RpcServer::setupInetServer(const char* address, unsigned int port, status_t RpcServer::setupInetServer(const char* address, unsigned int port, unsigned int* assignedPort) { if (assignedPort != nullptr) *assignedPort = 0; auto aiStart = InetSocketAddress::getAddrInfo(address, port); if (aiStart == nullptr) return false; if (aiStart == nullptr) return UNKNOWN_ERROR; for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) { InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, address, port); if (!setupSocketServer(socketAddress)) { if (status_t status = setupSocketServer(socketAddress); status != OK) { continue; } Loading @@ -84,7 +84,7 @@ bool RpcServer::setupInetServer(const char* address, unsigned int port, int savedErrno = errno; ALOGE("Could not getsockname at %s: %s", socketAddress.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } LOG_ALWAYS_FATAL_IF(len != sizeof(addr), "Wrong socket type: len %zu vs len %zu", static_cast<size_t>(len), sizeof(addr)); Loading @@ -97,11 +97,11 @@ bool RpcServer::setupInetServer(const char* address, unsigned int port, *assignedPort = realPort; } return true; return OK; } ALOGE("None of the socket address resolved for %s:%u can be set up as inet server.", address, port); return false; return UNKNOWN_ERROR; } void RpcServer::setMaxThreads(size_t threads) { Loading Loading @@ -366,7 +366,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } if (incoming) { LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(client), true), LOG_ALWAYS_FATAL_IF(OK != session->addOutgoingConnection(std::move(client), true), "server state must already be initialized"); return; } Loading @@ -383,21 +383,22 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcSession::join(std::move(session), std::move(setupResult)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str()); LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server."); unique_fd serverFd( TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); if (serverFd == -1) { ALOGE("Could not create socket: %s", strerror(errno)); return false; int savedErrno = errno; ALOGE("Could not create socket: %s", strerror(savedErrno)); return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(bind(serverFd.get(), addr.addr(), addr.addrSize()))) { int savedErrno = errno; ALOGE("Could not bind socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } // Right now, we create all threads at once, making accept4 slow. To avoid hanging the client, Loading @@ -407,16 +408,16 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { if (0 != TEMP_FAILURE_RETRY(listen(serverFd.get(), 50 /*backlog*/))) { int savedErrno = errno; ALOGE("Could not listen socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } LOG_RPC_DETAIL("Successfully setup socket server %s", addr.toString().c_str()); if (!setupExternalServer(std::move(serverFd))) { if (status_t status = setupExternalServer(std::move(serverFd)); status != OK) { ALOGE("Another thread has set up server while calling setupSocketServer. Race?"); return false; return status; } return true; return OK; } void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) { Loading Loading @@ -449,15 +450,15 @@ unique_fd RpcServer::releaseServer() { return std::move(mServer); } bool RpcServer::setupExternalServer(base::unique_fd serverFd) { status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); std::lock_guard<std::mutex> _l(mLock); if (mServer.ok()) { ALOGE("Each RpcServer can only have one server."); return false; return INVALID_OPERATION; } mServer = std::move(serverFd); return true; return OK; } } // namespace android
libs/binder/RpcSession.cpp +53 −44 Original line number Diff line number Diff line Loading @@ -107,54 +107,55 @@ std::optional<uint32_t> RpcSession::getProtocolVersion() { return mProtocolVersion; } bool RpcSession::setupUnixDomainClient(const char* path) { status_t RpcSession::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); } bool RpcSession::setupVsockClient(unsigned int cid, unsigned int port) { status_t RpcSession::setupVsockClient(unsigned int cid, unsigned int port) { return setupSocketClient(VsockSocketAddress(cid, port)); } bool RpcSession::setupInetClient(const char* addr, unsigned int port) { status_t RpcSession::setupInetClient(const char* addr, unsigned int port) { auto aiStart = InetSocketAddress::getAddrInfo(addr, port); if (aiStart == nullptr) return false; if (aiStart == nullptr) return UNKNOWN_ERROR; for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) { InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port); if (setupSocketClient(socketAddress)) return true; if (status_t status = setupSocketClient(socketAddress); status == OK) return OK; } ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port); return false; return NAME_NOT_FOUND; } bool RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) { return setupClient([&](const RpcAddress& sessionId, bool incoming) { status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) { return setupClient([&](const RpcAddress& sessionId, bool incoming) -> status_t { // std::move'd from fd becomes -1 (!ok()) if (!fd.ok()) { fd = request(); if (!fd.ok()) return false; if (!fd.ok()) return BAD_VALUE; } return initAndAddConnection(std::move(fd), sessionId, incoming); }); } bool RpcSession::addNullDebuggingClient() { status_t RpcSession::addNullDebuggingClient() { // Note: only works on raw sockets. unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC))); if (serverFd == -1) { ALOGE("Could not connect to /dev/null: %s", strerror(errno)); return false; int savedErrno = errno; ALOGE("Could not connect to /dev/null: %s", strerror(savedErrno)); return -savedErrno; } auto ctx = mRpcTransportCtxFactory->newClientCtx(); if (ctx == nullptr) { ALOGE("Unable to create RpcTransportCtx for null debugging client"); return false; return NO_MEMORY; } auto server = ctx->newTransport(std::move(serverFd)); if (server == nullptr) { ALOGE("Unable to set up RpcTransport"); return false; return UNKNOWN_ERROR; } return addOutgoingConnection(std::move(server), false); } Loading Loading @@ -475,8 +476,8 @@ sp<RpcServer> RpcSession::server() { return server; } bool RpcSession::setupClient( const std::function<bool(const RpcAddress& sessionId, bool incoming)>& connectAndInit) { status_t RpcSession::setupClient( const std::function<status_t(const RpcAddress& sessionId, bool incoming)>& connectAndInit) { { std::lock_guard<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0, Loading @@ -484,18 +485,23 @@ bool RpcSession::setupClient( mOutgoingConnections.size()); } if (!connectAndInit(RpcAddress::zero(), false /*incoming*/)) return false; if (status_t status = connectAndInit(RpcAddress::zero(), false /*incoming*/); status != OK) return status; { ExclusiveConnection connection; status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), if (status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return false; status != OK) return status; uint32_t version; status = state()->readNewSessionResponse(connection.get(), if (status_t status = state()->readNewSessionResponse(connection.get(), sp<RpcSession>::fromExisting(this), &version); if (!setProtocolVersion(version)) return false; status != OK) return status; if (!setProtocolVersion(version)) return BAD_VALUE; } // TODO(b/189955605): we should add additional sessions dynamically Loading @@ -504,13 +510,13 @@ bool RpcSession::setupClient( if (status_t status = getRemoteMaxThreads(&numThreadsAvailable); status != OK) { ALOGE("Could not get max threads after initial session setup: %s", statusToString(status).c_str()); return false; return status; } if (status_t status = readId(); status != OK) { ALOGE("Could not get session id after initial session setup: %s", statusToString(status).c_str()); return false; return status; } // TODO(b/189955605): we should add additional sessions dynamically Loading @@ -521,24 +527,26 @@ bool RpcSession::setupClient( // we've already setup one client for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { if (!connectAndInit(mId.value(), false /*incoming*/)) return false; if (status_t status = connectAndInit(mId.value(), false /*incoming*/); status != OK) return status; } for (size_t i = 0; i < mMaxThreads; i++) { if (!connectAndInit(mId.value(), true /*incoming*/)) return false; if (status_t status = connectAndInit(mId.value(), true /*incoming*/); status != OK) return status; } return true; return OK; } bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { status_t RpcSession::setupSocketClient(const RpcSocketAddress& addr) { return setupClient([&](const RpcAddress& sessionId, bool incoming) { return setupOneSocketConnection(addr, sessionId, incoming); }); } bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& sessionId, bool incoming) { status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& sessionId, bool incoming) { for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); Loading @@ -548,7 +556,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp int savedErrno = errno; ALOGE("Could not create socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) { Loading @@ -559,7 +567,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp int savedErrno = errno; ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; return -savedErrno; } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); Loading @@ -567,20 +575,21 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp } ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); return false; return UNKNOWN_ERROR; } bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) { status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) { auto ctx = mRpcTransportCtxFactory->newClientCtx(); if (ctx == nullptr) { ALOGE("Unable to create client RpcTransportCtx with %s sockets", mRpcTransportCtxFactory->toCString()); return false; return NO_MEMORY; } auto server = ctx->newTransport(std::move(fd)); if (server == nullptr) { ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString()); return false; return UNKNOWN_ERROR; } LOG_RPC_DETAIL("Socket at client with RpcTransport %p", server.get()); Loading @@ -597,12 +606,12 @@ bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, if (!sentHeader.ok()) { ALOGE("Could not write connection header to socket: %s", sentHeader.error().message().c_str()); return false; return -sentHeader.error().code(); } if (*sentHeader != sizeof(header)) { ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd", *sentHeader, sizeof(header)); return false; return UNKNOWN_ERROR; } LOG_RPC_DETAIL("Socket at client: header sent"); Loading @@ -614,7 +623,7 @@ bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, } } bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { std::mutex mutex; std::condition_variable joinCv; std::unique_lock<std::mutex> lock(mutex); Loading @@ -640,10 +649,10 @@ bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTranspor }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return true; return OK; } bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { std::lock_guard<std::mutex> _l(mMutex); Loading @@ -653,7 +662,7 @@ bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTranspor if (mShutdownTrigger == nullptr) { mShutdownTrigger = FdTrigger::make(); mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); if (mShutdownTrigger == nullptr) return false; if (mShutdownTrigger == nullptr) return INVALID_OPERATION; } connection->rpcTransport = std::move(rpcTransport); Loading @@ -671,7 +680,7 @@ bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTranspor connection->exclusiveTid = std::nullopt; } return status == OK; return status; } bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, Loading
libs/binder/ServiceManagerHost.cpp +4 −2 Original line number Diff line number Diff line Loading @@ -158,8 +158,10 @@ sp<IBinder> getDeviceService(std::vector<std::string>&& serviceDispatcherArgs) { LOG_ALWAYS_FATAL_IF(!forwardResult->hostPort().has_value()); auto rpcSession = RpcSession::make(); if (!rpcSession->setupInetClient("127.0.0.1", *forwardResult->hostPort())) { ALOGE("Unable to set up inet client on host port %u", *forwardResult->hostPort()); if (status_t status = rpcSession->setupInetClient("127.0.0.1", *forwardResult->hostPort()); status != OK) { ALOGE("Unable to set up inet client on host port %u: %s", *forwardResult->hostPort(), statusToString(status).c_str()); return nullptr; } auto binder = rpcSession->getRootObject(); Loading
libs/binder/include/binder/RpcServer.h +6 −6 Original line number Diff line number Diff line Loading @@ -59,12 +59,12 @@ public: * process B makes binder b and sends it to A * A uses this 'back session' to send things back to B */ [[nodiscard]] bool setupUnixDomainServer(const char* path); [[nodiscard]] status_t setupUnixDomainServer(const char* path); /** * Creates an RPC server at the current port. */ [[nodiscard]] bool setupVsockServer(unsigned int port); [[nodiscard]] status_t setupVsockServer(unsigned int port); /** * Creates an RPC server at the current port using IPv4. Loading @@ -80,7 +80,7 @@ public: * "0.0.0.0" allows for connections on any IP address that the device may * have */ [[nodiscard]] bool setupInetServer(const char* address, unsigned int port, [[nodiscard]] status_t setupInetServer(const char* address, unsigned int port, unsigned int* assignedPort); /** Loading @@ -97,7 +97,7 @@ public: * Set up server using an external FD previously set up by releaseServer(). * Return false if there's already a server. */ bool setupExternalServer(base::unique_fd serverFd); [[nodiscard]] status_t setupExternalServer(base::unique_fd serverFd); void iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction(); Loading Loading @@ -175,7 +175,7 @@ private: void onSessionIncomingThreadEnded() override; static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd); bool setupSocketServer(const RpcSocketAddress& address); status_t setupSocketServer(const RpcSocketAddress& address); const std::unique_ptr<RpcTransportCtxFactory> mRpcTransportCtxFactory; bool mAgreedExperimental = false; Loading