Loading libs/binder/RpcServer.cpp +40 −21 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ #include <thread> #include <vector> #include <android-base/scopeguard.h> #include <binder/Parcel.h> #include <binder/RpcServer.h> #include <log/log.h> Loading @@ -32,6 +33,7 @@ namespace android { using base::ScopeGuard; using base::unique_fd; RpcServer::RpcServer() {} Loading Loading @@ -125,19 +127,21 @@ sp<IBinder> RpcServer::getRootObject() { } void RpcServer::join() { while (true) { (void)acceptOne(); } } bool RpcServer::acceptOne() { LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join."); } while (true) { unique_fd clientFd(TEMP_FAILURE_RETRY( accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC))); unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC))); if (clientFd < 0) { ALOGE("Could not accept4 socket: %s", strerror(errno)); continue; return false; } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); Loading @@ -148,7 +152,8 @@ void RpcServer::join() { std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd)); mConnectingThreads[thread.get_id()] = std::move(thread); } } return true; } std::vector<sp<RpcSession>> RpcServer::listSessions() { Loading @@ -161,15 +166,21 @@ std::vector<sp<RpcSession>> RpcServer::listSessions() { return sessions; } size_t RpcServer::numUninitializedSessions() { std::lock_guard<std::mutex> _l(mLock); return mConnectingThreads.size(); } void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) { LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object"); // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); bool idValid = true; int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); return; idValid = false; } std::thread thisThread; Loading @@ -181,8 +192,13 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); ScopeGuard detachGuard = [&]() { thisThread.detach(); }; mConnectingThreads.erase(threadId); if (!idValid) { return; } if (id == RPC_SESSION_ID_NEW) { LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs"); mSessionIdCounter++; Loading @@ -199,6 +215,9 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } session = it->second; } detachGuard.Disable(); session->preJoin(std::move(thisThread)); } // avoid strong cycle Loading @@ -208,7 +227,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie // DO NOT ACCESS MEMBER VARIABLES BELOW // session->join(std::move(thisThread), std::move(clientFd)); session->join(std::move(clientFd)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading libs/binder/RpcSession.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -131,14 +131,16 @@ status_t RpcSession::readId() { return OK; } void RpcSession::join(std::thread thread, unique_fd client) { void RpcSession::preJoin(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); { std::lock_guard<std::mutex> _l(mMutex); mThreads[thread.get_id()] = std::move(thread); } } void RpcSession::join(unique_fd client) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) sp<RpcConnection> connection = assignServerToThisThread(std::move(client)); Loading libs/binder/RpcState.cpp +28 −7 Original line number Diff line number Diff line Loading @@ -182,6 +182,27 @@ void RpcState::terminate() { } } RpcState::CommandData::CommandData(size_t size) : mSize(size) { // The maximum size for regular binder is 1MB for all concurrent // transactions. A very small proportion of transactions are even // larger than a page, but we need to avoid allocating too much // data on behalf of an arbitrary client, or we could risk being in // a position where a single additional allocation could run out of // memory. // // Note, this limit may not reflect the total amount of data allocated for a // transaction (in some cases, additional fixed size amounts are added), // though for rough consistency, we should avoid cases where this data type // is used for multiple dynamic allocations for a single transaction. constexpr size_t kMaxTransactionAllocation = 100 * 1000; if (size == 0) return; if (size > kMaxTransactionAllocation) { ALOGW("Transaction requested too much data allocation %zu", size); return; } mData.reset(new (std::nothrow) uint8_t[size]); } bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data, size_t size) { LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str()); Loading Loading @@ -326,7 +347,7 @@ status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address .asyncNumber = asyncNumber, }; ByteVec transactionData(sizeof(RpcWireTransaction) + data.dataSize()); CommandData transactionData(sizeof(RpcWireTransaction) + data.dataSize()); if (!transactionData.valid()) { return NO_MEMORY; } Loading Loading @@ -383,7 +404,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& if (status != OK) return status; } ByteVec data(command.bodySize); CommandData data(command.bodySize); if (!data.valid()) { return NO_MEMORY; } Loading Loading @@ -469,7 +490,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command); ByteVec transactionData(command.bodySize); CommandData transactionData(command.bodySize); if (!transactionData.valid()) { return NO_MEMORY; } Loading @@ -490,7 +511,7 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d } status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, ByteVec transactionData) { CommandData transactionData) { if (transactionData.size() < sizeof(RpcWireTransaction)) { ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!", sizeof(RpcWireTransaction), transactionData.size()); Loading Loading @@ -640,7 +661,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R // justification for const_cast (consider avoiding priority_queue): // - AsyncTodo operator< doesn't depend on 'data' object // - gotta go fast ByteVec data = std::move( CommandData data = std::move( const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()).data); it->second.asyncTodo.pop(); _l.unlock(); Loading @@ -654,7 +675,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R .status = replyStatus, }; ByteVec replyData(sizeof(RpcWireReply) + reply.dataSize()); CommandData replyData(sizeof(RpcWireReply) + reply.dataSize()); if (!replyData.valid()) { return NO_MEMORY; } Loading Loading @@ -684,7 +705,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command); ByteVec commandData(command.bodySize); CommandData commandData(command.bodySize); if (!commandData.valid()) { return NO_MEMORY; } Loading libs/binder/RpcState.h +6 −6 Original line number Diff line number Diff line Loading @@ -101,10 +101,10 @@ private: */ void terminate(); // alternative to std::vector<uint8_t> that doesn't abort on too big of allocations struct ByteVec { explicit ByteVec(size_t size) : mData(size > 0 ? new (std::nothrow) uint8_t[size] : nullptr), mSize(size) {} // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps // large allocations to avoid being requested from allocating too much data. struct CommandData { explicit CommandData(size_t size); bool valid() { return mSize == 0 || mData != nullptr; } size_t size() { return mSize; } uint8_t* data() { return mData.get(); } Loading @@ -128,7 +128,7 @@ private: const RpcWireHeader& command); [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, ByteVec transactionData); CommandData transactionData); [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command); Loading Loading @@ -163,7 +163,7 @@ private: // async transaction queue, _only_ for local binder struct AsyncTodo { ByteVec data; CommandData data; uint64_t asyncNumber = 0; bool operator<(const AsyncTodo& o) const { Loading libs/binder/include/binder/RpcServer.h +7 −0 Original line number Diff line number Diff line Loading @@ -107,10 +107,17 @@ public: */ void join(); /** * Accept one connection on this server. You must have at least one client * session before calling this. */ [[nodiscard]] bool acceptOne(); /** * For debugging! */ std::vector<sp<RpcSession>> listSessions(); size_t numUninitializedSessions(); ~RpcServer(); Loading Loading
libs/binder/RpcServer.cpp +40 −21 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ #include <thread> #include <vector> #include <android-base/scopeguard.h> #include <binder/Parcel.h> #include <binder/RpcServer.h> #include <log/log.h> Loading @@ -32,6 +33,7 @@ namespace android { using base::ScopeGuard; using base::unique_fd; RpcServer::RpcServer() {} Loading Loading @@ -125,19 +127,21 @@ sp<IBinder> RpcServer::getRootObject() { } void RpcServer::join() { while (true) { (void)acceptOne(); } } bool RpcServer::acceptOne() { LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join."); } while (true) { unique_fd clientFd(TEMP_FAILURE_RETRY( accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC))); unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC))); if (clientFd < 0) { ALOGE("Could not accept4 socket: %s", strerror(errno)); continue; return false; } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); Loading @@ -148,7 +152,8 @@ void RpcServer::join() { std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd)); mConnectingThreads[thread.get_id()] = std::move(thread); } } return true; } std::vector<sp<RpcSession>> RpcServer::listSessions() { Loading @@ -161,15 +166,21 @@ std::vector<sp<RpcSession>> RpcServer::listSessions() { return sessions; } size_t RpcServer::numUninitializedSessions() { std::lock_guard<std::mutex> _l(mLock); return mConnectingThreads.size(); } void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) { LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object"); // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); bool idValid = true; int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); return; idValid = false; } std::thread thisThread; Loading @@ -181,8 +192,13 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); ScopeGuard detachGuard = [&]() { thisThread.detach(); }; mConnectingThreads.erase(threadId); if (!idValid) { return; } if (id == RPC_SESSION_ID_NEW) { LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs"); mSessionIdCounter++; Loading @@ -199,6 +215,9 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } session = it->second; } detachGuard.Disable(); session->preJoin(std::move(thisThread)); } // avoid strong cycle Loading @@ -208,7 +227,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie // DO NOT ACCESS MEMBER VARIABLES BELOW // session->join(std::move(thisThread), std::move(clientFd)); session->join(std::move(clientFd)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading
libs/binder/RpcSession.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -131,14 +131,16 @@ status_t RpcSession::readId() { return OK; } void RpcSession::join(std::thread thread, unique_fd client) { void RpcSession::preJoin(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); { std::lock_guard<std::mutex> _l(mMutex); mThreads[thread.get_id()] = std::move(thread); } } void RpcSession::join(unique_fd client) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) sp<RpcConnection> connection = assignServerToThisThread(std::move(client)); Loading
libs/binder/RpcState.cpp +28 −7 Original line number Diff line number Diff line Loading @@ -182,6 +182,27 @@ void RpcState::terminate() { } } RpcState::CommandData::CommandData(size_t size) : mSize(size) { // The maximum size for regular binder is 1MB for all concurrent // transactions. A very small proportion of transactions are even // larger than a page, but we need to avoid allocating too much // data on behalf of an arbitrary client, or we could risk being in // a position where a single additional allocation could run out of // memory. // // Note, this limit may not reflect the total amount of data allocated for a // transaction (in some cases, additional fixed size amounts are added), // though for rough consistency, we should avoid cases where this data type // is used for multiple dynamic allocations for a single transaction. constexpr size_t kMaxTransactionAllocation = 100 * 1000; if (size == 0) return; if (size > kMaxTransactionAllocation) { ALOGW("Transaction requested too much data allocation %zu", size); return; } mData.reset(new (std::nothrow) uint8_t[size]); } bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data, size_t size) { LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str()); Loading Loading @@ -326,7 +347,7 @@ status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address .asyncNumber = asyncNumber, }; ByteVec transactionData(sizeof(RpcWireTransaction) + data.dataSize()); CommandData transactionData(sizeof(RpcWireTransaction) + data.dataSize()); if (!transactionData.valid()) { return NO_MEMORY; } Loading Loading @@ -383,7 +404,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& if (status != OK) return status; } ByteVec data(command.bodySize); CommandData data(command.bodySize); if (!data.valid()) { return NO_MEMORY; } Loading Loading @@ -469,7 +490,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command); ByteVec transactionData(command.bodySize); CommandData transactionData(command.bodySize); if (!transactionData.valid()) { return NO_MEMORY; } Loading @@ -490,7 +511,7 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d } status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, ByteVec transactionData) { CommandData transactionData) { if (transactionData.size() < sizeof(RpcWireTransaction)) { ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!", sizeof(RpcWireTransaction), transactionData.size()); Loading Loading @@ -640,7 +661,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R // justification for const_cast (consider avoiding priority_queue): // - AsyncTodo operator< doesn't depend on 'data' object // - gotta go fast ByteVec data = std::move( CommandData data = std::move( const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()).data); it->second.asyncTodo.pop(); _l.unlock(); Loading @@ -654,7 +675,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R .status = replyStatus, }; ByteVec replyData(sizeof(RpcWireReply) + reply.dataSize()); CommandData replyData(sizeof(RpcWireReply) + reply.dataSize()); if (!replyData.valid()) { return NO_MEMORY; } Loading Loading @@ -684,7 +705,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command); ByteVec commandData(command.bodySize); CommandData commandData(command.bodySize); if (!commandData.valid()) { return NO_MEMORY; } Loading
libs/binder/RpcState.h +6 −6 Original line number Diff line number Diff line Loading @@ -101,10 +101,10 @@ private: */ void terminate(); // alternative to std::vector<uint8_t> that doesn't abort on too big of allocations struct ByteVec { explicit ByteVec(size_t size) : mData(size > 0 ? new (std::nothrow) uint8_t[size] : nullptr), mSize(size) {} // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps // large allocations to avoid being requested from allocating too much data. struct CommandData { explicit CommandData(size_t size); bool valid() { return mSize == 0 || mData != nullptr; } size_t size() { return mSize; } uint8_t* data() { return mData.get(); } Loading @@ -128,7 +128,7 @@ private: const RpcWireHeader& command); [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, ByteVec transactionData); CommandData transactionData); [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command); Loading Loading @@ -163,7 +163,7 @@ private: // async transaction queue, _only_ for local binder struct AsyncTodo { ByteVec data; CommandData data; uint64_t asyncNumber = 0; bool operator<(const AsyncTodo& o) const { Loading
libs/binder/include/binder/RpcServer.h +7 −0 Original line number Diff line number Diff line Loading @@ -107,10 +107,17 @@ public: */ void join(); /** * Accept one connection on this server. You must have at least one client * session before calling this. */ [[nodiscard]] bool acceptOne(); /** * For debugging! */ std::vector<sp<RpcSession>> listSessions(); size_t numUninitializedSessions(); ~RpcServer(); Loading