Loading libs/binder/RpcState.cpp +48 −43 Original line number Diff line number Diff line Loading @@ -693,21 +693,26 @@ status_t RpcState::transactInternal(const sp<RpcSession::RpcConnection>& connect .parcelDataSize = static_cast<uint32_t>(data.dataSize()), }; // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling constexpr size_t kWaitMaxUs = 1000000; constexpr size_t kWaitLogUs = 10000; size_t waitUs = 0; iovec iovs[]{ {&command, sizeof(RpcWireHeader)}, {&transaction, sizeof(RpcWireTransaction)}, {const_cast<uint8_t*>(data.data()), data.dataSize()}, objectTableSpan.toIovec(), }; auto altPoll = [&] { if (waitUs > kWaitLogUs) { std::optional<SmallFunction<status_t()>> altPoll = std::nullopt; struct { const sp<RpcSession::RpcConnection> connection; const sp<RpcSession> session; size_t waitUs; } tmpHeap = {connection, session, 0}; if (session->getMaxIncomingThreads() == 0) { altPoll = [this, &tmpHeap] { // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling constexpr size_t kWaitMaxUs = 1000000; constexpr size_t kWaitLogUs = 10000; if (tmpHeap.waitUs > kWaitLogUs) { // At this point, the transaction buffer is filling up, and we would like to // poll and wait for us to be able to write. However, if we just wait, then // in the case of too many oneway calls, if the other side is blocked on Loading @@ -726,23 +731,23 @@ status_t RpcState::transactInternal(const sp<RpcSession::RpcConnection>& connect // to happen on the incoming calls in order to avoid this sleep. ALOGE("Cannot send command, trying to process pending refcounts. Waiting " "%zuus. Common when too much data is sent or too many oneway calls build up", waitUs); tmpHeap.waitUs); } if (waitUs > 0) { usleep(waitUs); waitUs = std::min(kWaitMaxUs, waitUs * 2); if (tmpHeap.waitUs > 0) { usleep(tmpHeap.waitUs); tmpHeap.waitUs = std::min(kWaitMaxUs, tmpHeap.waitUs * 2); } else { waitUs = 1; tmpHeap.waitUs = 1; } // This is restricted to "CONTROL_ONLY" because we should not receive any // nested transactions until the entire transaction is sent and starts // executing. return drainCommands(connection, session, CommandType::CONTROL_ONLY); return drainCommands(tmpHeap.connection, tmpHeap.session, CommandType::CONTROL_ONLY); }; if (status_t status = rpcSend(connection, session, "transaction", iovs, countof(iovs), std::ref(altPoll), } if (status_t status = rpcSend(connection, session, "transaction", iovs, countof(iovs), altPoll, rpcFields->mImpl ? &rpcFields->mImpl->mFds : nullptr); status != OK) { // rpcSend calls shutdownAndWait, so all refcounts should be reset. If we ever tolerate Loading libs/binder/include/binder/Functional.h +3 −1 Original line number Diff line number Diff line Loading @@ -56,7 +56,9 @@ constexpr void assert_small_callable() { static_assert(sizeof(F) <= kFunctionBufferSize, "Supplied callable is larger than std::function optimization buffer. " "Try using std::ref, but make sure lambda lives long enough to be called."); "std::ref can avoid this or allow referencing a global function, " "but you should avoid creating large callables that require allocations every " "time"); } template <typename T> Loading libs/binder/rust/rpcbinder/src/server/android.rs +1 −1 Original line number Diff line number Diff line Loading @@ -133,7 +133,7 @@ impl RpcServer { unsafe fn checked_from_ptr(ptr: *mut ARpcServer) -> Result<RpcServer, Error> { if ptr.is_null() { return Err(Error::new(ErrorKind::Other, "Failed to start server")); return Err(Error::other("Failed to start server")); } // SAFETY: Our caller must pass us a valid or null pointer, and we've checked that it's not // null. Loading libs/binder/tests/binderRpcTest.cpp +29 −0 Original line number Diff line number Diff line Loading @@ -602,6 +602,35 @@ TEST_P(BinderRpc, OnewayStressTest) { saturateThreadPool(kNumServerThreads, proc.rootIface); } TEST_P(BinderRpc, OnewayStressTestWithIncomingThread) { if (clientOrServerSingleThreaded()) { GTEST_SKIP() << "This test requires multiple threads"; } constexpr size_t kNumClientThreads = 10; constexpr size_t kNumServerThreads = 10; constexpr size_t kNumCalls = 1000; auto proc = createRpcTestSocketServerProcess( {.numMaxThreads = kNumServerThreads, .numIncomingConnectionsBySession = {1}}); std::vector<std::thread> threads; for (size_t i = 0; i < kNumClientThreads; i++) { threads.push_back(std::thread([&] { for (size_t j = 0; j < kNumCalls; j++) { EXPECT_OK(proc.rootIface->sendString("a")); } })); } for (auto& t : threads) t.join(); saturateThreadPool(kNumServerThreads, proc.rootIface); // There is a race with a leaked binder and session when // .numIncomingConnectionsBySession > 0, so force the shutdown proc.forceShutdown(); } TEST_P(BinderRpc, OnewayCallQueueingWithFds) { if (!supportsFdTransport()) { GTEST_SKIP() << "Would fail trivially (which is tested elsewhere)"; Loading services/inputflinger/InputFilter.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -61,6 +61,7 @@ void InputFilter::notifyInputDevicesChanged(const NotifyInputDevicesChangedArgs& aidlInfo.deviceId = info.getId(); aidlInfo.external = info.isExternal(); aidlInfo.keyboardType = info.getKeyboardType(); aidlInfo.isVirtual = info.getId() < 0 || info.isVirtualDevice(); } if (isFilterEnabled()) { LOG_ALWAYS_FATAL_IF(!mInputFilterRust->notifyInputDevicesChanged(mDeviceInfos).isOk()); Loading Loading
libs/binder/RpcState.cpp +48 −43 Original line number Diff line number Diff line Loading @@ -693,21 +693,26 @@ status_t RpcState::transactInternal(const sp<RpcSession::RpcConnection>& connect .parcelDataSize = static_cast<uint32_t>(data.dataSize()), }; // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling constexpr size_t kWaitMaxUs = 1000000; constexpr size_t kWaitLogUs = 10000; size_t waitUs = 0; iovec iovs[]{ {&command, sizeof(RpcWireHeader)}, {&transaction, sizeof(RpcWireTransaction)}, {const_cast<uint8_t*>(data.data()), data.dataSize()}, objectTableSpan.toIovec(), }; auto altPoll = [&] { if (waitUs > kWaitLogUs) { std::optional<SmallFunction<status_t()>> altPoll = std::nullopt; struct { const sp<RpcSession::RpcConnection> connection; const sp<RpcSession> session; size_t waitUs; } tmpHeap = {connection, session, 0}; if (session->getMaxIncomingThreads() == 0) { altPoll = [this, &tmpHeap] { // Oneway calls have no sync point, so if many are sent before, whether this // is a twoway or oneway transaction, they may have filled up the socket. // So, make sure we drain them before polling constexpr size_t kWaitMaxUs = 1000000; constexpr size_t kWaitLogUs = 10000; if (tmpHeap.waitUs > kWaitLogUs) { // At this point, the transaction buffer is filling up, and we would like to // poll and wait for us to be able to write. However, if we just wait, then // in the case of too many oneway calls, if the other side is blocked on Loading @@ -726,23 +731,23 @@ status_t RpcState::transactInternal(const sp<RpcSession::RpcConnection>& connect // to happen on the incoming calls in order to avoid this sleep. ALOGE("Cannot send command, trying to process pending refcounts. Waiting " "%zuus. Common when too much data is sent or too many oneway calls build up", waitUs); tmpHeap.waitUs); } if (waitUs > 0) { usleep(waitUs); waitUs = std::min(kWaitMaxUs, waitUs * 2); if (tmpHeap.waitUs > 0) { usleep(tmpHeap.waitUs); tmpHeap.waitUs = std::min(kWaitMaxUs, tmpHeap.waitUs * 2); } else { waitUs = 1; tmpHeap.waitUs = 1; } // This is restricted to "CONTROL_ONLY" because we should not receive any // nested transactions until the entire transaction is sent and starts // executing. return drainCommands(connection, session, CommandType::CONTROL_ONLY); return drainCommands(tmpHeap.connection, tmpHeap.session, CommandType::CONTROL_ONLY); }; if (status_t status = rpcSend(connection, session, "transaction", iovs, countof(iovs), std::ref(altPoll), } if (status_t status = rpcSend(connection, session, "transaction", iovs, countof(iovs), altPoll, rpcFields->mImpl ? &rpcFields->mImpl->mFds : nullptr); status != OK) { // rpcSend calls shutdownAndWait, so all refcounts should be reset. If we ever tolerate Loading
libs/binder/include/binder/Functional.h +3 −1 Original line number Diff line number Diff line Loading @@ -56,7 +56,9 @@ constexpr void assert_small_callable() { static_assert(sizeof(F) <= kFunctionBufferSize, "Supplied callable is larger than std::function optimization buffer. " "Try using std::ref, but make sure lambda lives long enough to be called."); "std::ref can avoid this or allow referencing a global function, " "but you should avoid creating large callables that require allocations every " "time"); } template <typename T> Loading
libs/binder/rust/rpcbinder/src/server/android.rs +1 −1 Original line number Diff line number Diff line Loading @@ -133,7 +133,7 @@ impl RpcServer { unsafe fn checked_from_ptr(ptr: *mut ARpcServer) -> Result<RpcServer, Error> { if ptr.is_null() { return Err(Error::new(ErrorKind::Other, "Failed to start server")); return Err(Error::other("Failed to start server")); } // SAFETY: Our caller must pass us a valid or null pointer, and we've checked that it's not // null. Loading
libs/binder/tests/binderRpcTest.cpp +29 −0 Original line number Diff line number Diff line Loading @@ -602,6 +602,35 @@ TEST_P(BinderRpc, OnewayStressTest) { saturateThreadPool(kNumServerThreads, proc.rootIface); } TEST_P(BinderRpc, OnewayStressTestWithIncomingThread) { if (clientOrServerSingleThreaded()) { GTEST_SKIP() << "This test requires multiple threads"; } constexpr size_t kNumClientThreads = 10; constexpr size_t kNumServerThreads = 10; constexpr size_t kNumCalls = 1000; auto proc = createRpcTestSocketServerProcess( {.numMaxThreads = kNumServerThreads, .numIncomingConnectionsBySession = {1}}); std::vector<std::thread> threads; for (size_t i = 0; i < kNumClientThreads; i++) { threads.push_back(std::thread([&] { for (size_t j = 0; j < kNumCalls; j++) { EXPECT_OK(proc.rootIface->sendString("a")); } })); } for (auto& t : threads) t.join(); saturateThreadPool(kNumServerThreads, proc.rootIface); // There is a race with a leaked binder and session when // .numIncomingConnectionsBySession > 0, so force the shutdown proc.forceShutdown(); } TEST_P(BinderRpc, OnewayCallQueueingWithFds) { if (!supportsFdTransport()) { GTEST_SKIP() << "Would fail trivially (which is tested elsewhere)"; Loading
services/inputflinger/InputFilter.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -61,6 +61,7 @@ void InputFilter::notifyInputDevicesChanged(const NotifyInputDevicesChangedArgs& aidlInfo.deviceId = info.getId(); aidlInfo.external = info.isExternal(); aidlInfo.keyboardType = info.getKeyboardType(); aidlInfo.isVirtual = info.getId() < 0 || info.isVirtualDevice(); } if (isFilterEnabled()) { LOG_ALWAYS_FATAL_IF(!mInputFilterRust->notifyInputDevicesChanged(mDeviceInfos).isOk()); Loading