Loading libs/binder/RpcServer.cpp +3 −3 Original line number Diff line number Diff line Loading @@ -278,7 +278,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; if (status == OK) { status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header, sizeof(header)); sizeof(header), {}); if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading @@ -291,7 +291,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie if (header.sessionIdSize > 0) { sessionId.resize(header.sessionIdSize); status = client->interruptableReadFully(server->mShutdownTrigger.get(), sessionId.data(), sessionId.size()); sessionId.data(), sessionId.size(), {}); if (status != OK) { ALOGE("Failed to read session ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading @@ -316,7 +316,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie }; status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response, sizeof(response)); sizeof(response), {}); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return Loading libs/binder/RpcSession.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -560,7 +560,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ } auto sendHeaderStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header)); server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header), {}); if (sendHeaderStatus != OK) { ALOGE("Could not write connection header to socket: %s", statusToString(sendHeaderStatus).c_str()); Loading @@ -570,7 +570,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ if (sessionId.size() > 0) { auto sendSessionIdStatus = server->interruptableWriteFully(mShutdownTrigger.get(), sessionId.data(), sessionId.size()); sessionId.size(), {}); if (sendSessionIdStatus != OK) { ALOGE("Could not write session ID ('%s') to socket: %s", base::HexString(sessionId.data(), sessionId.size()).c_str(), Loading libs/binder/RpcState.cpp +35 −14 Original line number Diff line number Diff line Loading @@ -307,7 +307,7 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) { status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, const void* data, size_t size) { size_t size, const std::function<status_t()>& altPoll) { LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(), android::base::HexString(data, size).c_str()); Loading @@ -319,7 +319,7 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, if (status_t status = connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(), data, size); data, size, altPoll); status != OK) { LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); Loading @@ -341,7 +341,7 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, if (status_t status = connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(), data, size); data, size, {}); status != OK) { LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); Loading Loading @@ -523,21 +523,44 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(), data.dataSize()); constexpr size_t kWaitMaxUs = 1000000; constexpr size_t kWaitLogUs = 10000; size_t waitUs = 0; // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling. std::function<status_t()> drainRefs = [&] { if (waitUs > kWaitLogUs) { ALOGE("Cannot send command, trying to process pending refcounts. Waiting %zuus. Too " "many oneway calls?", waitUs); } if (waitUs > 0) { usleep(waitUs); waitUs = std::min(kWaitMaxUs, waitUs * 2); } else { waitUs = 1; } return drainCommands(connection, session, CommandType::CONTROL_ONLY); }; if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(), transactionData.size()); status != OK) transactionData.size(), drainRefs); status != OK) { // TODO(b/167966510): need to undo onBinderLeaving - we know the // refcount isn't successfully transferred. return status; } if (flags & IBinder::FLAG_ONEWAY) { LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p", connection->rpcTransport.get()); // Do not wait on result. // However, too many oneway calls may cause refcounts to build up and fill up the socket, // so process those. return drainCommands(connection, session, CommandType::CONTROL_ONLY); return OK; } LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction."); Loading Loading @@ -723,7 +746,7 @@ status_t RpcState::processTransactInternal(const sp<RpcSession::RpcConnection>& // for 'recursive' calls to this, we have already read and processed the // binder from the transaction data and taken reference counts into account, // so it is cached here. sp<IBinder> targetRef; sp<IBinder> target; processTransactInternalTailCall: if (transactionData.size() < sizeof(RpcWireTransaction)) { Loading @@ -738,12 +761,9 @@ processTransactInternalTailCall: bool oneway = transaction->flags & IBinder::FLAG_ONEWAY; status_t replyStatus = OK; sp<IBinder> target; if (addr != 0) { if (!targetRef) { if (!target) { replyStatus = onBinderEntering(session, addr, &target); } else { target = targetRef; } if (replyStatus != OK) { Loading Loading @@ -910,7 +930,8 @@ processTransactInternalTailCall: // reset up arguments transactionData = std::move(todo.data); targetRef = std::move(todo.ref); LOG_ALWAYS_FATAL_IF(target != todo.ref, "async list should be associated with a binder"); it->second.asyncTodo.pop(); goto processTransactInternalTailCall; Loading libs/binder/RpcState.h +2 −1 Original line number Diff line number Diff line Loading @@ -177,7 +177,8 @@ private: [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, const void* data, size_t size); const void* data, size_t size, const std::function<status_t()>& altPoll = nullptr); [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, void* data, size_t size); Loading libs/binder/RpcTransportRaw.cpp +53 −37 Original line number Diff line number Diff line Loading @@ -43,56 +43,72 @@ public: return ret; } status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override { const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); const uint8_t* end = buffer + size; template <typename Buffer, typename SendOrReceive> status_t interruptableReadOrWrite(FdTrigger* fdTrigger, Buffer buffer, size_t size, SendOrReceive sendOrReceiveFun, const char* funName, int16_t event, const std::function<status_t()>& altPoll) { const Buffer end = buffer + size; MAYBE_WAIT_IN_FLAKE_MODE; status_t status; while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) { ssize_t writeSize = TEMP_FAILURE_RETRY(::send(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (writeSize < 0) { // Since we didn't poll, we need to manually check to see if it was triggered. Otherwise, we // may never know we should be shutting down. if (fdTrigger->isTriggered()) { return DEAD_OBJECT; } bool havePolled = false; while (true) { ssize_t processSize = TEMP_FAILURE_RETRY( sendOrReceiveFun(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (processSize < 0) { int savedErrno = errno; LOG_RPC_DETAIL("RpcTransport send(): %s", strerror(savedErrno)); // Still return the error on later passes, since it would expose // a problem with polling if (havePolled || (!havePolled && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) { LOG_RPC_DETAIL("RpcTransport %s(): %s", funName, strerror(savedErrno)); return -savedErrno; } } else if (processSize == 0) { return DEAD_OBJECT; } else { buffer += processSize; if (buffer == end) { return OK; } } if (writeSize == 0) return DEAD_OBJECT; buffer += writeSize; if (buffer == end) return OK; if (altPoll) { if (status_t status = altPoll(); status != OK) return status; if (fdTrigger->isTriggered()) { return DEAD_OBJECT; } } else { if (status_t status = fdTrigger->triggerablePoll(mSocket.get(), event); status != OK) return status; if (!havePolled) havePolled = true; } } status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override { uint8_t* buffer = reinterpret_cast<uint8_t*>(data); uint8_t* end = buffer + size; MAYBE_WAIT_IN_FLAKE_MODE; status_t status; while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) { ssize_t readSize = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (readSize < 0) { int savedErrno = errno; LOG_RPC_DETAIL("RpcTransport recv(): %s", strerror(savedErrno)); return -savedErrno; } if (readSize == 0) return DEAD_OBJECT; // EOF buffer += readSize; if (buffer == end) return OK; status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size, const std::function<status_t()>& altPoll) override { return interruptableReadOrWrite(fdTrigger, reinterpret_cast<const uint8_t*>(data), size, send, "send", POLLOUT, altPoll); } return status; status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size, const std::function<status_t()>& altPoll) override { return interruptableReadOrWrite(fdTrigger, reinterpret_cast<uint8_t*>(data), size, recv, "recv", POLLIN, altPoll); } private: android::base::unique_fd mSocket; base::unique_fd mSocket; }; // RpcTransportCtx with TLS disabled. Loading Loading
libs/binder/RpcServer.cpp +3 −3 Original line number Diff line number Diff line Loading @@ -278,7 +278,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; if (status == OK) { status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header, sizeof(header)); sizeof(header), {}); if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading @@ -291,7 +291,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie if (header.sessionIdSize > 0) { sessionId.resize(header.sessionIdSize); status = client->interruptableReadFully(server->mShutdownTrigger.get(), sessionId.data(), sessionId.size()); sessionId.data(), sessionId.size(), {}); if (status != OK) { ALOGE("Failed to read session ID for client connecting to RPC server: %s", statusToString(status).c_str()); Loading @@ -316,7 +316,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie }; status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response, sizeof(response)); sizeof(response), {}); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return Loading
libs/binder/RpcSession.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -560,7 +560,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ } auto sendHeaderStatus = server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header)); server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header), {}); if (sendHeaderStatus != OK) { ALOGE("Could not write connection header to socket: %s", statusToString(sendHeaderStatus).c_str()); Loading @@ -570,7 +570,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ if (sessionId.size() > 0) { auto sendSessionIdStatus = server->interruptableWriteFully(mShutdownTrigger.get(), sessionId.data(), sessionId.size()); sessionId.size(), {}); if (sendSessionIdStatus != OK) { ALOGE("Could not write session ID ('%s') to socket: %s", base::HexString(sessionId.data(), sessionId.size()).c_str(), Loading
libs/binder/RpcState.cpp +35 −14 Original line number Diff line number Diff line Loading @@ -307,7 +307,7 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) { status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, const void* data, size_t size) { size_t size, const std::function<status_t()>& altPoll) { LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(), android::base::HexString(data, size).c_str()); Loading @@ -319,7 +319,7 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, if (status_t status = connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(), data, size); data, size, altPoll); status != OK) { LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); Loading @@ -341,7 +341,7 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, if (status_t status = connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(), data, size); data, size, {}); status != OK) { LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); Loading Loading @@ -523,21 +523,44 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(), data.dataSize()); constexpr size_t kWaitMaxUs = 1000000; constexpr size_t kWaitLogUs = 10000; size_t waitUs = 0; // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling. std::function<status_t()> drainRefs = [&] { if (waitUs > kWaitLogUs) { ALOGE("Cannot send command, trying to process pending refcounts. Waiting %zuus. Too " "many oneway calls?", waitUs); } if (waitUs > 0) { usleep(waitUs); waitUs = std::min(kWaitMaxUs, waitUs * 2); } else { waitUs = 1; } return drainCommands(connection, session, CommandType::CONTROL_ONLY); }; if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(), transactionData.size()); status != OK) transactionData.size(), drainRefs); status != OK) { // TODO(b/167966510): need to undo onBinderLeaving - we know the // refcount isn't successfully transferred. return status; } if (flags & IBinder::FLAG_ONEWAY) { LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p", connection->rpcTransport.get()); // Do not wait on result. // However, too many oneway calls may cause refcounts to build up and fill up the socket, // so process those. return drainCommands(connection, session, CommandType::CONTROL_ONLY); return OK; } LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction."); Loading Loading @@ -723,7 +746,7 @@ status_t RpcState::processTransactInternal(const sp<RpcSession::RpcConnection>& // for 'recursive' calls to this, we have already read and processed the // binder from the transaction data and taken reference counts into account, // so it is cached here. sp<IBinder> targetRef; sp<IBinder> target; processTransactInternalTailCall: if (transactionData.size() < sizeof(RpcWireTransaction)) { Loading @@ -738,12 +761,9 @@ processTransactInternalTailCall: bool oneway = transaction->flags & IBinder::FLAG_ONEWAY; status_t replyStatus = OK; sp<IBinder> target; if (addr != 0) { if (!targetRef) { if (!target) { replyStatus = onBinderEntering(session, addr, &target); } else { target = targetRef; } if (replyStatus != OK) { Loading Loading @@ -910,7 +930,8 @@ processTransactInternalTailCall: // reset up arguments transactionData = std::move(todo.data); targetRef = std::move(todo.ref); LOG_ALWAYS_FATAL_IF(target != todo.ref, "async list should be associated with a binder"); it->second.asyncTodo.pop(); goto processTransactInternalTailCall; Loading
libs/binder/RpcState.h +2 −1 Original line number Diff line number Diff line Loading @@ -177,7 +177,8 @@ private: [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, const void* data, size_t size); const void* data, size_t size, const std::function<status_t()>& altPoll = nullptr); [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const char* what, void* data, size_t size); Loading
libs/binder/RpcTransportRaw.cpp +53 −37 Original line number Diff line number Diff line Loading @@ -43,56 +43,72 @@ public: return ret; } status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override { const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); const uint8_t* end = buffer + size; template <typename Buffer, typename SendOrReceive> status_t interruptableReadOrWrite(FdTrigger* fdTrigger, Buffer buffer, size_t size, SendOrReceive sendOrReceiveFun, const char* funName, int16_t event, const std::function<status_t()>& altPoll) { const Buffer end = buffer + size; MAYBE_WAIT_IN_FLAKE_MODE; status_t status; while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) { ssize_t writeSize = TEMP_FAILURE_RETRY(::send(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (writeSize < 0) { // Since we didn't poll, we need to manually check to see if it was triggered. Otherwise, we // may never know we should be shutting down. if (fdTrigger->isTriggered()) { return DEAD_OBJECT; } bool havePolled = false; while (true) { ssize_t processSize = TEMP_FAILURE_RETRY( sendOrReceiveFun(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (processSize < 0) { int savedErrno = errno; LOG_RPC_DETAIL("RpcTransport send(): %s", strerror(savedErrno)); // Still return the error on later passes, since it would expose // a problem with polling if (havePolled || (!havePolled && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) { LOG_RPC_DETAIL("RpcTransport %s(): %s", funName, strerror(savedErrno)); return -savedErrno; } } else if (processSize == 0) { return DEAD_OBJECT; } else { buffer += processSize; if (buffer == end) { return OK; } } if (writeSize == 0) return DEAD_OBJECT; buffer += writeSize; if (buffer == end) return OK; if (altPoll) { if (status_t status = altPoll(); status != OK) return status; if (fdTrigger->isTriggered()) { return DEAD_OBJECT; } } else { if (status_t status = fdTrigger->triggerablePoll(mSocket.get(), event); status != OK) return status; if (!havePolled) havePolled = true; } } status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override { uint8_t* buffer = reinterpret_cast<uint8_t*>(data); uint8_t* end = buffer + size; MAYBE_WAIT_IN_FLAKE_MODE; status_t status; while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) { ssize_t readSize = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (readSize < 0) { int savedErrno = errno; LOG_RPC_DETAIL("RpcTransport recv(): %s", strerror(savedErrno)); return -savedErrno; } if (readSize == 0) return DEAD_OBJECT; // EOF buffer += readSize; if (buffer == end) return OK; status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size, const std::function<status_t()>& altPoll) override { return interruptableReadOrWrite(fdTrigger, reinterpret_cast<const uint8_t*>(data), size, send, "send", POLLOUT, altPoll); } return status; status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size, const std::function<status_t()>& altPoll) override { return interruptableReadOrWrite(fdTrigger, reinterpret_cast<uint8_t*>(data), size, recv, "recv", POLLIN, altPoll); } private: android::base::unique_fd mSocket; base::unique_fd mSocket; }; // RpcTransportCtx with TLS disabled. Loading