Loading libs/binder/RpcServer.cpp +54 −30 Original line number Diff line number Diff line Loading @@ -132,21 +132,49 @@ void RpcServer::join() { } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); // TODO(b/183988761): cannot trust this simple ID, should not block this // thread { std::lock_guard<std::mutex> _l(mLock); std::thread thread = std::thread(&RpcServer::establishConnection, this, std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd)); mConnectingThreads[thread.get_id()] = std::move(thread); } } } std::vector<sp<RpcSession>> RpcServer::listSessions() { std::lock_guard<std::mutex> _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; sessions.push_back(session); } return sessions; } void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) { LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object"); // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); continue; return; } std::thread thisThread; sp<RpcSession> session; { std::lock_guard<std::mutex> _l(mLock); sp<RpcSession> session; auto threadId = mConnectingThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); mConnectingThreads.erase(threadId); if (id == RPC_SESSION_ID_NEW) { // new client! LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs"); mSessionIdCounter++; Loading @@ -158,24 +186,20 @@ void RpcServer::join() { auto it = mSessions.find(id); if (it == mSessions.end()) { ALOGE("Cannot add thread, no record of session with ID %d", id); continue; return; } session = it->second; } session->startThread(std::move(clientFd)); } } } std::vector<sp<RpcSession>> RpcServer::listSessions() { std::lock_guard<std::mutex> _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; sessions.push_back(session); } return sessions; // avoid strong cycle server = nullptr; // // // DO NOT ACCESS MEMBER VARIABLES BELOW // session->join(std::move(thisThread), std::move(clientFd)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading libs/binder/RpcSession.cpp +15 −17 Original line number Diff line number Diff line Loading @@ -131,24 +131,14 @@ status_t RpcSession::readId() { return OK; } void RpcSession::startThread(unique_fd client) { std::lock_guard<std::mutex> _l(mMutex); sp<RpcSession> holdThis = sp<RpcSession>::fromExisting(this); int fd = client.release(); auto thread = std::thread([=] { holdThis->join(unique_fd(fd)); void RpcSession::join(std::thread thread, unique_fd client) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); { std::lock_guard<std::mutex> _l(holdThis->mMutex); auto it = mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); it->second.detach(); mThreads.erase(it); } }); std::lock_guard<std::mutex> _l(mMutex); mThreads[thread.get_id()] = std::move(thread); } void RpcSession::join(unique_fd client) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) sp<RpcConnection> connection = assignServerToThisThread(std::move(client)); Loading @@ -165,6 +155,14 @@ void RpcSession::join(unique_fd client) { LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection), "bad state: connection object guaranteed to be in list"); { std::lock_guard<std::mutex> _l(mMutex); auto it = mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); it->second.detach(); mThreads.erase(it); } } void RpcSession::terminateLocked() { Loading libs/binder/include/binder/RpcServer.h +3 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ #include <utils/RefBase.h> #include <mutex> #include <thread> // WARNING: This is a feature which is still in development, and it is subject // to radical change. Any production use of this may subject your code to any Loading Loading @@ -115,6 +116,7 @@ private: friend sp<RpcServer>; RpcServer(); void establishConnection(sp<RpcServer>&& session, base::unique_fd clientFd); bool setupSocketServer(const RpcSocketAddress& address); bool mAgreedExperimental = false; Loading @@ -123,6 +125,7 @@ private: base::unique_fd mServer; // socket we are accepting sessions on std::mutex mLock; // for below std::map<std::thread::id, std::thread> mConnectingThreads; sp<IBinder> mRootObject; std::map<int32_t, sp<RpcSession>> mSessions; int32_t mSessionIdCounter = 0; Loading libs/binder/include/binder/RpcSession.h +1 −2 Original line number Diff line number Diff line Loading @@ -114,8 +114,7 @@ private: status_t readId(); void startThread(base::unique_fd client); void join(base::unique_fd client); void join(std::thread thread, base::unique_fd client); void terminateLocked(); struct RpcConnection : public RefBase { Loading Loading
libs/binder/RpcServer.cpp +54 −30 Original line number Diff line number Diff line Loading @@ -132,21 +132,49 @@ void RpcServer::join() { } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); // TODO(b/183988761): cannot trust this simple ID, should not block this // thread { std::lock_guard<std::mutex> _l(mLock); std::thread thread = std::thread(&RpcServer::establishConnection, this, std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd)); mConnectingThreads[thread.get_id()] = std::move(thread); } } } std::vector<sp<RpcSession>> RpcServer::listSessions() { std::lock_guard<std::mutex> _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; sessions.push_back(session); } return sessions; } void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) { LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object"); // TODO(b/183988761): cannot trust this simple ID LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); int32_t id; if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { ALOGE("Could not read ID from fd %d", clientFd.get()); continue; return; } std::thread thisThread; sp<RpcSession> session; { std::lock_guard<std::mutex> _l(mLock); sp<RpcSession> session; auto threadId = mConnectingThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); mConnectingThreads.erase(threadId); if (id == RPC_SESSION_ID_NEW) { // new client! LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs"); mSessionIdCounter++; Loading @@ -158,24 +186,20 @@ void RpcServer::join() { auto it = mSessions.find(id); if (it == mSessions.end()) { ALOGE("Cannot add thread, no record of session with ID %d", id); continue; return; } session = it->second; } session->startThread(std::move(clientFd)); } } } std::vector<sp<RpcSession>> RpcServer::listSessions() { std::lock_guard<std::mutex> _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; sessions.push_back(session); } return sessions; // avoid strong cycle server = nullptr; // // // DO NOT ACCESS MEMBER VARIABLES BELOW // session->join(std::move(thisThread), std::move(clientFd)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { Loading
libs/binder/RpcSession.cpp +15 −17 Original line number Diff line number Diff line Loading @@ -131,24 +131,14 @@ status_t RpcSession::readId() { return OK; } void RpcSession::startThread(unique_fd client) { std::lock_guard<std::mutex> _l(mMutex); sp<RpcSession> holdThis = sp<RpcSession>::fromExisting(this); int fd = client.release(); auto thread = std::thread([=] { holdThis->join(unique_fd(fd)); void RpcSession::join(std::thread thread, unique_fd client) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); { std::lock_guard<std::mutex> _l(holdThis->mMutex); auto it = mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); it->second.detach(); mThreads.erase(it); } }); std::lock_guard<std::mutex> _l(mMutex); mThreads[thread.get_id()] = std::move(thread); } void RpcSession::join(unique_fd client) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) sp<RpcConnection> connection = assignServerToThisThread(std::move(client)); Loading @@ -165,6 +155,14 @@ void RpcSession::join(unique_fd client) { LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection), "bad state: connection object guaranteed to be in list"); { std::lock_guard<std::mutex> _l(mMutex); auto it = mThreads.find(std::this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == mThreads.end()); it->second.detach(); mThreads.erase(it); } } void RpcSession::terminateLocked() { Loading
libs/binder/include/binder/RpcServer.h +3 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ #include <utils/RefBase.h> #include <mutex> #include <thread> // WARNING: This is a feature which is still in development, and it is subject // to radical change. Any production use of this may subject your code to any Loading Loading @@ -115,6 +116,7 @@ private: friend sp<RpcServer>; RpcServer(); void establishConnection(sp<RpcServer>&& session, base::unique_fd clientFd); bool setupSocketServer(const RpcSocketAddress& address); bool mAgreedExperimental = false; Loading @@ -123,6 +125,7 @@ private: base::unique_fd mServer; // socket we are accepting sessions on std::mutex mLock; // for below std::map<std::thread::id, std::thread> mConnectingThreads; sp<IBinder> mRootObject; std::map<int32_t, sp<RpcSession>> mSessions; int32_t mSessionIdCounter = 0; Loading
libs/binder/include/binder/RpcSession.h +1 −2 Original line number Diff line number Diff line Loading @@ -114,8 +114,7 @@ private: status_t readId(); void startThread(base::unique_fd client); void join(base::unique_fd client); void join(std::thread thread, base::unique_fd client); void terminateLocked(); struct RpcConnection : public RefBase { Loading