Loading libs/binder/BuildFlags.h 0 → 100644 +25 −0 Original line number Diff line number Diff line /* * Copyright (C) 2022 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace android { #ifdef BINDER_RPC_SINGLE_THREADED constexpr bool kEnableRpcThreads = false; #else constexpr bool kEnableRpcThreads = true; #endif } // namespace android libs/binder/FdTrigger.cpp +24 −2 Original line number Diff line number Diff line Loading @@ -28,25 +28,45 @@ namespace android { std::unique_ptr<FdTrigger> FdTrigger::make() { auto ret = std::make_unique<FdTrigger>(); #ifndef BINDER_RPC_SINGLE_THREADED if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { ALOGE("Could not create pipe %s", strerror(errno)); return nullptr; } #endif return ret; } void FdTrigger::trigger() { #ifdef BINDER_RPC_SINGLE_THREADED mTriggered = true; #else mWrite.reset(); #endif } bool FdTrigger::isTriggered() { #ifdef BINDER_RPC_SINGLE_THREADED return mTriggered; #else return mWrite == -1; #endif } status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { #ifdef BINDER_RPC_SINGLE_THREADED if (mTriggered) { return DEAD_OBJECT; } #endif LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", fd.get()); pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, {.fd = mRead.get(), .events = 0, .revents = 0}}; pollfd pfd[]{ {.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, #ifndef BINDER_RPC_SINGLE_THREADED {.fd = mRead.get(), .events = 0, .revents = 0}, #endif }; int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); if (ret < 0) { return -errno; Loading @@ -55,6 +75,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { // At least one FD has events. Check them. #ifndef BINDER_RPC_SINGLE_THREADED // Detect explicit trigger(): DEAD_OBJECT if (pfd[1].revents & POLLHUP) { return DEAD_OBJECT; Loading @@ -68,6 +89,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { // pfd[1].revents is 0, hence pfd[0].revents must be set, and only possible values are // a subset of event | POLLHUP | POLLERR | POLLNVAL. #endif // POLLNVAL: invalid FD number, e.g. not opened. if (pfd[0].revents & POLLNVAL) { Loading libs/binder/FdTrigger.h +4 −0 Original line number Diff line number Diff line Loading @@ -55,7 +55,11 @@ public: [[nodiscard]] status_t triggerablePoll(base::borrowed_fd fd, int16_t event); private: #ifdef BINDER_RPC_SINGLE_THREADED bool mTriggered = false; #else base::unique_fd mWrite; base::unique_fd mRead; #endif }; } // namespace android libs/binder/RpcServer.cpp +42 −26 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ #include <log/log.h> #include <utils/Compat.h> #include "BuildFlags.h" #include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" Loading Loading @@ -131,27 +132,27 @@ void RpcServer::setSupportedFileDescriptorTransportModes( } void RpcServer::setRootObject(const sp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); mRootObjectFactory = nullptr; mRootObjectWeak = mRootObject = binder; } void RpcServer::setRootObjectWeak(const wp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectFactory = nullptr; mRootObjectWeak = binder; } void RpcServer::setPerSessionRootObject( std::function<sp<IBinder>(const void*, size_t)>&& makeObject) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectWeak.clear(); mRootObjectFactory = std::move(makeObject); } sp<IBinder> RpcServer::getRootObject() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); bool hasWeak = mRootObjectWeak.unsafe_get(); sp<IBinder> ret = mRootObjectWeak.promote(); ALOGW_IF(hasWeak && ret == nullptr, "RpcServer root object is freed, returning nullptr"); Loading @@ -159,7 +160,7 @@ sp<IBinder> RpcServer::getRootObject() { } std::vector<uint8_t> RpcServer::getCertificate(RpcCertificateFormat format) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return mCtx->getCertificate(format); } Loading @@ -168,15 +169,17 @@ static void joinRpcServer(sp<RpcServer>&& thiz) { } void RpcServer::start() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(mJoinThread.get(), "Already started!"); mJoinThread = std::make_unique<std::thread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); mJoinThread = std::make_unique<RpcMaybeThread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); rpcJoinIfSingleThreaded(*mJoinThread); } void RpcServer::join() { { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; Loading Loading @@ -204,24 +207,31 @@ void RpcServer::join() { LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); { std::lock_guard<std::mutex> _l(mLock); std::thread thread = std::thread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), RpcMutexLockGuard _l(mLock); RpcMaybeThread thread = RpcMaybeThread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), std::move(clientFd), addr, addrLen); mConnectingThreads[thread.get_id()] = std::move(thread); auto& threadRef = mConnectingThreads[thread.get_id()]; threadRef = std::move(thread); rpcJoinIfSingleThreaded(threadRef); } } LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str()); { std::lock_guard<std::mutex> _l(mLock); if constexpr (kEnableRpcThreads) { RpcMutexLockGuard _l(mLock); mJoinThreadRunning = false; } else { // Multi-threaded builds clear this in shutdown(), but we need it valid // so the loop above exits cleanly mShutdownTrigger = nullptr; } mShutdownCv.notify_all(); } bool RpcServer::shutdown() { std::unique_lock<std::mutex> _l(mLock); RpcMutexUniqueLock _l(mLock); if (mShutdownTrigger == nullptr) { LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed (already shutdown?)"); return false; Loading @@ -232,10 +242,16 @@ bool RpcServer::shutdown() { for (auto& [id, session] : mSessions) { (void)id; // server lock is a more general lock std::lock_guard<std::mutex> _lSession(session->mMutex); RpcMutexLockGuard _lSession(session->mMutex); session->mShutdownTrigger->trigger(); } if constexpr (!kEnableRpcThreads) { // In single-threaded mode we're done here, everything else that // needs to happen should be at the end of RpcServer::join() return true; } while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) { if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) { ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, " Loading Loading @@ -263,7 +279,7 @@ bool RpcServer::shutdown() { } std::vector<sp<RpcSession>> RpcServer::listSessions() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; Loading @@ -273,7 +289,7 @@ std::vector<sp<RpcSession>> RpcServer::listSessions() { } size_t RpcServer::numUninitializedSessions() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return mConnectingThreads.size(); } Loading Loading @@ -354,12 +370,12 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } } std::thread thisThread; RpcMaybeThread thisThread; sp<RpcSession> session; { std::unique_lock<std::mutex> _l(server->mLock); RpcMutexUniqueLock _l(server->mLock); auto threadId = server->mConnectingThreads.find(std::this_thread::get_id()); auto threadId = server->mConnectingThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); Loading Loading @@ -505,7 +521,7 @@ void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) LOG_RPC_DETAIL("Dropping session with address %s", base::HexString(id.data(), id.size()).c_str()); std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); auto it = mSessions.find(id); LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s", base::HexString(id.data(), id.size()).c_str()); Loading @@ -519,17 +535,17 @@ void RpcServer::onSessionIncomingThreadEnded() { } bool RpcServer::hasServer() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return mServer.ok(); } unique_fd RpcServer::releaseServer() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return std::move(mServer); } status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); if (mServer.ok()) { ALOGE("Each RpcServer can only have one server."); return INVALID_OPERATION; Loading libs/binder/RpcSession.cpp +36 −31 Original line number Diff line number Diff line Loading @@ -21,7 +21,6 @@ #include <dlfcn.h> #include <inttypes.h> #include <poll.h> #include <pthread.h> #include <unistd.h> #include <string_view> Loading Loading @@ -60,7 +59,7 @@ RpcSession::RpcSession(std::unique_ptr<RpcTransportCtx> ctx) : mCtx(std::move(ct RpcSession::~RpcSession() { LOG_RPC_DETAIL("RpcSession destroyed %p", this); std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mIncoming.size() != 0, "Should not be able to destroy a session with servers in use."); } Loading @@ -77,7 +76,7 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans } void RpcSession::setMaxIncomingThreads(size_t threads) { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max incoming threads before setting up connections, but has %zu " "client(s) and %zu server(s)", Loading @@ -86,12 +85,12 @@ void RpcSession::setMaxIncomingThreads(size_t threads) { } size_t RpcSession::getMaxIncomingThreads() { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); return mMaxIncomingThreads; } void RpcSession::setMaxOutgoingThreads(size_t threads) { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max outgoing threads before setting up connections, but has %zu " "client(s) and %zu server(s)", Loading @@ -100,7 +99,7 @@ void RpcSession::setMaxOutgoingThreads(size_t threads) { } size_t RpcSession::getMaxOutgoingThreads() { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); return mMaxOutgoingThreads; } Loading @@ -113,7 +112,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { return false; } std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); if (mProtocolVersion && version > *mProtocolVersion) { ALOGE("Cannot upgrade explicitly capped protocol version %u to newer version %u", *mProtocolVersion, version); Loading @@ -125,7 +124,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { } std::optional<uint32_t> RpcSession::getProtocolVersion() { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); return mProtocolVersion; } Loading Loading @@ -209,7 +208,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { } bool RpcSession::shutdownAndWait(bool wait) { std::unique_lock<std::mutex> _l(mMutex); RpcMutexUniqueLock _l(mMutex); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed"); mShutdownTrigger->trigger(); Loading @@ -222,6 +221,7 @@ bool RpcSession::shutdownAndWait(bool wait) { } _l.unlock(); mRpcBinderState->clear(); return true; Loading Loading @@ -256,7 +256,7 @@ status_t RpcSession::sendDecStrongToTarget(uint64_t address, size_t target) { status_t RpcSession::readId() { { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client."); } Loading @@ -282,7 +282,7 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() { mCv.notify_all(); } void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock, void RpcSession::WaitForShutdownListener::waitForShutdown(RpcMutexUniqueLock& lock, const sp<RpcSession>& session) { while (session->mConnections.mIncoming.size() > 0) { if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) { Loading @@ -293,11 +293,11 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std:: } } void RpcSession::preJoinThreadOwnership(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); void RpcSession::preJoinThreadOwnership(RpcMaybeThread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != rpc_this_thread::get_id(), "Must own this thread"); { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); mConnections.mThreads[thread.get_id()] = std::move(thread); } } Loading Loading @@ -404,8 +404,8 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult sp<RpcSession::EventListener> listener; { std::lock_guard<std::mutex> _l(session->mMutex); auto it = session->mConnections.mThreads.find(std::this_thread::get_id()); RpcMutexLockGuard _l(session->mMutex); auto it = session->mConnections.mThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == session->mConnections.mThreads.end()); it->second.detach(); session->mConnections.mThreads.erase(it); Loading Loading @@ -438,7 +438,7 @@ sp<RpcServer> RpcSession::server() { status_t RpcSession::setupClient(const std::function<status_t(const std::vector<uint8_t>& sessionId, bool incoming)>& connectAndInit) { { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mOutgoing.size() != 0, "Must only setup session once, but already has %zu clients", mConnections.mOutgoing.size()); Loading Loading @@ -500,7 +500,11 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< return status; } #ifdef BINDER_RPC_SINGLE_THREADED constexpr size_t outgoingThreads = 1; #else // BINDER_RPC_SINGLE_THREADED size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads); #endif // BINDER_RPC_SINGLE_THREADED ALOGI_IF(outgoingThreads != numThreadsAvailable, "Server hints client to start %zu outgoing threads, but client will only start %zu " "because it is preconfigured to start at most %zu outgoing threads.", Loading Loading @@ -655,14 +659,14 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ } status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { std::mutex mutex; std::condition_variable joinCv; std::unique_lock<std::mutex> lock(mutex); std::thread thread; RpcMutex mutex; RpcConditionVariable joinCv; RpcMutexUniqueLock lock(mutex); RpcMaybeThread thread; sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); bool ownershipTransferred = false; thread = std::thread([&]() { std::unique_lock<std::mutex> threadLock(mutex); thread = RpcMaybeThread([&]() { RpcMutexUniqueLock threadLock(mutex); std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; Loading @@ -678,6 +682,7 @@ status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTran RpcSession::join(std::move(session), std::move(setupResult)); }); rpcJoinIfSingleThreaded(thread); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return OK; Loading @@ -697,9 +702,9 @@ status_t RpcSession::initShutdownTrigger() { status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); connection->rpcTransport = std::move(rpcTransport); connection->exclusiveTid = base::GetThreadId(); connection->exclusiveTid = rpcGetThreadId(); mConnections.mOutgoing.push_back(connection); } Loading Loading @@ -736,7 +741,7 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( std::unique_ptr<RpcTransport> rpcTransport) { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); if (mConnections.mIncoming.size() >= mMaxIncomingThreads) { ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)", Loading @@ -754,7 +759,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( sp<RpcConnection> session = sp<RpcConnection>::make(); session->rpcTransport = std::move(rpcTransport); session->exclusiveTid = base::GetThreadId(); session->exclusiveTid = rpcGetThreadId(); mConnections.mIncoming.push_back(session); mConnections.mMaxIncoming = mConnections.mIncoming.size(); Loading @@ -763,7 +768,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( } bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { std::unique_lock<std::mutex> _l(mMutex); RpcMutexUniqueLock _l(mMutex); if (auto it = std::find(mConnections.mIncoming.begin(), mConnections.mIncoming.end(), connection); it != mConnections.mIncoming.end()) { Loading @@ -781,7 +786,7 @@ bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { } void RpcSession::clearConnectionTid(const sp<RpcConnection>& connection) { std::unique_lock<std::mutex> _l(mMutex); RpcMutexUniqueLock _l(mMutex); connection->exclusiveTid = std::nullopt; if (mConnections.mWaitingThreads > 0) { _l.unlock(); Loading @@ -799,8 +804,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co connection->mConnection = nullptr; connection->mReentrant = false; uint64_t tid = base::GetThreadId(); std::unique_lock<std::mutex> _l(session->mMutex); uint64_t tid = rpcGetThreadId(); RpcMutexUniqueLock _l(session->mMutex); session->mConnections.mWaitingThreads++; while (true) { Loading Loading
libs/binder/BuildFlags.h 0 → 100644 +25 −0 Original line number Diff line number Diff line /* * Copyright (C) 2022 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace android { #ifdef BINDER_RPC_SINGLE_THREADED constexpr bool kEnableRpcThreads = false; #else constexpr bool kEnableRpcThreads = true; #endif } // namespace android
libs/binder/FdTrigger.cpp +24 −2 Original line number Diff line number Diff line Loading @@ -28,25 +28,45 @@ namespace android { std::unique_ptr<FdTrigger> FdTrigger::make() { auto ret = std::make_unique<FdTrigger>(); #ifndef BINDER_RPC_SINGLE_THREADED if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { ALOGE("Could not create pipe %s", strerror(errno)); return nullptr; } #endif return ret; } void FdTrigger::trigger() { #ifdef BINDER_RPC_SINGLE_THREADED mTriggered = true; #else mWrite.reset(); #endif } bool FdTrigger::isTriggered() { #ifdef BINDER_RPC_SINGLE_THREADED return mTriggered; #else return mWrite == -1; #endif } status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { #ifdef BINDER_RPC_SINGLE_THREADED if (mTriggered) { return DEAD_OBJECT; } #endif LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", fd.get()); pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, {.fd = mRead.get(), .events = 0, .revents = 0}}; pollfd pfd[]{ {.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, #ifndef BINDER_RPC_SINGLE_THREADED {.fd = mRead.get(), .events = 0, .revents = 0}, #endif }; int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); if (ret < 0) { return -errno; Loading @@ -55,6 +75,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { // At least one FD has events. Check them. #ifndef BINDER_RPC_SINGLE_THREADED // Detect explicit trigger(): DEAD_OBJECT if (pfd[1].revents & POLLHUP) { return DEAD_OBJECT; Loading @@ -68,6 +89,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { // pfd[1].revents is 0, hence pfd[0].revents must be set, and only possible values are // a subset of event | POLLHUP | POLLERR | POLLNVAL. #endif // POLLNVAL: invalid FD number, e.g. not opened. if (pfd[0].revents & POLLNVAL) { Loading
libs/binder/FdTrigger.h +4 −0 Original line number Diff line number Diff line Loading @@ -55,7 +55,11 @@ public: [[nodiscard]] status_t triggerablePoll(base::borrowed_fd fd, int16_t event); private: #ifdef BINDER_RPC_SINGLE_THREADED bool mTriggered = false; #else base::unique_fd mWrite; base::unique_fd mRead; #endif }; } // namespace android
libs/binder/RpcServer.cpp +42 −26 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ #include <log/log.h> #include <utils/Compat.h> #include "BuildFlags.h" #include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" Loading Loading @@ -131,27 +132,27 @@ void RpcServer::setSupportedFileDescriptorTransportModes( } void RpcServer::setRootObject(const sp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); mRootObjectFactory = nullptr; mRootObjectWeak = mRootObject = binder; } void RpcServer::setRootObjectWeak(const wp<IBinder>& binder) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectFactory = nullptr; mRootObjectWeak = binder; } void RpcServer::setPerSessionRootObject( std::function<sp<IBinder>(const void*, size_t)>&& makeObject) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectWeak.clear(); mRootObjectFactory = std::move(makeObject); } sp<IBinder> RpcServer::getRootObject() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); bool hasWeak = mRootObjectWeak.unsafe_get(); sp<IBinder> ret = mRootObjectWeak.promote(); ALOGW_IF(hasWeak && ret == nullptr, "RpcServer root object is freed, returning nullptr"); Loading @@ -159,7 +160,7 @@ sp<IBinder> RpcServer::getRootObject() { } std::vector<uint8_t> RpcServer::getCertificate(RpcCertificateFormat format) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return mCtx->getCertificate(format); } Loading @@ -168,15 +169,17 @@ static void joinRpcServer(sp<RpcServer>&& thiz) { } void RpcServer::start() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(mJoinThread.get(), "Already started!"); mJoinThread = std::make_unique<std::thread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); mJoinThread = std::make_unique<RpcMaybeThread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); rpcJoinIfSingleThreaded(*mJoinThread); } void RpcServer::join() { { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; Loading Loading @@ -204,24 +207,31 @@ void RpcServer::join() { LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); { std::lock_guard<std::mutex> _l(mLock); std::thread thread = std::thread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), RpcMutexLockGuard _l(mLock); RpcMaybeThread thread = RpcMaybeThread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), std::move(clientFd), addr, addrLen); mConnectingThreads[thread.get_id()] = std::move(thread); auto& threadRef = mConnectingThreads[thread.get_id()]; threadRef = std::move(thread); rpcJoinIfSingleThreaded(threadRef); } } LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str()); { std::lock_guard<std::mutex> _l(mLock); if constexpr (kEnableRpcThreads) { RpcMutexLockGuard _l(mLock); mJoinThreadRunning = false; } else { // Multi-threaded builds clear this in shutdown(), but we need it valid // so the loop above exits cleanly mShutdownTrigger = nullptr; } mShutdownCv.notify_all(); } bool RpcServer::shutdown() { std::unique_lock<std::mutex> _l(mLock); RpcMutexUniqueLock _l(mLock); if (mShutdownTrigger == nullptr) { LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed (already shutdown?)"); return false; Loading @@ -232,10 +242,16 @@ bool RpcServer::shutdown() { for (auto& [id, session] : mSessions) { (void)id; // server lock is a more general lock std::lock_guard<std::mutex> _lSession(session->mMutex); RpcMutexLockGuard _lSession(session->mMutex); session->mShutdownTrigger->trigger(); } if constexpr (!kEnableRpcThreads) { // In single-threaded mode we're done here, everything else that // needs to happen should be at the end of RpcServer::join() return true; } while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) { if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) { ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, " Loading Loading @@ -263,7 +279,7 @@ bool RpcServer::shutdown() { } std::vector<sp<RpcSession>> RpcServer::listSessions() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; Loading @@ -273,7 +289,7 @@ std::vector<sp<RpcSession>> RpcServer::listSessions() { } size_t RpcServer::numUninitializedSessions() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return mConnectingThreads.size(); } Loading Loading @@ -354,12 +370,12 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } } std::thread thisThread; RpcMaybeThread thisThread; sp<RpcSession> session; { std::unique_lock<std::mutex> _l(server->mLock); RpcMutexUniqueLock _l(server->mLock); auto threadId = server->mConnectingThreads.find(std::this_thread::get_id()); auto threadId = server->mConnectingThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); Loading Loading @@ -505,7 +521,7 @@ void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) LOG_RPC_DETAIL("Dropping session with address %s", base::HexString(id.data(), id.size()).c_str()); std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); auto it = mSessions.find(id); LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s", base::HexString(id.data(), id.size()).c_str()); Loading @@ -519,17 +535,17 @@ void RpcServer::onSessionIncomingThreadEnded() { } bool RpcServer::hasServer() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return mServer.ok(); } unique_fd RpcServer::releaseServer() { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); return std::move(mServer); } status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { std::lock_guard<std::mutex> _l(mLock); RpcMutexLockGuard _l(mLock); if (mServer.ok()) { ALOGE("Each RpcServer can only have one server."); return INVALID_OPERATION; Loading
libs/binder/RpcSession.cpp +36 −31 Original line number Diff line number Diff line Loading @@ -21,7 +21,6 @@ #include <dlfcn.h> #include <inttypes.h> #include <poll.h> #include <pthread.h> #include <unistd.h> #include <string_view> Loading Loading @@ -60,7 +59,7 @@ RpcSession::RpcSession(std::unique_ptr<RpcTransportCtx> ctx) : mCtx(std::move(ct RpcSession::~RpcSession() { LOG_RPC_DETAIL("RpcSession destroyed %p", this); std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mIncoming.size() != 0, "Should not be able to destroy a session with servers in use."); } Loading @@ -77,7 +76,7 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans } void RpcSession::setMaxIncomingThreads(size_t threads) { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max incoming threads before setting up connections, but has %zu " "client(s) and %zu server(s)", Loading @@ -86,12 +85,12 @@ void RpcSession::setMaxIncomingThreads(size_t threads) { } size_t RpcSession::getMaxIncomingThreads() { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); return mMaxIncomingThreads; } void RpcSession::setMaxOutgoingThreads(size_t threads) { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max outgoing threads before setting up connections, but has %zu " "client(s) and %zu server(s)", Loading @@ -100,7 +99,7 @@ void RpcSession::setMaxOutgoingThreads(size_t threads) { } size_t RpcSession::getMaxOutgoingThreads() { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); return mMaxOutgoingThreads; } Loading @@ -113,7 +112,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { return false; } std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); if (mProtocolVersion && version > *mProtocolVersion) { ALOGE("Cannot upgrade explicitly capped protocol version %u to newer version %u", *mProtocolVersion, version); Loading @@ -125,7 +124,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { } std::optional<uint32_t> RpcSession::getProtocolVersion() { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); return mProtocolVersion; } Loading Loading @@ -209,7 +208,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { } bool RpcSession::shutdownAndWait(bool wait) { std::unique_lock<std::mutex> _l(mMutex); RpcMutexUniqueLock _l(mMutex); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed"); mShutdownTrigger->trigger(); Loading @@ -222,6 +221,7 @@ bool RpcSession::shutdownAndWait(bool wait) { } _l.unlock(); mRpcBinderState->clear(); return true; Loading Loading @@ -256,7 +256,7 @@ status_t RpcSession::sendDecStrongToTarget(uint64_t address, size_t target) { status_t RpcSession::readId() { { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client."); } Loading @@ -282,7 +282,7 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() { mCv.notify_all(); } void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock, void RpcSession::WaitForShutdownListener::waitForShutdown(RpcMutexUniqueLock& lock, const sp<RpcSession>& session) { while (session->mConnections.mIncoming.size() > 0) { if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) { Loading @@ -293,11 +293,11 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std:: } } void RpcSession::preJoinThreadOwnership(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); void RpcSession::preJoinThreadOwnership(RpcMaybeThread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != rpc_this_thread::get_id(), "Must own this thread"); { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); mConnections.mThreads[thread.get_id()] = std::move(thread); } } Loading Loading @@ -404,8 +404,8 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult sp<RpcSession::EventListener> listener; { std::lock_guard<std::mutex> _l(session->mMutex); auto it = session->mConnections.mThreads.find(std::this_thread::get_id()); RpcMutexLockGuard _l(session->mMutex); auto it = session->mConnections.mThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == session->mConnections.mThreads.end()); it->second.detach(); session->mConnections.mThreads.erase(it); Loading Loading @@ -438,7 +438,7 @@ sp<RpcServer> RpcSession::server() { status_t RpcSession::setupClient(const std::function<status_t(const std::vector<uint8_t>& sessionId, bool incoming)>& connectAndInit) { { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mOutgoing.size() != 0, "Must only setup session once, but already has %zu clients", mConnections.mOutgoing.size()); Loading Loading @@ -500,7 +500,11 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< return status; } #ifdef BINDER_RPC_SINGLE_THREADED constexpr size_t outgoingThreads = 1; #else // BINDER_RPC_SINGLE_THREADED size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads); #endif // BINDER_RPC_SINGLE_THREADED ALOGI_IF(outgoingThreads != numThreadsAvailable, "Server hints client to start %zu outgoing threads, but client will only start %zu " "because it is preconfigured to start at most %zu outgoing threads.", Loading Loading @@ -655,14 +659,14 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ } status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { std::mutex mutex; std::condition_variable joinCv; std::unique_lock<std::mutex> lock(mutex); std::thread thread; RpcMutex mutex; RpcConditionVariable joinCv; RpcMutexUniqueLock lock(mutex); RpcMaybeThread thread; sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); bool ownershipTransferred = false; thread = std::thread([&]() { std::unique_lock<std::mutex> threadLock(mutex); thread = RpcMaybeThread([&]() { RpcMutexUniqueLock threadLock(mutex); std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; Loading @@ -678,6 +682,7 @@ status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTran RpcSession::join(std::move(session), std::move(setupResult)); }); rpcJoinIfSingleThreaded(thread); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return OK; Loading @@ -697,9 +702,9 @@ status_t RpcSession::initShutdownTrigger() { status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); connection->rpcTransport = std::move(rpcTransport); connection->exclusiveTid = base::GetThreadId(); connection->exclusiveTid = rpcGetThreadId(); mConnections.mOutgoing.push_back(connection); } Loading Loading @@ -736,7 +741,7 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( std::unique_ptr<RpcTransport> rpcTransport) { std::lock_guard<std::mutex> _l(mMutex); RpcMutexLockGuard _l(mMutex); if (mConnections.mIncoming.size() >= mMaxIncomingThreads) { ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)", Loading @@ -754,7 +759,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( sp<RpcConnection> session = sp<RpcConnection>::make(); session->rpcTransport = std::move(rpcTransport); session->exclusiveTid = base::GetThreadId(); session->exclusiveTid = rpcGetThreadId(); mConnections.mIncoming.push_back(session); mConnections.mMaxIncoming = mConnections.mIncoming.size(); Loading @@ -763,7 +768,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( } bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { std::unique_lock<std::mutex> _l(mMutex); RpcMutexUniqueLock _l(mMutex); if (auto it = std::find(mConnections.mIncoming.begin(), mConnections.mIncoming.end(), connection); it != mConnections.mIncoming.end()) { Loading @@ -781,7 +786,7 @@ bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { } void RpcSession::clearConnectionTid(const sp<RpcConnection>& connection) { std::unique_lock<std::mutex> _l(mMutex); RpcMutexUniqueLock _l(mMutex); connection->exclusiveTid = std::nullopt; if (mConnections.mWaitingThreads > 0) { _l.unlock(); Loading @@ -799,8 +804,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co connection->mConnection = nullptr; connection->mReentrant = false; uint64_t tid = base::GetThreadId(); std::unique_lock<std::mutex> _l(session->mMutex); uint64_t tid = rpcGetThreadId(); RpcMutexUniqueLock _l(session->mMutex); session->mConnections.mWaitingThreads++; while (true) { Loading