Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 50f6e980 authored by Steven Moreland's avatar Steven Moreland Committed by Automerger Merge Worker
Browse files

Merge "libbinder: RPC read connection info on 2nd thread" am: bdc80541 am:...

Merge "libbinder: RPC read connection info on 2nd thread" am: bdc80541 am: 806bd998 am: 042d06b8

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1704016

Change-Id: I6b51313c2eddf548023e32c8847df2f2b7938fa1
parents 281cef26 042d06b8
Loading
Loading
Loading
Loading
+54 −30
Original line number Diff line number Diff line
@@ -132,21 +132,49 @@ void RpcServer::join() {
        }
        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());

        // TODO(b/183988761): cannot trust this simple ID, should not block this
        // thread
        {
            std::lock_guard<std::mutex> _l(mLock);
            std::thread thread =
                    std::thread(&RpcServer::establishConnection, this,
                                std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd));
            mConnectingThreads[thread.get_id()] = std::move(thread);
        }
    }
}

std::vector<sp<RpcSession>> RpcServer::listSessions() {
    std::lock_guard<std::mutex> _l(mLock);
    std::vector<sp<RpcSession>> sessions;
    for (auto& [id, session] : mSessions) {
        (void)id;
        sessions.push_back(session);
    }
    return sessions;
}

void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
    LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object");

    // TODO(b/183988761): cannot trust this simple ID
    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
    int32_t id;
    if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
        ALOGE("Could not read ID from fd %d", clientFd.get());
            continue;
        return;
    }

    std::thread thisThread;
    sp<RpcSession> session;
    {
        std::lock_guard<std::mutex> _l(mLock);

            sp<RpcSession> session;
        auto threadId = mConnectingThreads.find(std::this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(),
                            "Must establish connection on owned thread");
        thisThread = std::move(threadId->second);
        mConnectingThreads.erase(threadId);

        if (id == RPC_SESSION_ID_NEW) {
                // new client!
            LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs");
            mSessionIdCounter++;

@@ -158,24 +186,20 @@ void RpcServer::join() {
            auto it = mSessions.find(id);
            if (it == mSessions.end()) {
                ALOGE("Cannot add thread, no record of session with ID %d", id);
                    continue;
                return;
            }
            session = it->second;
        }

            session->startThread(std::move(clientFd));
        }
    }
    }

std::vector<sp<RpcSession>> RpcServer::listSessions() {
    std::lock_guard<std::mutex> _l(mLock);
    std::vector<sp<RpcSession>> sessions;
    for (auto& [id, session] : mSessions) {
        (void)id;
        sessions.push_back(session);
    }
    return sessions;
    // avoid strong cycle
    server = nullptr;
    //
    //
    // DO NOT ACCESS MEMBER VARIABLES BELOW
    //

    session->join(std::move(thisThread), std::move(clientFd));
}

bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
+15 −17
Original line number Diff line number Diff line
@@ -131,24 +131,14 @@ status_t RpcSession::readId() {
    return OK;
}

void RpcSession::startThread(unique_fd client) {
    std::lock_guard<std::mutex> _l(mMutex);
    sp<RpcSession> holdThis = sp<RpcSession>::fromExisting(this);
    int fd = client.release();
    auto thread = std::thread([=] {
        holdThis->join(unique_fd(fd));
void RpcSession::join(std::thread thread, unique_fd client) {
    LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");

    {
            std::lock_guard<std::mutex> _l(holdThis->mMutex);
            auto it = mThreads.find(std::this_thread::get_id());
            LOG_ALWAYS_FATAL_IF(it == mThreads.end());
            it->second.detach();
            mThreads.erase(it);
        }
    });
        std::lock_guard<std::mutex> _l(mMutex);
        mThreads[thread.get_id()] = std::move(thread);
    }

void RpcSession::join(unique_fd client) {
    // must be registered to allow arbitrary client code executing commands to
    // be able to do nested calls (we can't only read from it)
    sp<RpcConnection> connection = assignServerToThisThread(std::move(client));
@@ -165,6 +155,14 @@ void RpcSession::join(unique_fd client) {

    LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection),
                        "bad state: connection object guaranteed to be in list");

    {
        std::lock_guard<std::mutex> _l(mMutex);
        auto it = mThreads.find(std::this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(it == mThreads.end());
        it->second.detach();
        mThreads.erase(it);
    }
}

void RpcSession::terminateLocked() {
+3 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@
#include <utils/RefBase.h>

#include <mutex>
#include <thread>

// WARNING: This is a feature which is still in development, and it is subject
// to radical change. Any production use of this may subject your code to any
@@ -115,6 +116,7 @@ private:
    friend sp<RpcServer>;
    RpcServer();

    void establishConnection(sp<RpcServer>&& session, base::unique_fd clientFd);
    bool setupSocketServer(const RpcSocketAddress& address);

    bool mAgreedExperimental = false;
@@ -123,6 +125,7 @@ private:
    base::unique_fd mServer; // socket we are accepting sessions on

    std::mutex mLock; // for below
    std::map<std::thread::id, std::thread> mConnectingThreads;
    sp<IBinder> mRootObject;
    std::map<int32_t, sp<RpcSession>> mSessions;
    int32_t mSessionIdCounter = 0;
+1 −2
Original line number Diff line number Diff line
@@ -114,8 +114,7 @@ private:

    status_t readId();

    void startThread(base::unique_fd client);
    void join(base::unique_fd client);
    void join(std::thread thread, base::unique_fd client);
    void terminateLocked();

    struct RpcConnection : public RefBase {