Loading libs/binder/RpcServer.cpp +6 −4 Original line number Diff line number Diff line Loading @@ -289,7 +289,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; if (status == OK) { iovec iov{&header, sizeof(header)}; status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, {}); status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, std::nullopt); if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading @@ -303,8 +304,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie if (header.sessionIdSize == kSessionIdBytes) { sessionId.resize(header.sessionIdSize); iovec iov{sessionId.data(), sessionId.size()}; status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, {}); status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, std::nullopt); if (status != OK) { ALOGE("Failed to read session ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading Loading @@ -334,7 +335,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie }; iovec iov{&response, sizeof(response)}; status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &iov, 1, {}); status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &iov, 1, std::nullopt); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return Loading libs/binder/RpcSession.cpp +3 −3 Original line number Diff line number Diff line Loading @@ -615,7 +615,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ iovec headerIov{&header, sizeof(header)}; auto sendHeaderStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &headerIov, 1, {}); server->interruptableWriteFully(mShutdownTrigger.get(), &headerIov, 1, std::nullopt); if (sendHeaderStatus != OK) { ALOGE("Could not write connection header to socket: %s", statusToString(sendHeaderStatus).c_str()); Loading @@ -625,8 +625,8 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ if (sessionId.size() > 0) { iovec sessionIov{const_cast<void*>(static_cast<const void*>(sessionId.data())), sessionId.size()}; auto sendSessionIdStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &sessionIov, 1, {}); auto sendSessionIdStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &sessionIov, 1, std::nullopt); if (sendSessionIdStatus != OK) { ALOGE("Could not write session ID ('%s') to socket: %s", base::HexString(sessionId.data(), sessionId.size()).c_str(), Loading libs/binder/RpcState.cpp +25 −24 Original line number Diff line number Diff line Loading @@ -311,7 +311,7 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) { status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs, const std::function<status_t()>& altPoll) { const std::optional<android::base::function_ref<status_t()>>& altPoll) { for (int i = 0; i < niovs; i++) { LOG_RPC_DETAIL("Sending %s (part %d of %d) on RpcTransport %p: %s", what, i + 1, niovs, connection->rpcTransport.get(), Loading @@ -335,7 +335,7 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs) { if (status_t status = connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(), iovs, niovs, {}); iovs, niovs, std::nullopt); status != OK) { LOG_RPC_DETAIL("Failed to read %s (%d iovs) on RpcTransport %p, error: %s", what, niovs, connection->rpcTransport.get(), statusToString(status).c_str()); Loading Loading @@ -369,7 +369,7 @@ status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& conne .msg = RPC_CONNECTION_INIT_OKAY, }; iovec iov{&init, sizeof(init)}; return rpcSend(connection, session, "connection init", &iov, 1); return rpcSend(connection, session, "connection init", &iov, 1, std::nullopt); } status_t RpcState::readConnectionInit(const sp<RpcSession::RpcConnection>& connection, Loading Loading @@ -515,10 +515,18 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling. std::function<status_t()> drainRefs = [&] { // So, make sure we drain them before polling iovec iovs[]{ {&command, sizeof(RpcWireHeader)}, {&transaction, sizeof(RpcWireTransaction)}, {const_cast<uint8_t*>(data.data()), data.dataSize()}, }; if (status_t status = rpcSend(connection, session, "transaction", iovs, arraysize(iovs), [&] { if (waitUs > kWaitLogUs) { ALOGE("Cannot send command, trying to process pending refcounts. Waiting %zuus. Too " ALOGE("Cannot send command, trying to process pending " "refcounts. Waiting %zuus. Too " "many oneway calls?", waitUs); } Loading @@ -530,16 +538,9 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti waitUs = 1; } return drainCommands(connection, session, CommandType::CONTROL_ONLY); }; iovec iovs[]{ {&command, sizeof(RpcWireHeader)}, {&transaction, sizeof(RpcWireTransaction)}, {const_cast<uint8_t*>(data.data()), data.dataSize()}, }; if (status_t status = rpcSend(connection, session, "transaction", iovs, arraysize(iovs), drainRefs); return drainCommands(connection, session, CommandType::CONTROL_ONLY); }); status != OK) { // TODO(b/167966510): need to undo onBinderLeaving - we know the // refcount isn't successfully transferred. Loading Loading @@ -640,7 +641,7 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co .bodySize = sizeof(RpcDecStrong), }; iovec iovs[]{{&cmd, sizeof(cmd)}, {&body, sizeof(body)}}; return rpcSend(connection, session, "dec ref", iovs, arraysize(iovs)); return rpcSend(connection, session, "dec ref", iovs, arraysize(iovs), std::nullopt); } status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection, Loading Loading @@ -951,7 +952,7 @@ processTransactInternalTailCall: {&rpcReply, sizeof(RpcWireReply)}, {const_cast<uint8_t*>(reply.data()), reply.dataSize()}, }; return rpcSend(connection, session, "reply", iovs, arraysize(iovs)); return rpcSend(connection, session, "reply", iovs, arraysize(iovs), std::nullopt); } status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connection, Loading libs/binder/RpcState.h +4 −3 Original line number Diff line number Diff line Loading @@ -178,9 +178,10 @@ private: size_t mSize; }; [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs, const std::function<status_t()>& altPoll = nullptr); [[nodiscard]] status_t rpcSend( const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs, const std::optional<android::base::function_ref<status_t()>>& altPoll); [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs); Loading libs/binder/RpcTransportRaw.cpp +11 −8 Original line number Diff line number Diff line Loading @@ -52,9 +52,10 @@ public: } template <typename SendOrReceive> status_t interruptableReadOrWrite(FdTrigger* fdTrigger, iovec* iovs, int niovs, SendOrReceive sendOrReceiveFun, const char* funName, int16_t event, const std::function<status_t()>& altPoll) { status_t interruptableReadOrWrite( FdTrigger* fdTrigger, iovec* iovs, int niovs, SendOrReceive sendOrReceiveFun, const char* funName, int16_t event, const std::optional<android::base::function_ref<status_t()>>& altPoll) { MAYBE_WAIT_IN_FLAKE_MODE; if (niovs < 0) { Loading Loading @@ -129,7 +130,7 @@ public: } if (altPoll) { if (status_t status = altPoll(); status != OK) return status; if (status_t status = (*altPoll)(); status != OK) return status; if (fdTrigger->isTriggered()) { return DEAD_OBJECT; } Loading @@ -142,14 +143,16 @@ public: } } status_t interruptableWriteFully(FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::function<status_t()>& altPoll) override { status_t interruptableWriteFully( FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::optional<android::base::function_ref<status_t()>>& altPoll) override { return interruptableReadOrWrite(fdTrigger, iovs, niovs, sendmsg, "sendmsg", POLLOUT, altPoll); } status_t interruptableReadFully(FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::function<status_t()>& altPoll) override { status_t interruptableReadFully( FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::optional<android::base::function_ref<status_t()>>& altPoll) override { return interruptableReadOrWrite(fdTrigger, iovs, niovs, recvmsg, "recvmsg", POLLIN, altPoll); } Loading Loading
libs/binder/RpcServer.cpp +6 −4 Original line number Diff line number Diff line Loading @@ -289,7 +289,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; if (status == OK) { iovec iov{&header, sizeof(header)}; status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, {}); status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, std::nullopt); if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading @@ -303,8 +304,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie if (header.sessionIdSize == kSessionIdBytes) { sessionId.resize(header.sessionIdSize); iovec iov{sessionId.data(), sessionId.size()}; status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, {}); status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, std::nullopt); if (status != OK) { ALOGE("Failed to read session ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading Loading @@ -334,7 +335,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie }; iovec iov{&response, sizeof(response)}; status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &iov, 1, {}); status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &iov, 1, std::nullopt); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return Loading
libs/binder/RpcSession.cpp +3 −3 Original line number Diff line number Diff line Loading @@ -615,7 +615,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ iovec headerIov{&header, sizeof(header)}; auto sendHeaderStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &headerIov, 1, {}); server->interruptableWriteFully(mShutdownTrigger.get(), &headerIov, 1, std::nullopt); if (sendHeaderStatus != OK) { ALOGE("Could not write connection header to socket: %s", statusToString(sendHeaderStatus).c_str()); Loading @@ -625,8 +625,8 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ if (sessionId.size() > 0) { iovec sessionIov{const_cast<void*>(static_cast<const void*>(sessionId.data())), sessionId.size()}; auto sendSessionIdStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &sessionIov, 1, {}); auto sendSessionIdStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &sessionIov, 1, std::nullopt); if (sendSessionIdStatus != OK) { ALOGE("Could not write session ID ('%s') to socket: %s", base::HexString(sessionId.data(), sessionId.size()).c_str(), Loading
libs/binder/RpcState.cpp +25 −24 Original line number Diff line number Diff line Loading @@ -311,7 +311,7 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) { status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs, const std::function<status_t()>& altPoll) { const std::optional<android::base::function_ref<status_t()>>& altPoll) { for (int i = 0; i < niovs; i++) { LOG_RPC_DETAIL("Sending %s (part %d of %d) on RpcTransport %p: %s", what, i + 1, niovs, connection->rpcTransport.get(), Loading @@ -335,7 +335,7 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs) { if (status_t status = connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(), iovs, niovs, {}); iovs, niovs, std::nullopt); status != OK) { LOG_RPC_DETAIL("Failed to read %s (%d iovs) on RpcTransport %p, error: %s", what, niovs, connection->rpcTransport.get(), statusToString(status).c_str()); Loading Loading @@ -369,7 +369,7 @@ status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& conne .msg = RPC_CONNECTION_INIT_OKAY, }; iovec iov{&init, sizeof(init)}; return rpcSend(connection, session, "connection init", &iov, 1); return rpcSend(connection, session, "connection init", &iov, 1, std::nullopt); } status_t RpcState::readConnectionInit(const sp<RpcSession::RpcConnection>& connection, Loading Loading @@ -515,10 +515,18 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling. std::function<status_t()> drainRefs = [&] { // So, make sure we drain them before polling iovec iovs[]{ {&command, sizeof(RpcWireHeader)}, {&transaction, sizeof(RpcWireTransaction)}, {const_cast<uint8_t*>(data.data()), data.dataSize()}, }; if (status_t status = rpcSend(connection, session, "transaction", iovs, arraysize(iovs), [&] { if (waitUs > kWaitLogUs) { ALOGE("Cannot send command, trying to process pending refcounts. Waiting %zuus. Too " ALOGE("Cannot send command, trying to process pending " "refcounts. Waiting %zuus. Too " "many oneway calls?", waitUs); } Loading @@ -530,16 +538,9 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti waitUs = 1; } return drainCommands(connection, session, CommandType::CONTROL_ONLY); }; iovec iovs[]{ {&command, sizeof(RpcWireHeader)}, {&transaction, sizeof(RpcWireTransaction)}, {const_cast<uint8_t*>(data.data()), data.dataSize()}, }; if (status_t status = rpcSend(connection, session, "transaction", iovs, arraysize(iovs), drainRefs); return drainCommands(connection, session, CommandType::CONTROL_ONLY); }); status != OK) { // TODO(b/167966510): need to undo onBinderLeaving - we know the // refcount isn't successfully transferred. Loading Loading @@ -640,7 +641,7 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co .bodySize = sizeof(RpcDecStrong), }; iovec iovs[]{{&cmd, sizeof(cmd)}, {&body, sizeof(body)}}; return rpcSend(connection, session, "dec ref", iovs, arraysize(iovs)); return rpcSend(connection, session, "dec ref", iovs, arraysize(iovs), std::nullopt); } status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection, Loading Loading @@ -951,7 +952,7 @@ processTransactInternalTailCall: {&rpcReply, sizeof(RpcWireReply)}, {const_cast<uint8_t*>(reply.data()), reply.dataSize()}, }; return rpcSend(connection, session, "reply", iovs, arraysize(iovs)); return rpcSend(connection, session, "reply", iovs, arraysize(iovs), std::nullopt); } status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connection, Loading
libs/binder/RpcState.h +4 −3 Original line number Diff line number Diff line Loading @@ -178,9 +178,10 @@ private: size_t mSize; }; [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs, const std::function<status_t()>& altPoll = nullptr); [[nodiscard]] status_t rpcSend( const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs, const std::optional<android::base::function_ref<status_t()>>& altPoll); [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, iovec* iovs, int niovs); Loading
libs/binder/RpcTransportRaw.cpp +11 −8 Original line number Diff line number Diff line Loading @@ -52,9 +52,10 @@ public: } template <typename SendOrReceive> status_t interruptableReadOrWrite(FdTrigger* fdTrigger, iovec* iovs, int niovs, SendOrReceive sendOrReceiveFun, const char* funName, int16_t event, const std::function<status_t()>& altPoll) { status_t interruptableReadOrWrite( FdTrigger* fdTrigger, iovec* iovs, int niovs, SendOrReceive sendOrReceiveFun, const char* funName, int16_t event, const std::optional<android::base::function_ref<status_t()>>& altPoll) { MAYBE_WAIT_IN_FLAKE_MODE; if (niovs < 0) { Loading Loading @@ -129,7 +130,7 @@ public: } if (altPoll) { if (status_t status = altPoll(); status != OK) return status; if (status_t status = (*altPoll)(); status != OK) return status; if (fdTrigger->isTriggered()) { return DEAD_OBJECT; } Loading @@ -142,14 +143,16 @@ public: } } status_t interruptableWriteFully(FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::function<status_t()>& altPoll) override { status_t interruptableWriteFully( FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::optional<android::base::function_ref<status_t()>>& altPoll) override { return interruptableReadOrWrite(fdTrigger, iovs, niovs, sendmsg, "sendmsg", POLLOUT, altPoll); } status_t interruptableReadFully(FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::function<status_t()>& altPoll) override { status_t interruptableReadFully( FdTrigger* fdTrigger, iovec* iovs, int niovs, const std::optional<android::base::function_ref<status_t()>>& altPoll) override { return interruptableReadOrWrite(fdTrigger, iovs, niovs, recvmsg, "recvmsg", POLLIN, altPoll); } Loading