Loading libs/binder/RpcConnection.cpp +56 −49 Original line number Diff line number Diff line Loading @@ -57,6 +57,10 @@ RpcConnection::RpcConnection() { } RpcConnection::~RpcConnection() { LOG_RPC_DETAIL("RpcConnection destroyed %p", this); std::lock_guard<std::mutex> _l(mSocketMutex); LOG_ALWAYS_FATAL_IF(mServers.size() != 0, "Should not be able to destroy a connection with servers in use."); } sp<RpcConnection> RpcConnection::make() { Loading Loading @@ -222,8 +226,8 @@ status_t RpcConnection::sendDecStrong(const RpcAddress& address) { } void RpcConnection::join() { // establish a connection { // TODO(b/185167543): do this dynamically, instead of from a static number // of threads unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); if (clientFd < 0) { Loading @@ -235,23 +239,22 @@ void RpcConnection::join() { LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); assignServerToThisThread(std::move(clientFd)); } // We may not use the connection we just established (two threads might // establish connections for each other), but for now, just use one // server/socket connection. ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::SERVER); // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) sp<ConnectionSocket> socket = assignServerToThisThread(std::move(clientFd)); while (true) { status_t error = state()->getAndExecuteCommand(socket.fd(), sp<RpcConnection>::fromExisting(this)); state()->getAndExecuteCommand(socket->fd, sp<RpcConnection>::fromExisting(this)); if (error != OK) { ALOGI("Binder socket thread closing w/ status %s", statusToString(error).c_str()); return; break; } } LOG_ALWAYS_FATAL_IF(!removeServerSocket(socket), "bad state: socket object guaranteed to be in list"); } void RpcConnection::setForServer(const wp<RpcServer>& server) { Loading Loading @@ -316,11 +319,23 @@ void RpcConnection::addClient(unique_fd&& fd) { mClients.push_back(connection); } void RpcConnection::assignServerToThisThread(unique_fd&& fd) { sp<RpcConnection::ConnectionSocket> RpcConnection::assignServerToThisThread(unique_fd&& fd) { std::lock_guard<std::mutex> _l(mSocketMutex); sp<ConnectionSocket> connection = sp<ConnectionSocket>::make(); connection->fd = std::move(fd); connection->exclusiveTid = gettid(); mServers.push_back(connection); return connection; } bool RpcConnection::removeServerSocket(const sp<ConnectionSocket>& socket) { std::lock_guard<std::mutex> _l(mSocketMutex); if (auto it = std::find(mServers.begin(), mServers.end(), socket); it != mServers.end()) { mServers.erase(it); return true; } return false; } RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connection, SocketUse use) Loading @@ -335,10 +350,8 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect // CHECK FOR DEDICATED CLIENT SOCKET // // A server/looper should always use a dedicated connection. if (use != SocketUse::SERVER) { findSocket(tid, &exclusive, &available, mConnection->mClients, mConnection->mClientsOffset); // A server/looper should always use a dedicated connection if available findSocket(tid, &exclusive, &available, mConnection->mClients, mConnection->mClientsOffset); // WARNING: this assumes a server cannot request its client to send // a transaction, as mServers is excluded below. Loading @@ -354,18 +367,14 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect mConnection->mClientsOffset = (mConnection->mClientsOffset + 1) % mConnection->mClients.size(); } } // USE SERVING SOCKET (to start serving or for nested transaction) // USE SERVING SOCKET (for nested transaction) // // asynchronous calls cannot be nested if (use != SocketUse::CLIENT_ASYNC) { // servers should start serving on an available thread only // otherwise, this should only be a nested call bool useAvailable = use == SocketUse::SERVER; findSocket(tid, &exclusive, (useAvailable ? &available : nullptr), mConnection->mServers, 0 /* index hint */); // server sockets are always assigned to a thread findSocket(tid, &exclusive, nullptr /*available*/, mConnection->mServers, 0 /* index hint */); } // if our thread is already using a connection, prioritize using that Loading @@ -379,8 +388,6 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect break; } LOG_ALWAYS_FATAL_IF(use == SocketUse::SERVER, "Must create connection to join one."); // in regular binder, this would usually be a deadlock :) LOG_ALWAYS_FATAL_IF(mConnection->mClients.size() == 0, "Not a client of any connection. You must create a connection to an " Loading libs/binder/include/binder/RpcConnection.h +6 −6 Original line number Diff line number Diff line Loading @@ -128,11 +128,6 @@ private: friend sp<RpcConnection>; RpcConnection(); bool setupSocketServer(const SocketAddress& address); bool addSocketClient(const SocketAddress& address); void addClient(base::unique_fd&& fd); void assignServerToThisThread(base::unique_fd&& fd); struct ConnectionSocket : public RefBase { base::unique_fd fd; Loading @@ -141,11 +136,16 @@ private: std::optional<pid_t> exclusiveTid; }; bool setupSocketServer(const SocketAddress& address); bool addSocketClient(const SocketAddress& address); void addClient(base::unique_fd&& fd); sp<ConnectionSocket> assignServerToThisThread(base::unique_fd&& fd); bool removeServerSocket(const sp<ConnectionSocket>& socket); enum class SocketUse { CLIENT, CLIENT_ASYNC, CLIENT_REFCOUNT, SERVER, }; // RAII object for connection socket Loading Loading
libs/binder/RpcConnection.cpp +56 −49 Original line number Diff line number Diff line Loading @@ -57,6 +57,10 @@ RpcConnection::RpcConnection() { } RpcConnection::~RpcConnection() { LOG_RPC_DETAIL("RpcConnection destroyed %p", this); std::lock_guard<std::mutex> _l(mSocketMutex); LOG_ALWAYS_FATAL_IF(mServers.size() != 0, "Should not be able to destroy a connection with servers in use."); } sp<RpcConnection> RpcConnection::make() { Loading Loading @@ -222,8 +226,8 @@ status_t RpcConnection::sendDecStrong(const RpcAddress& address) { } void RpcConnection::join() { // establish a connection { // TODO(b/185167543): do this dynamically, instead of from a static number // of threads unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); if (clientFd < 0) { Loading @@ -235,23 +239,22 @@ void RpcConnection::join() { LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); assignServerToThisThread(std::move(clientFd)); } // We may not use the connection we just established (two threads might // establish connections for each other), but for now, just use one // server/socket connection. ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::SERVER); // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) sp<ConnectionSocket> socket = assignServerToThisThread(std::move(clientFd)); while (true) { status_t error = state()->getAndExecuteCommand(socket.fd(), sp<RpcConnection>::fromExisting(this)); state()->getAndExecuteCommand(socket->fd, sp<RpcConnection>::fromExisting(this)); if (error != OK) { ALOGI("Binder socket thread closing w/ status %s", statusToString(error).c_str()); return; break; } } LOG_ALWAYS_FATAL_IF(!removeServerSocket(socket), "bad state: socket object guaranteed to be in list"); } void RpcConnection::setForServer(const wp<RpcServer>& server) { Loading Loading @@ -316,11 +319,23 @@ void RpcConnection::addClient(unique_fd&& fd) { mClients.push_back(connection); } void RpcConnection::assignServerToThisThread(unique_fd&& fd) { sp<RpcConnection::ConnectionSocket> RpcConnection::assignServerToThisThread(unique_fd&& fd) { std::lock_guard<std::mutex> _l(mSocketMutex); sp<ConnectionSocket> connection = sp<ConnectionSocket>::make(); connection->fd = std::move(fd); connection->exclusiveTid = gettid(); mServers.push_back(connection); return connection; } bool RpcConnection::removeServerSocket(const sp<ConnectionSocket>& socket) { std::lock_guard<std::mutex> _l(mSocketMutex); if (auto it = std::find(mServers.begin(), mServers.end(), socket); it != mServers.end()) { mServers.erase(it); return true; } return false; } RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connection, SocketUse use) Loading @@ -335,10 +350,8 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect // CHECK FOR DEDICATED CLIENT SOCKET // // A server/looper should always use a dedicated connection. if (use != SocketUse::SERVER) { findSocket(tid, &exclusive, &available, mConnection->mClients, mConnection->mClientsOffset); // A server/looper should always use a dedicated connection if available findSocket(tid, &exclusive, &available, mConnection->mClients, mConnection->mClientsOffset); // WARNING: this assumes a server cannot request its client to send // a transaction, as mServers is excluded below. Loading @@ -354,18 +367,14 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect mConnection->mClientsOffset = (mConnection->mClientsOffset + 1) % mConnection->mClients.size(); } } // USE SERVING SOCKET (to start serving or for nested transaction) // USE SERVING SOCKET (for nested transaction) // // asynchronous calls cannot be nested if (use != SocketUse::CLIENT_ASYNC) { // servers should start serving on an available thread only // otherwise, this should only be a nested call bool useAvailable = use == SocketUse::SERVER; findSocket(tid, &exclusive, (useAvailable ? &available : nullptr), mConnection->mServers, 0 /* index hint */); // server sockets are always assigned to a thread findSocket(tid, &exclusive, nullptr /*available*/, mConnection->mServers, 0 /* index hint */); } // if our thread is already using a connection, prioritize using that Loading @@ -379,8 +388,6 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect break; } LOG_ALWAYS_FATAL_IF(use == SocketUse::SERVER, "Must create connection to join one."); // in regular binder, this would usually be a deadlock :) LOG_ALWAYS_FATAL_IF(mConnection->mClients.size() == 0, "Not a client of any connection. You must create a connection to an " Loading
libs/binder/include/binder/RpcConnection.h +6 −6 Original line number Diff line number Diff line Loading @@ -128,11 +128,6 @@ private: friend sp<RpcConnection>; RpcConnection(); bool setupSocketServer(const SocketAddress& address); bool addSocketClient(const SocketAddress& address); void addClient(base::unique_fd&& fd); void assignServerToThisThread(base::unique_fd&& fd); struct ConnectionSocket : public RefBase { base::unique_fd fd; Loading @@ -141,11 +136,16 @@ private: std::optional<pid_t> exclusiveTid; }; bool setupSocketServer(const SocketAddress& address); bool addSocketClient(const SocketAddress& address); void addClient(base::unique_fd&& fd); sp<ConnectionSocket> assignServerToThisThread(base::unique_fd&& fd); bool removeServerSocket(const sp<ConnectionSocket>& socket); enum class SocketUse { CLIENT, CLIENT_ASYNC, CLIENT_REFCOUNT, SERVER, }; // RAII object for connection socket Loading