Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1b1e2be7 authored by Steven Moreland's avatar Steven Moreland Committed by Automerger Merge Worker
Browse files

Merge "libbinder: RPC socket bound to service thread" am: 40c3c041 am: 44890c30

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1680506

Change-Id: I8f9ed7ad41a694e001ca4d3ef2c858417833bd1f
parents 4446d109 44890c30
Loading
Loading
Loading
Loading
+56 −49
Original line number Original line Diff line number Diff line
@@ -57,6 +57,10 @@ RpcConnection::RpcConnection() {
}
}
RpcConnection::~RpcConnection() {
RpcConnection::~RpcConnection() {
    LOG_RPC_DETAIL("RpcConnection destroyed %p", this);
    LOG_RPC_DETAIL("RpcConnection destroyed %p", this);

    std::lock_guard<std::mutex> _l(mSocketMutex);
    LOG_ALWAYS_FATAL_IF(mServers.size() != 0,
                        "Should not be able to destroy a connection with servers in use.");
}
}


sp<RpcConnection> RpcConnection::make() {
sp<RpcConnection> RpcConnection::make() {
@@ -222,8 +226,8 @@ status_t RpcConnection::sendDecStrong(const RpcAddress& address) {
}
}


void RpcConnection::join() {
void RpcConnection::join() {
    // establish a connection
    // TODO(b/185167543): do this dynamically, instead of from a static number
    {
    // of threads
    unique_fd clientFd(
    unique_fd clientFd(
            TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
            TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
    if (clientFd < 0) {
    if (clientFd < 0) {
@@ -235,23 +239,22 @@ void RpcConnection::join() {


    LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
    LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());


        assignServerToThisThread(std::move(clientFd));
    // must be registered to allow arbitrary client code executing commands to
    }
    // be able to do nested calls (we can't only read from it)

    sp<ConnectionSocket> socket = assignServerToThisThread(std::move(clientFd));
    // We may not use the connection we just established (two threads might
    // establish connections for each other), but for now, just use one
    // server/socket connection.
    ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::SERVER);


    while (true) {
    while (true) {
        status_t error =
        status_t error =
                state()->getAndExecuteCommand(socket.fd(), sp<RpcConnection>::fromExisting(this));
                state()->getAndExecuteCommand(socket->fd, sp<RpcConnection>::fromExisting(this));


        if (error != OK) {
        if (error != OK) {
            ALOGI("Binder socket thread closing w/ status %s", statusToString(error).c_str());
            ALOGI("Binder socket thread closing w/ status %s", statusToString(error).c_str());
            return;
            break;
        }
        }
    }
    }

    LOG_ALWAYS_FATAL_IF(!removeServerSocket(socket),
                        "bad state: socket object guaranteed to be in list");
}
}


void RpcConnection::setForServer(const wp<RpcServer>& server) {
void RpcConnection::setForServer(const wp<RpcServer>& server) {
@@ -316,11 +319,23 @@ void RpcConnection::addClient(unique_fd&& fd) {
    mClients.push_back(connection);
    mClients.push_back(connection);
}
}


void RpcConnection::assignServerToThisThread(unique_fd&& fd) {
sp<RpcConnection::ConnectionSocket> RpcConnection::assignServerToThisThread(unique_fd&& fd) {
    std::lock_guard<std::mutex> _l(mSocketMutex);
    std::lock_guard<std::mutex> _l(mSocketMutex);
    sp<ConnectionSocket> connection = sp<ConnectionSocket>::make();
    sp<ConnectionSocket> connection = sp<ConnectionSocket>::make();
    connection->fd = std::move(fd);
    connection->fd = std::move(fd);
    connection->exclusiveTid = gettid();
    mServers.push_back(connection);
    mServers.push_back(connection);

    return connection;
}

bool RpcConnection::removeServerSocket(const sp<ConnectionSocket>& socket) {
    std::lock_guard<std::mutex> _l(mSocketMutex);
    if (auto it = std::find(mServers.begin(), mServers.end(), socket); it != mServers.end()) {
        mServers.erase(it);
        return true;
    }
    return false;
}
}


RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connection, SocketUse use)
RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connection, SocketUse use)
@@ -335,10 +350,8 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect


        // CHECK FOR DEDICATED CLIENT SOCKET
        // CHECK FOR DEDICATED CLIENT SOCKET
        //
        //
        // A server/looper should always use a dedicated connection.
        // A server/looper should always use a dedicated connection if available
        if (use != SocketUse::SERVER) {
        findSocket(tid, &exclusive, &available, mConnection->mClients, mConnection->mClientsOffset);
            findSocket(tid, &exclusive, &available, mConnection->mClients,
                       mConnection->mClientsOffset);


        // WARNING: this assumes a server cannot request its client to send
        // WARNING: this assumes a server cannot request its client to send
        // a transaction, as mServers is excluded below.
        // a transaction, as mServers is excluded below.
@@ -354,18 +367,14 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect
            mConnection->mClientsOffset =
            mConnection->mClientsOffset =
                    (mConnection->mClientsOffset + 1) % mConnection->mClients.size();
                    (mConnection->mClientsOffset + 1) % mConnection->mClients.size();
        }
        }
        }


        // USE SERVING SOCKET (to start serving or for nested transaction)
        // USE SERVING SOCKET (for nested transaction)
        //
        //
        // asynchronous calls cannot be nested
        // asynchronous calls cannot be nested
        if (use != SocketUse::CLIENT_ASYNC) {
        if (use != SocketUse::CLIENT_ASYNC) {
            // servers should start serving on an available thread only
            // server sockets are always assigned to a thread
            // otherwise, this should only be a nested call
            findSocket(tid, &exclusive, nullptr /*available*/, mConnection->mServers,
            bool useAvailable = use == SocketUse::SERVER;
                       0 /* index hint */);

            findSocket(tid, &exclusive, (useAvailable ? &available : nullptr),
                       mConnection->mServers, 0 /* index hint */);
        }
        }


        // if our thread is already using a connection, prioritize using that
        // if our thread is already using a connection, prioritize using that
@@ -379,8 +388,6 @@ RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connect
            break;
            break;
        }
        }


        LOG_ALWAYS_FATAL_IF(use == SocketUse::SERVER, "Must create connection to join one.");

        // in regular binder, this would usually be a deadlock :)
        // in regular binder, this would usually be a deadlock :)
        LOG_ALWAYS_FATAL_IF(mConnection->mClients.size() == 0,
        LOG_ALWAYS_FATAL_IF(mConnection->mClients.size() == 0,
                            "Not a client of any connection. You must create a connection to an "
                            "Not a client of any connection. You must create a connection to an "
+6 −6
Original line number Original line Diff line number Diff line
@@ -128,11 +128,6 @@ private:
    friend sp<RpcConnection>;
    friend sp<RpcConnection>;
    RpcConnection();
    RpcConnection();


    bool setupSocketServer(const SocketAddress& address);
    bool addSocketClient(const SocketAddress& address);
    void addClient(base::unique_fd&& fd);
    void assignServerToThisThread(base::unique_fd&& fd);

    struct ConnectionSocket : public RefBase {
    struct ConnectionSocket : public RefBase {
        base::unique_fd fd;
        base::unique_fd fd;


@@ -141,11 +136,16 @@ private:
        std::optional<pid_t> exclusiveTid;
        std::optional<pid_t> exclusiveTid;
    };
    };


    bool setupSocketServer(const SocketAddress& address);
    bool addSocketClient(const SocketAddress& address);
    void addClient(base::unique_fd&& fd);
    sp<ConnectionSocket> assignServerToThisThread(base::unique_fd&& fd);
    bool removeServerSocket(const sp<ConnectionSocket>& socket);

    enum class SocketUse {
    enum class SocketUse {
        CLIENT,
        CLIENT,
        CLIENT_ASYNC,
        CLIENT_ASYNC,
        CLIENT_REFCOUNT,
        CLIENT_REFCOUNT,
        SERVER,
    };
    };


    // RAII object for connection socket
    // RAII object for connection socket