Loading libs/binder/RpcSession.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -240,7 +240,8 @@ void RpcSession::join(sp<RpcSession>&& session, unique_fd client) { sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client)); while (true) { status_t error = session->state()->getAndExecuteCommand(connection->fd, session); status_t error = session->state()->getAndExecuteCommand(connection->fd, session, RpcState::CommandType::ANY); if (error != OK) { LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", Loading libs/binder/RpcState.cpp +22 −5 Original line number Diff line number Diff line Loading @@ -386,7 +386,11 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& if (flags & IBinder::FLAG_ONEWAY) { LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get()); return OK; // do not wait for result // Do not wait on result. // However, too many oneway calls may cause refcounts to build up and fill up the socket, // so process those. return drainCommands(fd, session, CommandType::CONTROL_ONLY); } LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction."); Loading @@ -413,7 +417,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& if (command.command == RPC_COMMAND_REPLY) break; if (status_t status = processServerCommand(fd, session, command); status != OK) if (status_t status = processServerCommand(fd, session, command, CommandType::ANY); status != OK) return status; } Loading Loading @@ -471,7 +476,8 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad return OK; } status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session) { status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session, CommandType type) { LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get()); RpcWireHeader command; Loading @@ -479,11 +485,21 @@ status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcS status != OK) return status; return processServerCommand(fd, session, command); return processServerCommand(fd, session, command, type); } status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session, CommandType type) { uint8_t buf; while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) { status_t status = getAndExecuteCommand(fd, session, type); if (status != OK) return status; } return OK; } status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session, const RpcWireHeader& command) { const RpcWireHeader& command, CommandType type) { IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull(); IPCThreadState::SpGuard spGuard{ .address = __builtin_frame_address(0), Loading @@ -501,6 +517,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS switch (command.command) { case RPC_COMMAND_TRANSACT: if (type != CommandType::ANY) return BAD_TYPE; return processTransact(fd, session, command); case RPC_COMMAND_DEC_STRONG: return processDecStrong(fd, session, command); Loading libs/binder/RpcState.h +9 −2 Original line number Diff line number Diff line Loading @@ -66,8 +66,15 @@ public: const sp<RpcSession>& session, Parcel* reply, uint32_t flags); [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address); enum class CommandType { ANY, CONTROL_ONLY, }; [[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session); const sp<RpcSession>& session, CommandType type); [[nodiscard]] status_t drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session, CommandType type); /** * Called by Parcel for outgoing binders. This implies one refcount of Loading Loading @@ -129,7 +136,7 @@ private: Parcel* reply); [[nodiscard]] status_t processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session, const RpcWireHeader& command); const RpcWireHeader& command, CommandType type); [[nodiscard]] status_t processTransact(const base::unique_fd& fd, const sp<RpcSession>& session, const RpcWireHeader& command); [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, Loading libs/binder/tests/binderRpcTest.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -876,7 +876,7 @@ TEST_P(BinderRpc, ThreadingStressTest) { TEST_P(BinderRpc, OnewayStressTest) { constexpr size_t kNumClientThreads = 10; constexpr size_t kNumServerThreads = 10; constexpr size_t kNumCalls = 50; constexpr size_t kNumCalls = 500; auto proc = createRpcTestSocketServerProcess(kNumServerThreads); Loading Loading
libs/binder/RpcSession.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -240,7 +240,8 @@ void RpcSession::join(sp<RpcSession>&& session, unique_fd client) { sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client)); while (true) { status_t error = session->state()->getAndExecuteCommand(connection->fd, session); status_t error = session->state()->getAndExecuteCommand(connection->fd, session, RpcState::CommandType::ANY); if (error != OK) { LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", Loading
libs/binder/RpcState.cpp +22 −5 Original line number Diff line number Diff line Loading @@ -386,7 +386,11 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& if (flags & IBinder::FLAG_ONEWAY) { LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get()); return OK; // do not wait for result // Do not wait on result. // However, too many oneway calls may cause refcounts to build up and fill up the socket, // so process those. return drainCommands(fd, session, CommandType::CONTROL_ONLY); } LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction."); Loading @@ -413,7 +417,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& if (command.command == RPC_COMMAND_REPLY) break; if (status_t status = processServerCommand(fd, session, command); status != OK) if (status_t status = processServerCommand(fd, session, command, CommandType::ANY); status != OK) return status; } Loading Loading @@ -471,7 +476,8 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad return OK; } status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session) { status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session, CommandType type) { LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get()); RpcWireHeader command; Loading @@ -479,11 +485,21 @@ status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcS status != OK) return status; return processServerCommand(fd, session, command); return processServerCommand(fd, session, command, type); } status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session, CommandType type) { uint8_t buf; while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) { status_t status = getAndExecuteCommand(fd, session, type); if (status != OK) return status; } return OK; } status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session, const RpcWireHeader& command) { const RpcWireHeader& command, CommandType type) { IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull(); IPCThreadState::SpGuard spGuard{ .address = __builtin_frame_address(0), Loading @@ -501,6 +517,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS switch (command.command) { case RPC_COMMAND_TRANSACT: if (type != CommandType::ANY) return BAD_TYPE; return processTransact(fd, session, command); case RPC_COMMAND_DEC_STRONG: return processDecStrong(fd, session, command); Loading
libs/binder/RpcState.h +9 −2 Original line number Diff line number Diff line Loading @@ -66,8 +66,15 @@ public: const sp<RpcSession>& session, Parcel* reply, uint32_t flags); [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address); enum class CommandType { ANY, CONTROL_ONLY, }; [[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session); const sp<RpcSession>& session, CommandType type); [[nodiscard]] status_t drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session, CommandType type); /** * Called by Parcel for outgoing binders. This implies one refcount of Loading Loading @@ -129,7 +136,7 @@ private: Parcel* reply); [[nodiscard]] status_t processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session, const RpcWireHeader& command); const RpcWireHeader& command, CommandType type); [[nodiscard]] status_t processTransact(const base::unique_fd& fd, const sp<RpcSession>& session, const RpcWireHeader& command); [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, Loading
libs/binder/tests/binderRpcTest.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -876,7 +876,7 @@ TEST_P(BinderRpc, ThreadingStressTest) { TEST_P(BinderRpc, OnewayStressTest) { constexpr size_t kNumClientThreads = 10; constexpr size_t kNumServerThreads = 10; constexpr size_t kNumCalls = 50; constexpr size_t kNumCalls = 500; auto proc = createRpcTestSocketServerProcess(kNumServerThreads); Loading