Loading libs/binder/RpcConnection.cpp +25 −3 Original line number Diff line number Diff line Loading @@ -133,6 +133,21 @@ status_t RpcConnection::readId() { return OK; } void RpcConnection::startThread(unique_fd client) { std::lock_guard<std::mutex> _l(mSocketMutex); sp<RpcConnection> holdThis = sp<RpcConnection>::fromExisting(this); int fd = client.release(); auto thread = std::thread([=] { holdThis->join(unique_fd(fd)); { std::lock_guard<std::mutex> _l(holdThis->mSocketMutex); size_t erased = mThreads.erase(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(erased != 0, "Could not erase thread."); } }); mThreads[thread.get_id()] = std::move(thread); } void RpcConnection::join(unique_fd client) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) Loading Loading @@ -164,7 +179,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { mClients.size()); } if (!setupOneSocketClient(addr)) return false; if (!setupOneSocketClient(addr, RPC_CONNECTION_ID_NEW)) return false; // TODO(b/185167543): we should add additional connections dynamically // instead of all at once. Loading @@ -186,7 +201,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/185167543): avoid race w/ accept4 not being called on server for (size_t tries = 0; tries < 5; tries++) { if (setupOneSocketClient(addr)) break; if (setupOneSocketClient(addr, mId.value())) break; usleep(10000); } } Loading @@ -194,7 +209,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { return true; } bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr) { bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) { unique_fd serverFd( TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); if (serverFd == -1) { Loading @@ -209,6 +224,13 @@ bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr) { return false; } if (sizeof(id) != TEMP_FAILURE_RETRY(write(serverFd.get(), &id, sizeof(id)))) { int savedErrno = errno; ALOGE("Could not write id to socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); addClient(std::move(serverFd)); Loading libs/binder/RpcServer.cpp +49 −28 Original line number Diff line number Diff line Loading @@ -126,40 +126,61 @@ void RpcServer::join() { { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join."); // TODO(b/185167543): support more than one client at once mConnection = RpcConnection::make(); mConnection->setForServer(sp<RpcServer>::fromExisting(this), 42 /*placeholder id*/); mStarted = true; for (size_t i = 0; i < mMaxThreads; i++) { pool.push_back(std::thread([=] { // TODO(b/185167543): do this dynamically, instead of from a static number // of threads unique_fd clientFd(TEMP_FAILURE_RETRY( accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); } while (true) { unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); if (clientFd < 0) { // If this log becomes confusing, should save more state from // setupUnixDomainServer in order to output here. ALOGE("Could not accept4 socket: %s", strerror(errno)); return; continue; } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); mConnection->join(std::move(clientFd)); })); // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); continue; } { std::lock_guard<std::mutex> _l(mLock); sp<RpcConnection> connection; if (id == RPC_CONNECTION_ID_NEW) { // new client! LOG_ALWAYS_FATAL_IF(mConnectionIdCounter >= INT32_MAX, "Out of connection IDs"); mConnectionIdCounter++; connection = RpcConnection::make(); connection->setForServer(wp<RpcServer>::fromExisting(this), mConnectionIdCounter); mConnections[mConnectionIdCounter] = connection; } else { auto it = mConnections.find(id); if (it == mConnections.end()) { ALOGE("Cannot add thread, no record of connection with ID %d", id); continue; } connection = it->second; } // TODO(b/185167543): don't waste extra thread for join, and combine threads // between clients for (auto& t : pool) t.join(); connection->startThread(std::move(clientFd)); } } } std::vector<sp<RpcConnection>> RpcServer::listConnections() { std::lock_guard<std::mutex> _l(mLock); if (mConnection == nullptr) return {}; return {mConnection}; std::vector<sp<RpcConnection>> connections; for (auto& [id, connection] : mConnections) { (void)id; connections.push_back(connection); } return connections; } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading libs/binder/RpcWireFormat.h +2 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,8 @@ enum : uint32_t { RPC_SPECIAL_TRANSACT_GET_CONNECTION_ID = 2, }; constexpr int32_t RPC_CONNECTION_ID_NEW = -1; // serialization is like: // |RpcWireHeader|struct desginated by 'command'| (over and over again) Loading libs/binder/include/binder/RpcConnection.h +11 −1 Original line number Diff line number Diff line Loading @@ -21,7 +21,9 @@ #include <utils/Errors.h> #include <utils/RefBase.h> #include <map> #include <optional> #include <thread> #include <vector> // WARNING: This is a feature which is still in development, and it is subject Loading Loading @@ -113,6 +115,7 @@ private: status_t readId(); void startThread(base::unique_fd client); void join(base::unique_fd client); struct ConnectionSocket : public RefBase { Loading @@ -124,7 +127,7 @@ private: }; bool setupSocketClient(const RpcSocketAddress& address); bool setupOneSocketClient(const RpcSocketAddress& address); bool setupOneSocketClient(const RpcSocketAddress& address, int32_t connectionId); void addClient(base::unique_fd fd); void setForServer(const wp<RpcServer>& server, int32_t connectionId); sp<ConnectionSocket> assignServerToThisThread(base::unique_fd fd); Loading Loading @@ -179,11 +182,18 @@ private: std::unique_ptr<RpcState> mState; std::mutex mSocketMutex; // for all below std::condition_variable mSocketCv; // for mWaitingThreads size_t mWaitingThreads = 0; size_t mClientsOffset = 0; // hint index into clients, ++ when sending an async transaction std::vector<sp<ConnectionSocket>> mClients; std::vector<sp<ConnectionSocket>> mServers; // TODO(b/185167543): use for reverse connections (allow client to also // serve calls on a connection). // TODO(b/185167543): allow sharing between different connections in a // process? (or combine with mServers) std::map<std::thread::id, std::thread> mThreads; }; } // namespace android libs/binder/include/binder/RpcServer.h +4 −1 Original line number Diff line number Diff line Loading @@ -97,6 +97,8 @@ public: /** * You must have at least one client connection before calling this. * * TODO(b/185167543): way to shut down? */ void join(); Loading @@ -120,7 +122,8 @@ private: std::mutex mLock; // for below sp<IBinder> mRootObject; sp<RpcConnection> mConnection; std::map<int32_t, sp<RpcConnection>> mConnections; int32_t mConnectionIdCounter = 0; }; } // namespace android Loading
libs/binder/RpcConnection.cpp +25 −3 Original line number Diff line number Diff line Loading @@ -133,6 +133,21 @@ status_t RpcConnection::readId() { return OK; } void RpcConnection::startThread(unique_fd client) { std::lock_guard<std::mutex> _l(mSocketMutex); sp<RpcConnection> holdThis = sp<RpcConnection>::fromExisting(this); int fd = client.release(); auto thread = std::thread([=] { holdThis->join(unique_fd(fd)); { std::lock_guard<std::mutex> _l(holdThis->mSocketMutex); size_t erased = mThreads.erase(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(erased != 0, "Could not erase thread."); } }); mThreads[thread.get_id()] = std::move(thread); } void RpcConnection::join(unique_fd client) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) Loading Loading @@ -164,7 +179,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { mClients.size()); } if (!setupOneSocketClient(addr)) return false; if (!setupOneSocketClient(addr, RPC_CONNECTION_ID_NEW)) return false; // TODO(b/185167543): we should add additional connections dynamically // instead of all at once. Loading @@ -186,7 +201,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/185167543): avoid race w/ accept4 not being called on server for (size_t tries = 0; tries < 5; tries++) { if (setupOneSocketClient(addr)) break; if (setupOneSocketClient(addr, mId.value())) break; usleep(10000); } } Loading @@ -194,7 +209,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { return true; } bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr) { bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) { unique_fd serverFd( TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); if (serverFd == -1) { Loading @@ -209,6 +224,13 @@ bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr) { return false; } if (sizeof(id) != TEMP_FAILURE_RETRY(write(serverFd.get(), &id, sizeof(id)))) { int savedErrno = errno; ALOGE("Could not write id to socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return false; } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); addClient(std::move(serverFd)); Loading
libs/binder/RpcServer.cpp +49 −28 Original line number Diff line number Diff line Loading @@ -126,40 +126,61 @@ void RpcServer::join() { { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join."); // TODO(b/185167543): support more than one client at once mConnection = RpcConnection::make(); mConnection->setForServer(sp<RpcServer>::fromExisting(this), 42 /*placeholder id*/); mStarted = true; for (size_t i = 0; i < mMaxThreads; i++) { pool.push_back(std::thread([=] { // TODO(b/185167543): do this dynamically, instead of from a static number // of threads unique_fd clientFd(TEMP_FAILURE_RETRY( accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); } while (true) { unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); if (clientFd < 0) { // If this log becomes confusing, should save more state from // setupUnixDomainServer in order to output here. ALOGE("Could not accept4 socket: %s", strerror(errno)); return; continue; } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); mConnection->join(std::move(clientFd)); })); // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); continue; } { std::lock_guard<std::mutex> _l(mLock); sp<RpcConnection> connection; if (id == RPC_CONNECTION_ID_NEW) { // new client! LOG_ALWAYS_FATAL_IF(mConnectionIdCounter >= INT32_MAX, "Out of connection IDs"); mConnectionIdCounter++; connection = RpcConnection::make(); connection->setForServer(wp<RpcServer>::fromExisting(this), mConnectionIdCounter); mConnections[mConnectionIdCounter] = connection; } else { auto it = mConnections.find(id); if (it == mConnections.end()) { ALOGE("Cannot add thread, no record of connection with ID %d", id); continue; } connection = it->second; } // TODO(b/185167543): don't waste extra thread for join, and combine threads // between clients for (auto& t : pool) t.join(); connection->startThread(std::move(clientFd)); } } } std::vector<sp<RpcConnection>> RpcServer::listConnections() { std::lock_guard<std::mutex> _l(mLock); if (mConnection == nullptr) return {}; return {mConnection}; std::vector<sp<RpcConnection>> connections; for (auto& [id, connection] : mConnections) { (void)id; connections.push_back(connection); } return connections; } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading
libs/binder/RpcWireFormat.h +2 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,8 @@ enum : uint32_t { RPC_SPECIAL_TRANSACT_GET_CONNECTION_ID = 2, }; constexpr int32_t RPC_CONNECTION_ID_NEW = -1; // serialization is like: // |RpcWireHeader|struct desginated by 'command'| (over and over again) Loading
libs/binder/include/binder/RpcConnection.h +11 −1 Original line number Diff line number Diff line Loading @@ -21,7 +21,9 @@ #include <utils/Errors.h> #include <utils/RefBase.h> #include <map> #include <optional> #include <thread> #include <vector> // WARNING: This is a feature which is still in development, and it is subject Loading Loading @@ -113,6 +115,7 @@ private: status_t readId(); void startThread(base::unique_fd client); void join(base::unique_fd client); struct ConnectionSocket : public RefBase { Loading @@ -124,7 +127,7 @@ private: }; bool setupSocketClient(const RpcSocketAddress& address); bool setupOneSocketClient(const RpcSocketAddress& address); bool setupOneSocketClient(const RpcSocketAddress& address, int32_t connectionId); void addClient(base::unique_fd fd); void setForServer(const wp<RpcServer>& server, int32_t connectionId); sp<ConnectionSocket> assignServerToThisThread(base::unique_fd fd); Loading Loading @@ -179,11 +182,18 @@ private: std::unique_ptr<RpcState> mState; std::mutex mSocketMutex; // for all below std::condition_variable mSocketCv; // for mWaitingThreads size_t mWaitingThreads = 0; size_t mClientsOffset = 0; // hint index into clients, ++ when sending an async transaction std::vector<sp<ConnectionSocket>> mClients; std::vector<sp<ConnectionSocket>> mServers; // TODO(b/185167543): use for reverse connections (allow client to also // serve calls on a connection). // TODO(b/185167543): allow sharing between different connections in a // process? (or combine with mServers) std::map<std::thread::id, std::thread> mThreads; }; } // namespace android
libs/binder/include/binder/RpcServer.h +4 −1 Original line number Diff line number Diff line Loading @@ -97,6 +97,8 @@ public: /** * You must have at least one client connection before calling this. * * TODO(b/185167543): way to shut down? */ void join(); Loading @@ -120,7 +122,8 @@ private: std::mutex mLock; // for below sp<IBinder> mRootObject; sp<RpcConnection> mConnection; std::map<int32_t, sp<RpcConnection>> mConnections; int32_t mConnectionIdCounter = 0; }; } // namespace android