Loading libs/binder/RpcServer.cpp +24 −11 Original line number Original line Diff line number Diff line Loading @@ -239,15 +239,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie // It must be set before this thread is started // It must be set before this thread is started LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr); LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr); int32_t id; RpcConnectionHeader header; status_t status = status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header, server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &id, sizeof(id)); sizeof(header)); bool idValid = status == OK; bool idValid = status == OK; if (!idValid) { if (!idValid) { ALOGE("Failed to read ID for client connecting to RPC server: %s", ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); statusToString(status).c_str()); // still need to cleanup before we can return // still need to cleanup before we can return } } bool reverse = header.options & RPC_CONNECTION_OPTION_REVERSE; std::thread thisThread; std::thread thisThread; sp<RpcSession> session; sp<RpcSession> session; Loading @@ -269,24 +270,37 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie return; return; } } if (id == RPC_SESSION_ID_NEW) { if (header.sessionId == RPC_SESSION_ID_NEW) { if (reverse) { ALOGE("Cannot create a new session with a reverse connection, would leak"); return; } LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs"); LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs"); server->mSessionIdCounter++; server->mSessionIdCounter++; session = RpcSession::make(); session = RpcSession::make(); session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter, session->setForServer(server, server->mShutdownTrigger); sp<RpcServer::EventListener>::fromExisting( static_cast<RpcServer::EventListener*>(server.get())), server->mSessionIdCounter, server->mShutdownTrigger); server->mSessions[server->mSessionIdCounter] = session; server->mSessions[server->mSessionIdCounter] = session; } else { } else { auto it = server->mSessions.find(id); auto it = server->mSessions.find(header.sessionId); if (it == server->mSessions.end()) { if (it == server->mSessions.end()) { ALOGE("Cannot add thread, no record of session with ID %d", id); ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId); return; return; } } session = it->second; session = it->second; } } if (reverse) { LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)), "server state must already be initialized"); return; } detachGuard.Disable(); detachGuard.Disable(); session->preJoin(std::move(thisThread)); session->preJoin(std::move(thisThread)); } } Loading @@ -294,7 +308,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie // avoid strong cycle // avoid strong cycle server = nullptr; server = nullptr; session->join(std::move(clientFd)); RpcSession::join(std::move(session), std::move(clientFd)); } } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading Loading @@ -341,8 +355,7 @@ void RpcServer::onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& sessi (void)mSessions.erase(it); (void)mSessions.erase(it); } } void RpcServer::onSessionServerThreadEnded(const sp<RpcSession>& session) { void RpcServer::onSessionServerThreadEnded() { (void)session; mShutdownCv.notify_all(); mShutdownCv.notify_all(); } } Loading libs/binder/RpcSession.cpp +119 −24 Original line number Original line Diff line number Diff line Loading @@ -59,6 +59,17 @@ sp<RpcSession> RpcSession::make() { return sp<RpcSession>::make(); return sp<RpcSession>::make(); } } void RpcSession::setMaxReverseConnections(size_t connections) { { std::lock_guard<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0, "Must setup reverse connections before setting up client connections, " "but already has %zu clients", mClientConnections.size()); } mMaxReverseConnections = connections; } bool RpcSession::setupUnixDomainClient(const char* path) { bool RpcSession::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); return setupSocketClient(UnixSocketAddress(path)); } } Loading Loading @@ -99,6 +110,20 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); } } bool RpcSession::shutdown() { std::unique_lock<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session"); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed"); LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed"); mShutdownTrigger->trigger(); mShutdownListener->waitForShutdown(_l); mState->terminate(); LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed"); return true; } status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data, status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags) { Parcel* reply, uint32_t flags) { ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), Loading Loading @@ -179,6 +204,24 @@ status_t RpcSession::readId() { return OK; return OK; } } void RpcSession::WaitForShutdownListener::onSessionLockedAllServerThreadsEnded( const sp<RpcSession>& session) { (void)session; mShutdown = true; } void RpcSession::WaitForShutdownListener::onSessionServerThreadEnded() { mCv.notify_all(); } void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock) { while (!mShutdown) { if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) { ALOGE("Waiting for RpcSession to shut down (1s w/o progress)."); } } } void RpcSession::preJoin(std::thread thread) { void RpcSession::preJoin(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); Loading @@ -188,14 +231,13 @@ void RpcSession::preJoin(std::thread thread) { } } } } void RpcSession::join(unique_fd client) { void RpcSession::join(sp<RpcSession>&& session, unique_fd client) { // must be registered to allow arbitrary client code executing commands to // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) // be able to do nested calls (we can't only read from it) sp<RpcConnection> connection = assignServerToThisThread(std::move(client)); sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client)); while (true) { while (true) { status_t error = status_t error = session->state()->getAndExecuteCommand(connection->fd, session); state()->getAndExecuteCommand(connection->fd, sp<RpcSession>::fromExisting(this)); if (error != OK) { if (error != OK) { LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", Loading @@ -204,22 +246,24 @@ void RpcSession::join(unique_fd client) { } } } } LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection), LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), "bad state: connection object guaranteed to be in list"); "bad state: connection object guaranteed to be in list"); sp<RpcServer> server; sp<RpcSession::EventListener> listener; { { std::lock_guard<std::mutex> _l(mMutex); std::lock_guard<std::mutex> _l(session->mMutex); auto it = mThreads.find(std::this_thread::get_id()); auto it = session->mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); LOG_ALWAYS_FATAL_IF(it == session->mThreads.end()); it->second.detach(); it->second.detach(); mThreads.erase(it); session->mThreads.erase(it); server = mForServer.promote(); listener = session->mEventListener.promote(); } } if (server != nullptr) { session = nullptr; server->onSessionServerThreadEnded(sp<RpcSession>::fromExisting(this)); if (listener != nullptr) { listener->onSessionServerThreadEnded(); } } } } Loading @@ -235,7 +279,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { mClientConnections.size()); mClientConnections.size()); } } if (!setupOneSocketClient(addr, RPC_SESSION_ID_NEW)) return false; if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false; // TODO(b/185167543): we should add additional sessions dynamically // TODO(b/185167543): we should add additional sessions dynamically // instead of all at once. // instead of all at once. Loading @@ -256,13 +300,23 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { // we've already setup one client // we've already setup one client for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/185167543): shutdown existing connections? // TODO(b/185167543): shutdown existing connections? if (!setupOneSocketClient(addr, mId.value())) return false; if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false; } // TODO(b/185167543): we should add additional sessions dynamically // instead of all at once - the other side should be responsible for setting // up additional connections. We need to create at least one (unless 0 are // requested to be set) in order to allow the other side to reliably make // any requests at all. for (size_t i = 0; i < mMaxReverseConnections; i++) { if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false; } } return true; return true; } } bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) { bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t id, bool reverse) { for (size_t tries = 0; tries < 5; tries++) { for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); if (tries > 0) usleep(10000); Loading @@ -286,17 +340,48 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) return false; return false; } } if (sizeof(id) != TEMP_FAILURE_RETRY(write(serverFd.get(), &id, sizeof(id)))) { RpcConnectionHeader header{ .sessionId = id, }; if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE; if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) { int savedErrno = errno; int savedErrno = errno; ALOGE("Could not write id to socket at %s: %s", addr.toString().c_str(), ALOGE("Could not write connection header to socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); strerror(savedErrno)); return false; return false; } } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); if (reverse) { std::mutex mutex; std::condition_variable joinCv; std::unique_lock<std::mutex> lock(mutex); std::thread thread; sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); bool ownershipTransferred = false; thread = std::thread([&]() { std::unique_lock<std::mutex> threadLock(mutex); unique_fd fd = std::move(serverFd); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; session->preJoin(std::move(thread)); ownershipTransferred = true; joinCv.notify_one(); threadLock.unlock(); // do not use & vars below RpcSession::join(std::move(session), std::move(fd)); }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return true; } else { return addClientConnection(std::move(serverFd)); return addClientConnection(std::move(serverFd)); } } } ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); return false; return false; Loading @@ -305,8 +390,11 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) bool RpcSession::addClientConnection(unique_fd fd) { bool RpcSession::addClientConnection(unique_fd fd) { std::lock_guard<std::mutex> _l(mMutex); std::lock_guard<std::mutex> _l(mMutex); // first client connection added, but setForServer not called, so // initializaing for a client. if (mShutdownTrigger == nullptr) { if (mShutdownTrigger == nullptr) { mShutdownTrigger = FdTrigger::make(); mShutdownTrigger = FdTrigger::make(); mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); if (mShutdownTrigger == nullptr) return false; if (mShutdownTrigger == nullptr) return false; } } Loading @@ -316,14 +404,19 @@ bool RpcSession::addClientConnection(unique_fd fd) { return true; return true; } } void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId, void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, int32_t sessionId, const std::shared_ptr<FdTrigger>& shutdownTrigger) { const std::shared_ptr<FdTrigger>& shutdownTrigger) { LOG_ALWAYS_FATAL_IF(mForServer.unsafe_get() != nullptr); LOG_ALWAYS_FATAL_IF(mForServer != nullptr); LOG_ALWAYS_FATAL_IF(server == nullptr); LOG_ALWAYS_FATAL_IF(mEventListener != nullptr); LOG_ALWAYS_FATAL_IF(eventListener == nullptr); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr); LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr); LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr); mId = sessionId; mId = sessionId; mForServer = server; mForServer = server; mEventListener = eventListener; mShutdownTrigger = shutdownTrigger; mShutdownTrigger = shutdownTrigger; } } Loading @@ -343,9 +436,9 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { it != mServerConnections.end()) { it != mServerConnections.end()) { mServerConnections.erase(it); mServerConnections.erase(it); if (mServerConnections.size() == 0) { if (mServerConnections.size() == 0) { sp<RpcServer> server = mForServer.promote(); sp<EventListener> listener = mEventListener.promote(); if (server) { if (listener) { server->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this)); listener->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this)); } } } } return true; return true; Loading Loading @@ -405,6 +498,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi break; break; } } // TODO(b/185167543): this should return an error, rather than crash a // server // in regular binder, this would usually be a deadlock :) // in regular binder, this would usually be a deadlock :) LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, "Session has no client connections. This is required for an RPC server " "Session has no client connections. This is required for an RPC server " Loading libs/binder/RpcState.cpp +1 −0 Original line number Original line Diff line number Diff line Loading @@ -383,6 +383,7 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& return status; return status; if (flags & IBinder::FLAG_ONEWAY) { if (flags & IBinder::FLAG_ONEWAY) { LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get()); return OK; // do not wait for result return OK; // do not wait for result } } Loading libs/binder/RpcState.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -86,7 +86,6 @@ public: size_t countBinders(); size_t countBinders(); void dump(); void dump(); private: /** /** * Called when reading or writing data to a session fails to clean up * Called when reading or writing data to a session fails to clean up * data associated with the session in order to cleanup binders. * data associated with the session in order to cleanup binders. Loading @@ -105,6 +104,7 @@ private: */ */ void terminate(); void terminate(); private: // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps // large allocations to avoid being requested from allocating too much data. // large allocations to avoid being requested from allocating too much data. struct CommandData { struct CommandData { Loading libs/binder/RpcWireFormat.h +12 −2 Original line number Original line Diff line number Diff line Loading @@ -20,6 +20,18 @@ namespace android { #pragma clang diagnostic push #pragma clang diagnostic push #pragma clang diagnostic error "-Wpadded" #pragma clang diagnostic error "-Wpadded" constexpr int32_t RPC_SESSION_ID_NEW = -1; enum : uint8_t { RPC_CONNECTION_OPTION_REVERSE = 0x1, }; struct RpcConnectionHeader { int32_t sessionId; uint8_t options; uint8_t reserved[3]; }; enum : uint32_t { enum : uint32_t { /** /** * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected Loading Loading @@ -51,8 +63,6 @@ enum : uint32_t { RPC_SPECIAL_TRANSACT_GET_SESSION_ID = 2, RPC_SPECIAL_TRANSACT_GET_SESSION_ID = 2, }; }; constexpr int32_t RPC_SESSION_ID_NEW = -1; // serialization is like: // serialization is like: // |RpcWireHeader|struct desginated by 'command'| (over and over again) // |RpcWireHeader|struct desginated by 'command'| (over and over again) Loading Loading
libs/binder/RpcServer.cpp +24 −11 Original line number Original line Diff line number Diff line Loading @@ -239,15 +239,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie // It must be set before this thread is started // It must be set before this thread is started LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr); LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr); int32_t id; RpcConnectionHeader header; status_t status = status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header, server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &id, sizeof(id)); sizeof(header)); bool idValid = status == OK; bool idValid = status == OK; if (!idValid) { if (!idValid) { ALOGE("Failed to read ID for client connecting to RPC server: %s", ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); statusToString(status).c_str()); // still need to cleanup before we can return // still need to cleanup before we can return } } bool reverse = header.options & RPC_CONNECTION_OPTION_REVERSE; std::thread thisThread; std::thread thisThread; sp<RpcSession> session; sp<RpcSession> session; Loading @@ -269,24 +270,37 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie return; return; } } if (id == RPC_SESSION_ID_NEW) { if (header.sessionId == RPC_SESSION_ID_NEW) { if (reverse) { ALOGE("Cannot create a new session with a reverse connection, would leak"); return; } LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs"); LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs"); server->mSessionIdCounter++; server->mSessionIdCounter++; session = RpcSession::make(); session = RpcSession::make(); session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter, session->setForServer(server, server->mShutdownTrigger); sp<RpcServer::EventListener>::fromExisting( static_cast<RpcServer::EventListener*>(server.get())), server->mSessionIdCounter, server->mShutdownTrigger); server->mSessions[server->mSessionIdCounter] = session; server->mSessions[server->mSessionIdCounter] = session; } else { } else { auto it = server->mSessions.find(id); auto it = server->mSessions.find(header.sessionId); if (it == server->mSessions.end()) { if (it == server->mSessions.end()) { ALOGE("Cannot add thread, no record of session with ID %d", id); ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId); return; return; } } session = it->second; session = it->second; } } if (reverse) { LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)), "server state must already be initialized"); return; } detachGuard.Disable(); detachGuard.Disable(); session->preJoin(std::move(thisThread)); session->preJoin(std::move(thisThread)); } } Loading @@ -294,7 +308,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie // avoid strong cycle // avoid strong cycle server = nullptr; server = nullptr; session->join(std::move(clientFd)); RpcSession::join(std::move(session), std::move(clientFd)); } } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading Loading @@ -341,8 +355,7 @@ void RpcServer::onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& sessi (void)mSessions.erase(it); (void)mSessions.erase(it); } } void RpcServer::onSessionServerThreadEnded(const sp<RpcSession>& session) { void RpcServer::onSessionServerThreadEnded() { (void)session; mShutdownCv.notify_all(); mShutdownCv.notify_all(); } } Loading
libs/binder/RpcSession.cpp +119 −24 Original line number Original line Diff line number Diff line Loading @@ -59,6 +59,17 @@ sp<RpcSession> RpcSession::make() { return sp<RpcSession>::make(); return sp<RpcSession>::make(); } } void RpcSession::setMaxReverseConnections(size_t connections) { { std::lock_guard<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0, "Must setup reverse connections before setting up client connections, " "but already has %zu clients", mClientConnections.size()); } mMaxReverseConnections = connections; } bool RpcSession::setupUnixDomainClient(const char* path) { bool RpcSession::setupUnixDomainClient(const char* path) { return setupSocketClient(UnixSocketAddress(path)); return setupSocketClient(UnixSocketAddress(path)); } } Loading Loading @@ -99,6 +110,20 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); } } bool RpcSession::shutdown() { std::unique_lock<std::mutex> _l(mMutex); LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session"); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed"); LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed"); mShutdownTrigger->trigger(); mShutdownListener->waitForShutdown(_l); mState->terminate(); LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed"); return true; } status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data, status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags) { Parcel* reply, uint32_t flags) { ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), Loading Loading @@ -179,6 +204,24 @@ status_t RpcSession::readId() { return OK; return OK; } } void RpcSession::WaitForShutdownListener::onSessionLockedAllServerThreadsEnded( const sp<RpcSession>& session) { (void)session; mShutdown = true; } void RpcSession::WaitForShutdownListener::onSessionServerThreadEnded() { mCv.notify_all(); } void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock) { while (!mShutdown) { if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) { ALOGE("Waiting for RpcSession to shut down (1s w/o progress)."); } } } void RpcSession::preJoin(std::thread thread) { void RpcSession::preJoin(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); Loading @@ -188,14 +231,13 @@ void RpcSession::preJoin(std::thread thread) { } } } } void RpcSession::join(unique_fd client) { void RpcSession::join(sp<RpcSession>&& session, unique_fd client) { // must be registered to allow arbitrary client code executing commands to // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) // be able to do nested calls (we can't only read from it) sp<RpcConnection> connection = assignServerToThisThread(std::move(client)); sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client)); while (true) { while (true) { status_t error = status_t error = session->state()->getAndExecuteCommand(connection->fd, session); state()->getAndExecuteCommand(connection->fd, sp<RpcSession>::fromExisting(this)); if (error != OK) { if (error != OK) { LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", Loading @@ -204,22 +246,24 @@ void RpcSession::join(unique_fd client) { } } } } LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection), LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), "bad state: connection object guaranteed to be in list"); "bad state: connection object guaranteed to be in list"); sp<RpcServer> server; sp<RpcSession::EventListener> listener; { { std::lock_guard<std::mutex> _l(mMutex); std::lock_guard<std::mutex> _l(session->mMutex); auto it = mThreads.find(std::this_thread::get_id()); auto it = session->mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); LOG_ALWAYS_FATAL_IF(it == session->mThreads.end()); it->second.detach(); it->second.detach(); mThreads.erase(it); session->mThreads.erase(it); server = mForServer.promote(); listener = session->mEventListener.promote(); } } if (server != nullptr) { session = nullptr; server->onSessionServerThreadEnded(sp<RpcSession>::fromExisting(this)); if (listener != nullptr) { listener->onSessionServerThreadEnded(); } } } } Loading @@ -235,7 +279,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { mClientConnections.size()); mClientConnections.size()); } } if (!setupOneSocketClient(addr, RPC_SESSION_ID_NEW)) return false; if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false; // TODO(b/185167543): we should add additional sessions dynamically // TODO(b/185167543): we should add additional sessions dynamically // instead of all at once. // instead of all at once. Loading @@ -256,13 +300,23 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { // we've already setup one client // we've already setup one client for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/185167543): shutdown existing connections? // TODO(b/185167543): shutdown existing connections? if (!setupOneSocketClient(addr, mId.value())) return false; if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false; } // TODO(b/185167543): we should add additional sessions dynamically // instead of all at once - the other side should be responsible for setting // up additional connections. We need to create at least one (unless 0 are // requested to be set) in order to allow the other side to reliably make // any requests at all. for (size_t i = 0; i < mMaxReverseConnections; i++) { if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false; } } return true; return true; } } bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) { bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t id, bool reverse) { for (size_t tries = 0; tries < 5; tries++) { for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); if (tries > 0) usleep(10000); Loading @@ -286,17 +340,48 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) return false; return false; } } if (sizeof(id) != TEMP_FAILURE_RETRY(write(serverFd.get(), &id, sizeof(id)))) { RpcConnectionHeader header{ .sessionId = id, }; if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE; if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) { int savedErrno = errno; int savedErrno = errno; ALOGE("Could not write id to socket at %s: %s", addr.toString().c_str(), ALOGE("Could not write connection header to socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); strerror(savedErrno)); return false; return false; } } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); if (reverse) { std::mutex mutex; std::condition_variable joinCv; std::unique_lock<std::mutex> lock(mutex); std::thread thread; sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); bool ownershipTransferred = false; thread = std::thread([&]() { std::unique_lock<std::mutex> threadLock(mutex); unique_fd fd = std::move(serverFd); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; session->preJoin(std::move(thread)); ownershipTransferred = true; joinCv.notify_one(); threadLock.unlock(); // do not use & vars below RpcSession::join(std::move(session), std::move(fd)); }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return true; } else { return addClientConnection(std::move(serverFd)); return addClientConnection(std::move(serverFd)); } } } ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); return false; return false; Loading @@ -305,8 +390,11 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) bool RpcSession::addClientConnection(unique_fd fd) { bool RpcSession::addClientConnection(unique_fd fd) { std::lock_guard<std::mutex> _l(mMutex); std::lock_guard<std::mutex> _l(mMutex); // first client connection added, but setForServer not called, so // initializaing for a client. if (mShutdownTrigger == nullptr) { if (mShutdownTrigger == nullptr) { mShutdownTrigger = FdTrigger::make(); mShutdownTrigger = FdTrigger::make(); mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); if (mShutdownTrigger == nullptr) return false; if (mShutdownTrigger == nullptr) return false; } } Loading @@ -316,14 +404,19 @@ bool RpcSession::addClientConnection(unique_fd fd) { return true; return true; } } void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId, void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, int32_t sessionId, const std::shared_ptr<FdTrigger>& shutdownTrigger) { const std::shared_ptr<FdTrigger>& shutdownTrigger) { LOG_ALWAYS_FATAL_IF(mForServer.unsafe_get() != nullptr); LOG_ALWAYS_FATAL_IF(mForServer != nullptr); LOG_ALWAYS_FATAL_IF(server == nullptr); LOG_ALWAYS_FATAL_IF(mEventListener != nullptr); LOG_ALWAYS_FATAL_IF(eventListener == nullptr); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr); LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr); LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr); mId = sessionId; mId = sessionId; mForServer = server; mForServer = server; mEventListener = eventListener; mShutdownTrigger = shutdownTrigger; mShutdownTrigger = shutdownTrigger; } } Loading @@ -343,9 +436,9 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { it != mServerConnections.end()) { it != mServerConnections.end()) { mServerConnections.erase(it); mServerConnections.erase(it); if (mServerConnections.size() == 0) { if (mServerConnections.size() == 0) { sp<RpcServer> server = mForServer.promote(); sp<EventListener> listener = mEventListener.promote(); if (server) { if (listener) { server->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this)); listener->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this)); } } } } return true; return true; Loading Loading @@ -405,6 +498,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi break; break; } } // TODO(b/185167543): this should return an error, rather than crash a // server // in regular binder, this would usually be a deadlock :) // in regular binder, this would usually be a deadlock :) LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, "Session has no client connections. This is required for an RPC server " "Session has no client connections. This is required for an RPC server " Loading
libs/binder/RpcState.cpp +1 −0 Original line number Original line Diff line number Diff line Loading @@ -383,6 +383,7 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& return status; return status; if (flags & IBinder::FLAG_ONEWAY) { if (flags & IBinder::FLAG_ONEWAY) { LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get()); return OK; // do not wait for result return OK; // do not wait for result } } Loading
libs/binder/RpcState.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -86,7 +86,6 @@ public: size_t countBinders(); size_t countBinders(); void dump(); void dump(); private: /** /** * Called when reading or writing data to a session fails to clean up * Called when reading or writing data to a session fails to clean up * data associated with the session in order to cleanup binders. * data associated with the session in order to cleanup binders. Loading @@ -105,6 +104,7 @@ private: */ */ void terminate(); void terminate(); private: // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps // large allocations to avoid being requested from allocating too much data. // large allocations to avoid being requested from allocating too much data. struct CommandData { struct CommandData { Loading
libs/binder/RpcWireFormat.h +12 −2 Original line number Original line Diff line number Diff line Loading @@ -20,6 +20,18 @@ namespace android { #pragma clang diagnostic push #pragma clang diagnostic push #pragma clang diagnostic error "-Wpadded" #pragma clang diagnostic error "-Wpadded" constexpr int32_t RPC_SESSION_ID_NEW = -1; enum : uint8_t { RPC_CONNECTION_OPTION_REVERSE = 0x1, }; struct RpcConnectionHeader { int32_t sessionId; uint8_t options; uint8_t reserved[3]; }; enum : uint32_t { enum : uint32_t { /** /** * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected Loading Loading @@ -51,8 +63,6 @@ enum : uint32_t { RPC_SPECIAL_TRANSACT_GET_SESSION_ID = 2, RPC_SPECIAL_TRANSACT_GET_SESSION_ID = 2, }; }; constexpr int32_t RPC_SESSION_ID_NEW = -1; // serialization is like: // serialization is like: // |RpcWireHeader|struct desginated by 'command'| (over and over again) // |RpcWireHeader|struct desginated by 'command'| (over and over again) Loading