Loading libs/binder/RpcServer.cpp +29 −7 Original line number Diff line number Diff line Loading @@ -184,10 +184,18 @@ bool RpcServer::acceptOne() { bool RpcServer::shutdown() { std::unique_lock<std::mutex> _l(mLock); if (mShutdownTrigger == nullptr) return false; if (mShutdownTrigger == nullptr) { LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed."); return false; } mShutdownTrigger->trigger(); while (mJoinThreadRunning) mShutdownCv.wait(_l); while (mJoinThreadRunning || !mConnectingThreads.empty()) { ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: " "%zu", mJoinThreadRunning, mConnectingThreads.size()); mShutdownCv.wait(_l); } // At this point, we know join() is about to exit, but the thread that calls // join() may not have exited yet. Loading Loading @@ -222,17 +230,21 @@ size_t RpcServer::numUninitializedSessions() { void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) { // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!server->mAgreedExperimental, "no!"); bool idValid = true; // mShutdownTrigger can only be cleared once connection threads have joined. // It must be set before this thread is started LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr); int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); idValid = false; bool idValid = server->mShutdownTrigger->interruptableRecv(clientFd.get(), &id, sizeof(id)); if (!idValid) { ALOGE("Failed to read ID for client connecting to RPC server."); } std::thread thisThread; sp<RpcSession> session; { std::lock_guard<std::mutex> _l(server->mLock); std::unique_lock<std::mutex> _l(server->mLock); auto threadId = server->mConnectingThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(), Loading @@ -241,6 +253,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie ScopeGuard detachGuard = [&]() { thisThread.detach(); }; server->mConnectingThreads.erase(threadId); // TODO(b/185167543): we currently can't disable this because we don't // shutdown sessions as well, only the server itself. So, we need to // keep this separate from the detachGuard, since we temporarily want to // give a notification even when we pass ownership of the thread to // a session. ScopeGuard threadLifetimeGuard = [&]() { _l.unlock(); server->mShutdownCv.notify_all(); }; if (!idValid) { return; } Loading libs/binder/RpcSession.cpp +16 −0 Original line number Diff line number Diff line Loading @@ -144,6 +144,22 @@ bool RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) { } } bool RpcSession::FdTrigger::interruptableRecv(base::borrowed_fd fd, void* data, size_t size) { uint8_t* buffer = reinterpret_cast<uint8_t*>(data); uint8_t* end = buffer + size; while (triggerablePollRead(fd)) { ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (readSize < 0) { ALOGE("Failed to read %s", strerror(errno)); return false; } buffer += readSize; if (buffer == end) return true; } return false; } status_t RpcSession::readId() { { std::lock_guard<std::mutex> _l(mMutex); Loading libs/binder/include/binder/RpcSession.h +9 −0 Original line number Diff line number Diff line Loading @@ -134,6 +134,15 @@ private: */ bool triggerablePollRead(base::borrowed_fd fd); /** * Read, but allow the read to be interrupted by this trigger. * * Return: * true - read succeeded at 'size' * false - interrupted (failure or trigger) */ bool interruptableRecv(base::borrowed_fd fd, void* data, size_t size); private: base::unique_fd mWrite; base::unique_fd mRead; Loading libs/binder/tests/rpc_fuzzer/main.cpp +11 −2 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ #include <binder/Parcel.h> #include <binder/RpcServer.h> #include <binder/RpcSession.h> #include <fuzzer/FuzzedDataProvider.h> #include <sys/resource.h> #include <sys/un.h> Loading Loading @@ -53,6 +54,7 @@ class SomeBinder : public BBinder { extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { if (size > 50000) return 0; FuzzedDataProvider provider(data, size); unlink(kSock.c_str()); Loading Loading @@ -84,14 +86,21 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { CHECK(base::WriteFully(clientFd, &id, sizeof(id))); #endif CHECK(base::WriteFully(clientFd, data, size)); bool hangupBeforeShutdown = provider.ConsumeBool(); std::vector<uint8_t> writeData = provider.ConsumeRemainingBytes<uint8_t>(); CHECK(base::WriteFully(clientFd, writeData.data(), writeData.size())); if (hangupBeforeShutdown) { clientFd.reset(); } // TODO(185167543): currently this is okay because we only shutdown the one // thread, but once we can shutdown other sessions, we'll need to change // this behavior in order to make sure all of the input is actually read. while (!server->shutdown()) usleep(100); clientFd.reset(); serverThread.join(); // TODO(b/185167543): better way to force a server to shutdown Loading Loading
libs/binder/RpcServer.cpp +29 −7 Original line number Diff line number Diff line Loading @@ -184,10 +184,18 @@ bool RpcServer::acceptOne() { bool RpcServer::shutdown() { std::unique_lock<std::mutex> _l(mLock); if (mShutdownTrigger == nullptr) return false; if (mShutdownTrigger == nullptr) { LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed."); return false; } mShutdownTrigger->trigger(); while (mJoinThreadRunning) mShutdownCv.wait(_l); while (mJoinThreadRunning || !mConnectingThreads.empty()) { ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: " "%zu", mJoinThreadRunning, mConnectingThreads.size()); mShutdownCv.wait(_l); } // At this point, we know join() is about to exit, but the thread that calls // join() may not have exited yet. Loading Loading @@ -222,17 +230,21 @@ size_t RpcServer::numUninitializedSessions() { void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) { // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!server->mAgreedExperimental, "no!"); bool idValid = true; // mShutdownTrigger can only be cleared once connection threads have joined. // It must be set before this thread is started LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr); int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); idValid = false; bool idValid = server->mShutdownTrigger->interruptableRecv(clientFd.get(), &id, sizeof(id)); if (!idValid) { ALOGE("Failed to read ID for client connecting to RPC server."); } std::thread thisThread; sp<RpcSession> session; { std::lock_guard<std::mutex> _l(server->mLock); std::unique_lock<std::mutex> _l(server->mLock); auto threadId = server->mConnectingThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(), Loading @@ -241,6 +253,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie ScopeGuard detachGuard = [&]() { thisThread.detach(); }; server->mConnectingThreads.erase(threadId); // TODO(b/185167543): we currently can't disable this because we don't // shutdown sessions as well, only the server itself. So, we need to // keep this separate from the detachGuard, since we temporarily want to // give a notification even when we pass ownership of the thread to // a session. ScopeGuard threadLifetimeGuard = [&]() { _l.unlock(); server->mShutdownCv.notify_all(); }; if (!idValid) { return; } Loading
libs/binder/RpcSession.cpp +16 −0 Original line number Diff line number Diff line Loading @@ -144,6 +144,22 @@ bool RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) { } } bool RpcSession::FdTrigger::interruptableRecv(base::borrowed_fd fd, void* data, size_t size) { uint8_t* buffer = reinterpret_cast<uint8_t*>(data); uint8_t* end = buffer + size; while (triggerablePollRead(fd)) { ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (readSize < 0) { ALOGE("Failed to read %s", strerror(errno)); return false; } buffer += readSize; if (buffer == end) return true; } return false; } status_t RpcSession::readId() { { std::lock_guard<std::mutex> _l(mMutex); Loading
libs/binder/include/binder/RpcSession.h +9 −0 Original line number Diff line number Diff line Loading @@ -134,6 +134,15 @@ private: */ bool triggerablePollRead(base::borrowed_fd fd); /** * Read, but allow the read to be interrupted by this trigger. * * Return: * true - read succeeded at 'size' * false - interrupted (failure or trigger) */ bool interruptableRecv(base::borrowed_fd fd, void* data, size_t size); private: base::unique_fd mWrite; base::unique_fd mRead; Loading
libs/binder/tests/rpc_fuzzer/main.cpp +11 −2 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ #include <binder/Parcel.h> #include <binder/RpcServer.h> #include <binder/RpcSession.h> #include <fuzzer/FuzzedDataProvider.h> #include <sys/resource.h> #include <sys/un.h> Loading Loading @@ -53,6 +54,7 @@ class SomeBinder : public BBinder { extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { if (size > 50000) return 0; FuzzedDataProvider provider(data, size); unlink(kSock.c_str()); Loading Loading @@ -84,14 +86,21 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { CHECK(base::WriteFully(clientFd, &id, sizeof(id))); #endif CHECK(base::WriteFully(clientFd, data, size)); bool hangupBeforeShutdown = provider.ConsumeBool(); std::vector<uint8_t> writeData = provider.ConsumeRemainingBytes<uint8_t>(); CHECK(base::WriteFully(clientFd, writeData.data(), writeData.size())); if (hangupBeforeShutdown) { clientFd.reset(); } // TODO(185167543): currently this is okay because we only shutdown the one // thread, but once we can shutdown other sessions, we'll need to change // this behavior in order to make sure all of the input is actually read. while (!server->shutdown()) usleep(100); clientFd.reset(); serverThread.join(); // TODO(b/185167543): better way to force a server to shutdown Loading