Loading libs/binder/RpcServer.cpp +12 −0 Original line number Diff line number Diff line Loading @@ -216,4 +216,16 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { return true; } void RpcServer::onSessionTerminating(const sp<RpcSession>& session) { auto id = session->mId; LOG_ALWAYS_FATAL_IF(id == std::nullopt, "Server sessions must be initialized with ID"); LOG_RPC_DETAIL("Dropping session %d", *id); std::lock_guard<std::mutex> _l(mLock); auto it = mSessions.find(*id); LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %d", *id); LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %d", *id); (void)mSessions.erase(it); } } // namespace android libs/binder/RpcSession.cpp +24 −2 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ #include <string_view> #include <binder/Parcel.h> #include <binder/RpcServer.h> #include <binder/Stability.h> #include <utils/String8.h> Loading Loading @@ -142,8 +143,10 @@ void RpcSession::startThread(unique_fd client) { holdThis->join(unique_fd(fd)); { std::lock_guard<std::mutex> _l(holdThis->mMutex); size_t erased = mThreads.erase(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(erased != 0, "Could not erase thread."); auto it = mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); it->second.detach(); mThreads.erase(it); } }); mThreads[thread.get_id()] = std::move(thread); Loading @@ -168,6 +171,22 @@ void RpcSession::join(unique_fd client) { "bad state: connection object guaranteed to be in list"); } void RpcSession::terminateLocked() { // TODO(b/185167543): // - kindly notify other side of the connection of termination (can't be // locked) // - prevent new client/servers from being added // - stop all threads which are currently reading/writing // - terminate RpcState? if (mTerminated) return; sp<RpcServer> server = mForServer.promote(); if (server) { server->onSessionTerminating(sp<RpcSession>::fromExisting(this)); } } wp<RpcServer> RpcSession::server() { return mForServer; } Loading Loading @@ -264,6 +283,9 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { std::lock_guard<std::mutex> _l(mMutex); if (auto it = std::find(mServers.begin(), mServers.end(), connection); it != mServers.end()) { mServers.erase(it); if (mServers.size() == 0) { terminateLocked(); } return true; } return false; Loading libs/binder/RpcState.cpp +26 −11 Original line number Diff line number Diff line Loading @@ -326,7 +326,11 @@ status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address .asyncNumber = asyncNumber, }; std::vector<uint8_t> transactionData(sizeof(RpcWireTransaction) + data.dataSize()); ByteVec transactionData(sizeof(RpcWireTransaction) + data.dataSize()); if (!transactionData.valid()) { return NO_MEMORY; } memcpy(transactionData.data() + 0, &transaction, sizeof(RpcWireTransaction)); memcpy(transactionData.data() + sizeof(RpcWireTransaction), data.data(), data.dataSize()); Loading Loading @@ -379,9 +383,12 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& if (status != OK) return status; } uint8_t* data = new uint8_t[command.bodySize]; ByteVec data(command.bodySize); if (!data.valid()) { return NO_MEMORY; } if (!rpcRec(fd, "reply body", data, command.bodySize)) { if (!rpcRec(fd, "reply body", data.data(), command.bodySize)) { return DEAD_OBJECT; } Loading @@ -391,9 +398,10 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& terminate(); return BAD_VALUE; } RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data); RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data()); if (rpcReply->status != OK) return rpcReply->status; data.release(); reply->ipcSetDataReference(rpcReply->data, command.bodySize - offsetof(RpcWireReply, data), nullptr, 0, cleanup_reply_data); Loading Loading @@ -461,7 +469,10 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command); std::vector<uint8_t> transactionData(command.bodySize); ByteVec transactionData(command.bodySize); if (!transactionData.valid()) { return NO_MEMORY; } if (!rpcRec(fd, "transaction body", transactionData.data(), transactionData.size())) { return DEAD_OBJECT; } Loading @@ -479,7 +490,7 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d } status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, std::vector<uint8_t>&& transactionData) { ByteVec transactionData) { if (transactionData.size() < sizeof(RpcWireTransaction)) { ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!", sizeof(RpcWireTransaction), transactionData.size()); Loading @@ -500,7 +511,6 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R auto it = mNodeForAddress.find(addr); if (it == mNodeForAddress.end()) { ALOGE("Unknown binder address %s.", addr.toString().c_str()); dump(); replyStatus = BAD_VALUE; } else { target = it->second.binder.promote(); Loading Loading @@ -630,7 +640,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R // justification for const_cast (consider avoiding priority_queue): // - AsyncTodo operator< doesn't depend on 'data' object // - gotta go fast std::vector<uint8_t> data = std::move( ByteVec data = std::move( const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()).data); it->second.asyncTodo.pop(); _l.unlock(); Loading @@ -644,7 +654,10 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R .status = replyStatus, }; std::vector<uint8_t> replyData(sizeof(RpcWireReply) + reply.dataSize()); ByteVec replyData(sizeof(RpcWireReply) + reply.dataSize()); if (!replyData.valid()) { return NO_MEMORY; } memcpy(replyData.data() + 0, &rpcReply, sizeof(RpcWireReply)); memcpy(replyData.data() + sizeof(RpcWireReply), reply.data(), reply.dataSize()); Loading @@ -671,7 +684,10 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command); std::vector<uint8_t> commandData(command.bodySize); ByteVec commandData(command.bodySize); if (!commandData.valid()) { return NO_MEMORY; } if (!rpcRec(fd, "dec ref body", commandData.data(), commandData.size())) { return DEAD_OBJECT; } Loading @@ -690,7 +706,6 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHead auto it = mNodeForAddress.find(addr); if (it == mNodeForAddress.end()) { ALOGE("Unknown binder address %s for dec strong.", addr.toString().c_str()); dump(); return OK; } Loading libs/binder/RpcState.h +17 −2 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <binder/RpcSession.h> #include <map> #include <optional> #include <queue> namespace android { Loading Loading @@ -100,6 +101,20 @@ private: */ void terminate(); // alternative to std::vector<uint8_t> that doesn't abort on too big of allocations struct ByteVec { explicit ByteVec(size_t size) : mData(size > 0 ? new (std::nothrow) uint8_t[size] : nullptr), mSize(size) {} bool valid() { return mSize == 0 || mData != nullptr; } size_t size() { return mSize; } uint8_t* data() { return mData.get(); } uint8_t* release() { return mData.release(); } private: std::unique_ptr<uint8_t[]> mData; size_t mSize; }; [[nodiscard]] bool rpcSend(const base::unique_fd& fd, const char* what, const void* data, size_t size); [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size); Loading @@ -113,7 +128,7 @@ private: const RpcWireHeader& command); [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, std::vector<uint8_t>&& transactionData); ByteVec transactionData); [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command); Loading Loading @@ -148,7 +163,7 @@ private: // async transaction queue, _only_ for local binder struct AsyncTodo { std::vector<uint8_t> data; // most convenient format, to move it here ByteVec data; uint64_t asyncNumber = 0; bool operator<(const AsyncTodo& o) const { Loading libs/binder/include/binder/RpcServer.h +4 −0 Original line number Diff line number Diff line Loading @@ -109,6 +109,10 @@ public: ~RpcServer(); // internal use only void onSessionTerminating(const sp<RpcSession>& session); private: friend sp<RpcServer>; RpcServer(); Loading Loading
libs/binder/RpcServer.cpp +12 −0 Original line number Diff line number Diff line Loading @@ -216,4 +216,16 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { return true; } void RpcServer::onSessionTerminating(const sp<RpcSession>& session) { auto id = session->mId; LOG_ALWAYS_FATAL_IF(id == std::nullopt, "Server sessions must be initialized with ID"); LOG_RPC_DETAIL("Dropping session %d", *id); std::lock_guard<std::mutex> _l(mLock); auto it = mSessions.find(*id); LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %d", *id); LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %d", *id); (void)mSessions.erase(it); } } // namespace android
libs/binder/RpcSession.cpp +24 −2 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ #include <string_view> #include <binder/Parcel.h> #include <binder/RpcServer.h> #include <binder/Stability.h> #include <utils/String8.h> Loading Loading @@ -142,8 +143,10 @@ void RpcSession::startThread(unique_fd client) { holdThis->join(unique_fd(fd)); { std::lock_guard<std::mutex> _l(holdThis->mMutex); size_t erased = mThreads.erase(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(erased != 0, "Could not erase thread."); auto it = mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); it->second.detach(); mThreads.erase(it); } }); mThreads[thread.get_id()] = std::move(thread); Loading @@ -168,6 +171,22 @@ void RpcSession::join(unique_fd client) { "bad state: connection object guaranteed to be in list"); } void RpcSession::terminateLocked() { // TODO(b/185167543): // - kindly notify other side of the connection of termination (can't be // locked) // - prevent new client/servers from being added // - stop all threads which are currently reading/writing // - terminate RpcState? if (mTerminated) return; sp<RpcServer> server = mForServer.promote(); if (server) { server->onSessionTerminating(sp<RpcSession>::fromExisting(this)); } } wp<RpcServer> RpcSession::server() { return mForServer; } Loading Loading @@ -264,6 +283,9 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { std::lock_guard<std::mutex> _l(mMutex); if (auto it = std::find(mServers.begin(), mServers.end(), connection); it != mServers.end()) { mServers.erase(it); if (mServers.size() == 0) { terminateLocked(); } return true; } return false; Loading
libs/binder/RpcState.cpp +26 −11 Original line number Diff line number Diff line Loading @@ -326,7 +326,11 @@ status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address .asyncNumber = asyncNumber, }; std::vector<uint8_t> transactionData(sizeof(RpcWireTransaction) + data.dataSize()); ByteVec transactionData(sizeof(RpcWireTransaction) + data.dataSize()); if (!transactionData.valid()) { return NO_MEMORY; } memcpy(transactionData.data() + 0, &transaction, sizeof(RpcWireTransaction)); memcpy(transactionData.data() + sizeof(RpcWireTransaction), data.data(), data.dataSize()); Loading Loading @@ -379,9 +383,12 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& if (status != OK) return status; } uint8_t* data = new uint8_t[command.bodySize]; ByteVec data(command.bodySize); if (!data.valid()) { return NO_MEMORY; } if (!rpcRec(fd, "reply body", data, command.bodySize)) { if (!rpcRec(fd, "reply body", data.data(), command.bodySize)) { return DEAD_OBJECT; } Loading @@ -391,9 +398,10 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& terminate(); return BAD_VALUE; } RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data); RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data()); if (rpcReply->status != OK) return rpcReply->status; data.release(); reply->ipcSetDataReference(rpcReply->data, command.bodySize - offsetof(RpcWireReply, data), nullptr, 0, cleanup_reply_data); Loading Loading @@ -461,7 +469,10 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command); std::vector<uint8_t> transactionData(command.bodySize); ByteVec transactionData(command.bodySize); if (!transactionData.valid()) { return NO_MEMORY; } if (!rpcRec(fd, "transaction body", transactionData.data(), transactionData.size())) { return DEAD_OBJECT; } Loading @@ -479,7 +490,7 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d } status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, std::vector<uint8_t>&& transactionData) { ByteVec transactionData) { if (transactionData.size() < sizeof(RpcWireTransaction)) { ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!", sizeof(RpcWireTransaction), transactionData.size()); Loading @@ -500,7 +511,6 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R auto it = mNodeForAddress.find(addr); if (it == mNodeForAddress.end()) { ALOGE("Unknown binder address %s.", addr.toString().c_str()); dump(); replyStatus = BAD_VALUE; } else { target = it->second.binder.promote(); Loading Loading @@ -630,7 +640,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R // justification for const_cast (consider avoiding priority_queue): // - AsyncTodo operator< doesn't depend on 'data' object // - gotta go fast std::vector<uint8_t> data = std::move( ByteVec data = std::move( const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()).data); it->second.asyncTodo.pop(); _l.unlock(); Loading @@ -644,7 +654,10 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R .status = replyStatus, }; std::vector<uint8_t> replyData(sizeof(RpcWireReply) + reply.dataSize()); ByteVec replyData(sizeof(RpcWireReply) + reply.dataSize()); if (!replyData.valid()) { return NO_MEMORY; } memcpy(replyData.data() + 0, &rpcReply, sizeof(RpcWireReply)); memcpy(replyData.data() + sizeof(RpcWireReply), reply.data(), reply.dataSize()); Loading @@ -671,7 +684,10 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command); std::vector<uint8_t> commandData(command.bodySize); ByteVec commandData(command.bodySize); if (!commandData.valid()) { return NO_MEMORY; } if (!rpcRec(fd, "dec ref body", commandData.data(), commandData.size())) { return DEAD_OBJECT; } Loading @@ -690,7 +706,6 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHead auto it = mNodeForAddress.find(addr); if (it == mNodeForAddress.end()) { ALOGE("Unknown binder address %s for dec strong.", addr.toString().c_str()); dump(); return OK; } Loading
libs/binder/RpcState.h +17 −2 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <binder/RpcSession.h> #include <map> #include <optional> #include <queue> namespace android { Loading Loading @@ -100,6 +101,20 @@ private: */ void terminate(); // alternative to std::vector<uint8_t> that doesn't abort on too big of allocations struct ByteVec { explicit ByteVec(size_t size) : mData(size > 0 ? new (std::nothrow) uint8_t[size] : nullptr), mSize(size) {} bool valid() { return mSize == 0 || mData != nullptr; } size_t size() { return mSize; } uint8_t* data() { return mData.get(); } uint8_t* release() { return mData.release(); } private: std::unique_ptr<uint8_t[]> mData; size_t mSize; }; [[nodiscard]] bool rpcSend(const base::unique_fd& fd, const char* what, const void* data, size_t size); [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size); Loading @@ -113,7 +128,7 @@ private: const RpcWireHeader& command); [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, std::vector<uint8_t>&& transactionData); ByteVec transactionData); [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command); Loading Loading @@ -148,7 +163,7 @@ private: // async transaction queue, _only_ for local binder struct AsyncTodo { std::vector<uint8_t> data; // most convenient format, to move it here ByteVec data; uint64_t asyncNumber = 0; bool operator<(const AsyncTodo& o) const { Loading
libs/binder/include/binder/RpcServer.h +4 −0 Original line number Diff line number Diff line Loading @@ -109,6 +109,10 @@ public: ~RpcServer(); // internal use only void onSessionTerminating(const sp<RpcSession>& session); private: friend sp<RpcServer>; RpcServer(); Loading