
Commit 7f6aa983 authored by Steven Moreland, committed by Automerger Merge Worker

Merge "libbinder: finalize connect/server APIs" am: a98286eb am: 405fe6a3...

Merge "libbinder: finalize connect/server APIs" am: a98286eb am: 405fe6a3 am: ab29b2bd am: 315cd9a8

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1683484

Change-Id: Ia2ded3f144f3a5dbf9ca7d2e7d499c98e1362e6b
parents f82a3916 315cd9a8
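
For orientation: this change finalizes the naming of the socket connection APIs, renaming the RpcConnection client entry points from add*Client() to setup*Client() and moving threading policy onto RpcServer (setMaxThreads()/join()). Below is a minimal sketch, not part of this change, of how the pieces fit together afterwards; it assumes the RpcServer::make()/RpcConnection::make() factories and the <binder/RpcServer.h>/<binder/RpcConnection.h> headers from this tree, and the socket path is purely illustrative.

// Sketch only: error handling trimmed, socket path made up.
#include <binder/IBinder.h>
#include <binder/RpcConnection.h>
#include <binder/RpcServer.h>

using android::IBinder;
using android::RpcConnection;
using android::RpcServer;
using android::sp;

void runExampleServer(const sp<IBinder>& root) {
    sp<RpcServer> server = RpcServer::make();
    server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
    server->setMaxThreads(3);  // must be set before the first addClientConnection()
    server->setRootObject(root);
    sp<RpcConnection> connection = server->addClientConnection();
    if (!connection->setupUnixDomainServer("/dev/example_rpc_socket")) return;
    server->join();  // per the new RpcServer::join(), dedicates 3 threads to this connection
}

sp<IBinder> runExampleClient() {
    sp<RpcConnection> connection = RpcConnection::make();
    // formerly addUnixDomainClient(); now also opens extra sockets to match the
    // server's advertised max thread count
    if (!connection->setupUnixDomainClient("/dev/example_rpc_socket")) return nullptr;
    return connection->getRootObject();
}
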
libs/binder/RpcConnection.cpp +44 −7
@@ -139,8 +139,8 @@ bool RpcConnection::setupUnixDomainServer(const char* path) {
     return setupSocketServer(UnixSocketAddress(path));
 }
 
-bool RpcConnection::addUnixDomainClient(const char* path) {
-    return addSocketClient(UnixSocketAddress(path));
+bool RpcConnection::setupUnixDomainClient(const char* path) {
+    return setupSocketClient(UnixSocketAddress(path));
 }
 
 #ifdef __BIONIC__
#ifdef __BIONIC__
@@ -171,8 +171,8 @@ bool RpcConnection::setupVsockServer(unsigned int port) {
     return setupSocketServer(VsockSocketAddress(kAnyCid, port));
 }
 
-bool RpcConnection::addVsockClient(unsigned int cid, unsigned int port) {
-    return addSocketClient(VsockSocketAddress(cid, port));
+bool RpcConnection::setupVsockClient(unsigned int cid, unsigned int port) {
+    return setupSocketClient(VsockSocketAddress(cid, port));
 }
 
 #endif // __BIONIC__
@@ -240,12 +240,12 @@ bool RpcConnection::setupInetServer(unsigned int port, unsigned int* assignedPor
     return false;
 }
 
-bool RpcConnection::addInetClient(const char* addr, unsigned int port) {
+bool RpcConnection::setupInetClient(const char* addr, unsigned int port) {
     auto aiStart = GetAddrInfo(addr, port);
     if (aiStart == nullptr) return false;
     for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) {
         InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port);
-        if (addSocketClient(socketAddress)) return true;
+        if (setupSocketClient(socketAddress)) return true;
     }
     ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port);
     return false;
@@ -268,6 +268,11 @@ sp<IBinder> RpcConnection::getRootObject() {
     return state()->getRootObject(socket.fd(), sp<RpcConnection>::fromExisting(this));
 }
 
+status_t RpcConnection::getMaxThreads(size_t* maxThreads) {
+    ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::CLIENT);
+    return state()->getMaxThreads(socket.fd(), sp<RpcConnection>::fromExisting(this), maxThreads);
+}
+
 status_t RpcConnection::transact(const RpcAddress& address, uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags) {
     ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this),
@@ -348,7 +353,39 @@ bool RpcConnection::setupSocketServer(const SocketAddress& addr) {
     return true;
 }
 
-bool RpcConnection::addSocketClient(const SocketAddress& addr) {
+bool RpcConnection::setupSocketClient(const SocketAddress& addr) {
+    {
+        std::lock_guard<std::mutex> _l(mSocketMutex);
+        LOG_ALWAYS_FATAL_IF(mClients.size() != 0,
+                            "Must only setup connection once, but already has %zu clients",
+                            mClients.size());
+    }
+
+    if (!setupOneSocketClient(addr)) return false;
+
+    // TODO(b/185167543): we should add additional connections dynamically
+    // instead of all at once.
+    // TODO(b/186470974): first risk of blocking
+    size_t numThreadsAvailable;
+    if (status_t status = getMaxThreads(&numThreadsAvailable); status != OK) {
+        ALOGE("Could not get max threads after initial connection to %s: %s",
+              addr.toString().c_str(), statusToString(status).c_str());
+        return false;
+    }
+
+    // we've already setup one client
+    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
+        // TODO(b/185167543): avoid race w/ accept4 not being called on server
+        for (size_t tries = 0; tries < 5; tries++) {
+            if (setupOneSocketClient(addr)) break;
+            usleep(10000);
+        }
+    }
+
+    return true;
+}
+
+bool RpcConnection::setupOneSocketClient(const SocketAddress& addr) {
     unique_fd serverFd(
             TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
     if (serverFd == -1) {
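
Worth noting for reviewers: setupSocketClient() now performs a small handshake rather than a single connect. It opens one socket, queries the server's max thread count over the new special transaction, then opens up to maxThreads - 1 additional sockets, retrying briefly to paper over the accept4 race flagged in the TODO. A hedged caller-side sketch follows, assuming RpcConnection::getMaxThreads() is exposed in RpcConnection.h as its definition above suggests; the address and port are placeholders.

#include <binder/RpcConnection.h>

using android::OK;
using android::RpcConnection;
using android::sp;
using android::status_t;

bool connectToInetService(const char* addr, unsigned int port) {
    sp<RpcConnection> connection = RpcConnection::make();
    // formerly addInetClient(); internally runs the connect + GET_MAX_THREADS +
    // extra-socket fan-out implemented in setupSocketClient() above
    if (!connection->setupInetClient(addr, port)) return false;

    size_t maxThreads = 0;
    status_t status = connection->getMaxThreads(&maxThreads);
    // maxThreads should mirror the value the server passed to RpcServer::setMaxThreads()
    return status == OK && maxThreads > 0;
}
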
libs/binder/RpcServer.cpp +42 −7
@@ -19,6 +19,7 @@
 #include <sys/socket.h>
 #include <sys/un.h>
 
+#include <thread>
 #include <vector>
 
 #include <binder/Parcel.h>
@@ -41,6 +42,31 @@ void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction()
     mAgreedExperimental = true;
 }
 
+void RpcServer::setMaxThreads(size_t threads) {
+    LOG_ALWAYS_FATAL_IF(threads <= 0, "RpcServer is useless without threads");
+    {
+        // this lock should only ever be needed in the error case
+        std::lock_guard<std::mutex> _l(mLock);
+        LOG_ALWAYS_FATAL_IF(mConnections.size() > 0,
+                            "Must specify max threads before creating a connection");
+    }
+    mMaxThreads = threads;
+}
+
+size_t RpcServer::getMaxThreads() {
+    return mMaxThreads;
+}
+
+void RpcServer::setRootObject(const sp<IBinder>& binder) {
+    std::lock_guard<std::mutex> _l(mLock);
+    mRootObject = binder;
+}
+
+sp<IBinder> RpcServer::getRootObject() {
+    std::lock_guard<std::mutex> _l(mLock);
+    return mRootObject;
+}
+
 sp<RpcConnection> RpcServer::addClientConnection() {
     LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
 
@@ -48,19 +74,28 @@ sp<RpcConnection> RpcServer::addClientConnection() {
     connection->setForServer(sp<RpcServer>::fromExisting(this));
     {
         std::lock_guard<std::mutex> _l(mLock);
+        LOG_ALWAYS_FATAL_IF(mStarted,
+                            "currently only supports adding client connections at creation time");
         mConnections.push_back(connection);
     }
     return connection;
 }
 
-void RpcServer::setRootObject(const sp<IBinder>& binder) {
-    std::lock_guard<std::mutex> _l(mLock);
-    mRootObject = binder;
-}
-
-sp<IBinder> RpcServer::getRootObject() {
-    std::lock_guard<std::mutex> _l(mLock);
-    return mRootObject;
+void RpcServer::join() {
+    std::vector<std::thread> pool;
+    {
+        std::lock_guard<std::mutex> _l(mLock);
+        mStarted = true;
+        for (const sp<RpcConnection>& connection : mConnections) {
+            for (size_t i = 0; i < mMaxThreads; i++) {
+                pool.push_back(std::thread([=] { connection->join(); }));
+            }
+        }
+    }
+
+    // TODO(b/185167543): don't waste extra thread for join, and combine threads
+    // between clients
+    for (auto& t : pool) t.join();
 }
 
 } // namespace android
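
The ordering constraints added above are worth spelling out: setMaxThreads() aborts if a connection already exists, addClientConnection() aborts once the server has started, and join() marks the server started and dedicates mMaxThreads threads to each connection. A hedged sketch honoring that ordering, assuming the caller has already called iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() and setRootObject(); the vsock port is arbitrary and the vsock variants are bionic-only per the #ifdef in RpcConnection.cpp.

#include <binder/RpcConnection.h>
#include <binder/RpcServer.h>

using android::RpcConnection;
using android::RpcServer;
using android::sp;

void startAndJoin(const sp<RpcServer>& server, unsigned int vsockPort) {
    server->setMaxThreads(4);  // fatal if a connection had already been added
    sp<RpcConnection> connection = server->addClientConnection();  // fatal once the server has started
    if (!connection->setupVsockServer(vsockPort)) return;
    // blocks; spawns 4 threads that each sit in connection->join()
    server->join();
}
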
libs/binder/RpcState.cpp +42 −15
@@ -248,6 +248,31 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd,
     return reply.readStrongBinder();
 }
 
+status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection,
+                                 size_t* maxThreads) {
+    Parcel data;
+    data.markForRpc(connection);
+    Parcel reply;
+
+    status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, data,
+                               connection, &reply, 0);
+    if (status != OK) {
+        ALOGE("Error getting max threads: %s", statusToString(status).c_str());
+        return status;
+    }
+
+    int32_t threads;
+    status = reply.readInt32(&threads);
+    if (status != OK) return status;
+    if (threads <= 0) {
+        ALOGE("Error invalid max threads: %d", threads);
+        return BAD_VALUE;
+    }
+
+    *maxThreads = threads;
+    return OK;
+}
+
 status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code,
                             const Parcel& data, const sp<RpcConnection>& connection, Parcel* reply,
                             uint32_t flags) {
@@ -516,24 +541,26 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd,
             replyStatus = target->transact(transaction->code, data, &reply, transaction->flags);
         } else {
             LOG_RPC_DETAIL("Got special transaction %u", transaction->code);
-            // special case for 'zero' address (special server commands)
-            switch (transaction->code) {
-                case RPC_SPECIAL_TRANSACT_GET_ROOT: {
-                    sp<IBinder> root;
-                    sp<RpcServer> server = connection->server().promote();
-                    if (server) {
-                        root = server->getRootObject();
-                    } else {
-                        ALOGE("Root object requested, but no server attached.");
-                    }
-
-                    replyStatus = reply.writeStrongBinder(root);
-                    break;
-                }
-                default: {
-                    replyStatus = UNKNOWN_TRANSACTION;
+
+            sp<RpcServer> server = connection->server().promote();
+            if (server) {
+                // special case for 'zero' address (special server commands)
+                switch (transaction->code) {
+                    case RPC_SPECIAL_TRANSACT_GET_ROOT: {
+                        replyStatus = reply.writeStrongBinder(server->getRootObject());
+                        break;
+                    }
+                    case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: {
+                        replyStatus = reply.writeInt32(server->getMaxThreads());
+                        break;
+                    }
+                    default: {
+                        replyStatus = UNKNOWN_TRANSACTION;
+                    }
                 }
+            } else {
+                ALOGE("Special command sent, but no server object attached.");
             }
         }
     }
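
On the wire, GET_MAX_THREADS is a transaction addressed to RpcAddress::zero() whose reply Parcel carries a single positive int32. A small hedged helper mirroring the reply validation above; the function name is hypothetical and not part of this change.

#include <binder/Parcel.h>
#include <utils/Errors.h>

// Hypothetical helper; mirrors RpcState::getMaxThreads() reply handling above.
static android::status_t readMaxThreadsReply(const android::Parcel& reply, size_t* outMaxThreads) {
    int32_t threads = 0;
    if (android::status_t status = reply.readInt32(&threads); status != android::OK) return status;
    if (threads <= 0) return android::BAD_VALUE;  // a non-positive count is rejected as BAD_VALUE
    *outMaxThreads = static_cast<size_t>(threads);
    return android::OK;
}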

libs/binder/RpcState.h +2 −0
@@ -51,6 +51,8 @@ public:
     ~RpcState();
 
     sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcConnection>& connection);
+    status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection,
+                           size_t* maxThreadsOut);
 
     [[nodiscard]] status_t transact(const base::unique_fd& fd, const RpcAddress& address,
                                     uint32_t code, const Parcel& data,
libs/binder/RpcWireFormat.h +1 −0
@@ -47,6 +47,7 @@ enum : uint32_t {
  */
 enum : uint32_t {
     RPC_SPECIAL_TRANSACT_GET_ROOT = 0,
+    RPC_SPECIAL_TRANSACT_GET_MAX_THREADS = 1,
 };
 
 // serialization is like: