
Commit f137de90 authored by Steven Moreland

libbinder: finalize connect/server APIs

Before, you needed to manually set up the required number of sockets on
the client and server sides of a connection and manually set up threads.
Now, you configure the thread count on RpcServer and call join once, and
on the client side, you connect once, and the connection figures out how
many connections it needs to make.

This lets us manage how these sockets/threads get set up in various
tests without affecting any client code.

So, a server looks like this:

    sp<RpcServer> server = RpcServer::make();
    // still required until we are ready to open this up
    server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
    server->setMaxThreads(3 /* for example */);
    // call this for each client (currently this must be set up in
    // advance)
    sp<RpcConnection> connection = server->addClientConnection();
    // other server types are supported
    if (!connection->setupInetServer(1234 /*some port*/)) .. error ..
    // process requests for each client
    server->join();
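
The example above doesn't register a root object; typically the server
would also call setRootObject (added in this change) so clients have
something to talk to. A minimal sketch, where MyFooImpl is a
hypothetical binder implementation, not part of this change:

    // hypothetical: register the object clients retrieve via getRootObject()
    server->setRootObject(sp<MyFooImpl>::make());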

And a client looks like this:

    sp<RpcConnection> connection = RpcConnection::make();
    if (!connection->setupInetClient(/*some IP address*/, 1234 /*some port*/))
        .. error ..
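
With the connection established, the client can fetch the server's root
object and call through it. A hedged sketch, assuming the hypothetical
IFoo interface implemented by MyFooImpl above:

    // getRootObject() is added on RpcConnection in this change
    sp<IBinder> root = connection->getRootObject();
    if (root == nullptr) .. error ..

    // standard libbinder cast to the (hypothetical) interface
    sp<IFoo> foo = interface_cast<IFoo>(root);
    foo->doWork();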

The above code will create 3 threads on the server serving 3 separate
socket connections, which the client can use to make up to 3
simultaneous sets of synchronous calls (a socket can't be shared during
a call, since it may be needed for the full duration of a synchronous
binder call).

This means that each address (IP + port) in this case can serve a single process.
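
For instance, a sketch of what "up to 3 simultaneous synchronous calls"
permits, reusing the hypothetical foo proxy from above; each thread
exclusively holds one of the 3 sockets for the duration of its call, so
a 4th concurrent caller would contend for a free socket:

    std::vector<std::thread> threads;
    for (size_t i = 0; i < 3; i++) {
        // each call grabs an exclusive socket for its duration
        threads.push_back(std::thread([&] { foo->doWork(); }));
    }
    for (auto& t : threads) t.join();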

Future considerations:
- if we wanted, we could set up this connection dynamically, so that
  extra threads and sockets are only created as needed. This would be at
  parity with binder, but it also opens up the possibility of later
  errors. TODOs are added in the code for this.
- a single server should be able to share a threadpool between multiple
  clients. Currently, a new threadpool is created for each client.
- new client connections should be able to be set up dynamically.
  Currently, once the threadpool is started, we don't support making
  more connections, but we should.

Bug: 185167543
Test: binderRpcTest
Change-Id: I4c11ab64bf7c1c19ca67f6a1c4be21de52358a5c
parent 27f2ed69
+44 −7  RpcConnection.cpp

@@ -139,8 +139,8 @@ bool RpcConnection::setupUnixDomainServer(const char* path) {
     return setupSocketServer(UnixSocketAddress(path));
 }
 
-bool RpcConnection::addUnixDomainClient(const char* path) {
-    return addSocketClient(UnixSocketAddress(path));
+bool RpcConnection::setupUnixDomainClient(const char* path) {
+    return setupSocketClient(UnixSocketAddress(path));
 }
 
 #ifdef __BIONIC__
@@ -171,8 +171,8 @@ bool RpcConnection::setupVsockServer(unsigned int port) {
     return setupSocketServer(VsockSocketAddress(kAnyCid, port));
 }
 
-bool RpcConnection::addVsockClient(unsigned int cid, unsigned int port) {
-    return addSocketClient(VsockSocketAddress(cid, port));
+bool RpcConnection::setupVsockClient(unsigned int cid, unsigned int port) {
+    return setupSocketClient(VsockSocketAddress(cid, port));
 }
 
 #endif // __BIONIC__
@@ -240,12 +240,12 @@ bool RpcConnection::setupInetServer(unsigned int port, unsigned int* assignedPor
     return false;
 }
 
-bool RpcConnection::addInetClient(const char* addr, unsigned int port) {
+bool RpcConnection::setupInetClient(const char* addr, unsigned int port) {
     auto aiStart = GetAddrInfo(addr, port);
     if (aiStart == nullptr) return false;
     for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) {
         InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port);
-        if (addSocketClient(socketAddress)) return true;
+        if (setupSocketClient(socketAddress)) return true;
     }
     ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port);
     return false;
@@ -268,6 +268,11 @@ sp<IBinder> RpcConnection::getRootObject() {
     return state()->getRootObject(socket.fd(), sp<RpcConnection>::fromExisting(this));
 }
 
+status_t RpcConnection::getMaxThreads(size_t* maxThreads) {
+    ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::CLIENT);
+    return state()->getMaxThreads(socket.fd(), sp<RpcConnection>::fromExisting(this), maxThreads);
+}
+
 status_t RpcConnection::transact(const RpcAddress& address, uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags) {
     ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this),
@@ -348,7 +353,39 @@ bool RpcConnection::setupSocketServer(const SocketAddress& addr) {
     return true;
 }
 
-bool RpcConnection::addSocketClient(const SocketAddress& addr) {
+bool RpcConnection::setupSocketClient(const SocketAddress& addr) {
+    {
+        std::lock_guard<std::mutex> _l(mSocketMutex);
+        LOG_ALWAYS_FATAL_IF(mClients.size() != 0,
+                            "Must only setup connection once, but already has %zu clients",
+                            mClients.size());
+    }
+
+    if (!setupOneSocketClient(addr)) return false;
+
+    // TODO(b/185167543): we should add additional connections dynamically
+    // instead of all at once.
+    // TODO(b/186470974): first risk of blocking
+    size_t numThreadsAvailable;
+    if (status_t status = getMaxThreads(&numThreadsAvailable); status != OK) {
+        ALOGE("Could not get max threads after initial connection to %s: %s",
+              addr.toString().c_str(), statusToString(status).c_str());
+        return false;
+    }
+
+    // we've already setup one client
+    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
+        // TODO(b/185167543): avoid race w/ accept4 not being called on server
+        for (size_t tries = 0; tries < 5; tries++) {
+            if (setupOneSocketClient(addr)) break;
+            usleep(10000);
+        }
+    }
+
+    return true;
+}
+
+bool RpcConnection::setupOneSocketClient(const SocketAddress& addr) {
     unique_fd serverFd(
             TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
     if (serverFd == -1) {
+42 −7  RpcServer.cpp

@@ -19,6 +19,7 @@
 #include <sys/socket.h>
 #include <sys/un.h>
 
+#include <thread>
 #include <vector>
 
 #include <binder/Parcel.h>
@@ -41,6 +42,31 @@ void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction()
     mAgreedExperimental = true;
 }
 
+void RpcServer::setMaxThreads(size_t threads) {
+    LOG_ALWAYS_FATAL_IF(threads <= 0, "RpcServer is useless without threads");
+    {
+        // this lock should only ever be needed in the error case
+        std::lock_guard<std::mutex> _l(mLock);
+        LOG_ALWAYS_FATAL_IF(mConnections.size() > 0,
+                            "Must specify max threads before creating a connection");
+    }
+    mMaxThreads = threads;
+}
+
+size_t RpcServer::getMaxThreads() {
+    return mMaxThreads;
+}
+
+void RpcServer::setRootObject(const sp<IBinder>& binder) {
+    std::lock_guard<std::mutex> _l(mLock);
+    mRootObject = binder;
+}
+
+sp<IBinder> RpcServer::getRootObject() {
+    std::lock_guard<std::mutex> _l(mLock);
+    return mRootObject;
+}
+
 sp<RpcConnection> RpcServer::addClientConnection() {
     LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
 
@@ -48,19 +74,28 @@ sp<RpcConnection> RpcServer::addClientConnection() {
     connection->setForServer(sp<RpcServer>::fromExisting(this));
     {
         std::lock_guard<std::mutex> _l(mLock);
+        LOG_ALWAYS_FATAL_IF(mStarted,
+                            "currently only supports adding client connections at creation time");
         mConnections.push_back(connection);
     }
     return connection;
 }
 
-void RpcServer::setRootObject(const sp<IBinder>& binder) {
-    std::lock_guard<std::mutex> _l(mLock);
-    mRootObject = binder;
-}
-
-sp<IBinder> RpcServer::getRootObject() {
-    std::lock_guard<std::mutex> _l(mLock);
-    return mRootObject;
-}
+void RpcServer::join() {
+    std::vector<std::thread> pool;
+    {
+        std::lock_guard<std::mutex> _l(mLock);
+        mStarted = true;
+        for (const sp<RpcConnection>& connection : mConnections) {
+            for (size_t i = 0; i < mMaxThreads; i++) {
+                pool.push_back(std::thread([=] { connection->join(); }));
+            }
+        }
+    }
+
+    // TODO(b/185167543): don't waste extra thread for join, and combine threads
+    // between clients
+    for (auto& t : pool) t.join();
+}
 
 } // namespace android
+42 −15  RpcState.cpp

@@ -248,6 +248,31 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd,
     return reply.readStrongBinder();
 }
 
+status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection,
+                                 size_t* maxThreads) {
+    Parcel data;
+    data.markForRpc(connection);
+    Parcel reply;
+
+    status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, data,
+                               connection, &reply, 0);
+    if (status != OK) {
+        ALOGE("Error getting max threads: %s", statusToString(status).c_str());
+        return status;
+    }
+
+    int32_t threads;
+    status = reply.readInt32(&threads);
+    if (status != OK) return status;
+    if (threads <= 0) {
+        ALOGE("Error invalid max threads: %d", threads);
+        return BAD_VALUE;
+    }
+
+    *maxThreads = threads;
+    return OK;
+}
+
 status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code,
                             const Parcel& data, const sp<RpcConnection>& connection, Parcel* reply,
                             uint32_t flags) {
@@ -516,24 +541,26 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd,
             replyStatus = target->transact(transaction->code, data, &reply, transaction->flags);
         } else {
             LOG_RPC_DETAIL("Got special transaction %u", transaction->code);
 
-            // special case for 'zero' address (special server commands)
-            switch (transaction->code) {
-                case RPC_SPECIAL_TRANSACT_GET_ROOT: {
-                    sp<IBinder> root;
-                    sp<RpcServer> server = connection->server().promote();
-                    if (server) {
-                        root = server->getRootObject();
-                    } else {
-                        ALOGE("Root object requested, but no server attached.");
-                    }
-
-                    replyStatus = reply.writeStrongBinder(root);
-                    break;
-                }
-                default: {
-                    replyStatus = UNKNOWN_TRANSACTION;
-                }
-            }
+            sp<RpcServer> server = connection->server().promote();
+            if (server) {
+                // special case for 'zero' address (special server commands)
+                switch (transaction->code) {
+                    case RPC_SPECIAL_TRANSACT_GET_ROOT: {
+                        replyStatus = reply.writeStrongBinder(server->getRootObject());
+                        break;
+                    }
+                    case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: {
+                        replyStatus = reply.writeInt32(server->getMaxThreads());
+                        break;
+                    }
+                    default: {
+                        replyStatus = UNKNOWN_TRANSACTION;
+                    }
+                }
+            } else {
+                ALOGE("Special command sent, but no server object attached.");
+            }
         }
     }
 
+2 −0  RpcState.h

@@ -51,6 +51,8 @@ public:
     ~RpcState();
 
     sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcConnection>& connection);
+    status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection,
+                           size_t* maxThreadsOut);
 
     [[nodiscard]] status_t transact(const base::unique_fd& fd, const RpcAddress& address,
                                     uint32_t code, const Parcel& data,
+1 −0  RpcWireFormat.h

@@ -47,6 +47,7 @@ enum : uint32_t {
  */
 enum : uint32_t {
     RPC_SPECIAL_TRANSACT_GET_ROOT = 0,
+    RPC_SPECIAL_TRANSACT_GET_MAX_THREADS = 1,
 };
 
 // serialization is like: