Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 82adab51 authored by Steven Moreland's avatar Steven Moreland Committed by Automerger Merge Worker
Browse files

Merge changes Id4971e54,Id80da21c,Ice446ec4,If10f00de,I4f59ad60 am: 8f0b7f28...

Merge changes Id4971e54,Id80da21c,Ice446ec4,If10f00de,I4f59ad60 am: 8f0b7f28 am: 1c3b2b13 am: faedb79f am: 936b7ad6

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1716634

Change-Id: Icd43cbd7a95add4f9f8d7e4fb8a3f852f09223b0
parents ad48c396 936b7ad6
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -273,7 +273,8 @@ status_t BpBinder::transact(

        status_t status;
        if (CC_UNLIKELY(isRpcBinder())) {
            status = rpcSession()->transact(rpcAddress(), code, data, reply, flags);
            status = rpcSession()->transact(sp<IBinder>::fromExisting(this), code, data, reply,
                                            flags);
        } else {
            status = IPCThreadState::self()->transact(binderHandle(), code, data, reply, flags);
        }
+6 −4
Original line number Diff line number Diff line
@@ -193,10 +193,12 @@ bool RpcServer::shutdown() {

    mShutdownTrigger->trigger();
    while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
        ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
              "%zu, Sessions: %zu",
        if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
            ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
                  "Connecting threads: "
                  "%zu, Sessions: %zu. Is your server deadlocked?",
                  mJoinThreadRunning, mConnectingThreads.size(), mSessions.size());
        mShutdownCv.wait(_l);
        }
    }

    // At this point, we know join() is about to exit, but the thread that calls
+9 −8
Original line number Diff line number Diff line
@@ -86,8 +86,7 @@ bool RpcSession::addNullDebuggingClient() {
        return false;
    }

    addClientConnection(std::move(serverFd));
    return true;
    return addClientConnection(std::move(serverFd));
}

sp<IBinder> RpcSession::getRootObject() {
@@ -100,12 +99,12 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
    return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}

status_t RpcSession::transact(const RpcAddress& address, uint32_t code, const Parcel& data,
status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                              Parcel* reply, uint32_t flags) {
    ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
                                   (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC
                                                                  : ConnectionUse::CLIENT);
    return state()->transact(connection.fd(), address, code, data,
    return state()->transact(connection.fd(), binder, code, data,
                             sp<RpcSession>::fromExisting(this), reply, flags);
}

@@ -199,7 +198,8 @@ void RpcSession::join(unique_fd client) {
                state()->getAndExecuteCommand(connection->fd, sp<RpcSession>::fromExisting(this));

        if (error != OK) {
            ALOGI("Binder connection thread closing w/ status %s", statusToString(error).c_str());
            LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
                           statusToString(error).c_str());
            break;
        }
    }
@@ -311,24 +311,25 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id)

        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());

        addClientConnection(std::move(serverFd));
        return true;
        return addClientConnection(std::move(serverFd));
    }

    ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
    return false;
}

void RpcSession::addClientConnection(unique_fd fd) {
bool RpcSession::addClientConnection(unique_fd fd) {
    std::lock_guard<std::mutex> _l(mMutex);

    if (mShutdownTrigger == nullptr) {
        mShutdownTrigger = FdTrigger::make();
        if (mShutdownTrigger == nullptr) return false;
    }

    sp<RpcConnection> session = sp<RpcConnection>::make();
    session->fd = std::move(fd);
    mClientConnections.push_back(session);
    return true;
}

void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId,
+132 −105
Original line number Diff line number Diff line
@@ -207,45 +207,49 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
    mData.reset(new (std::nothrow) uint8_t[size]);
}

bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data, size_t size) {
status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
                           size_t size) {
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());

    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot send %s at size %zu (too big)", what, size);
        terminate();
        return false;
        return BAD_VALUE;
    }

    ssize_t sent = TEMP_FAILURE_RETRY(send(fd.get(), data, size, MSG_NOSIGNAL));

    if (sent < 0 || sent != static_cast<ssize_t>(size)) {
        int savedErrno = errno;
        ALOGE("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent, size,
              fd.get(), strerror(errno));
              fd.get(), strerror(savedErrno));

        terminate();
        return false;
        return -savedErrno;
    }

    return true;
    return OK;
}

bool RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, const char* what,
                      void* data, size_t size) {
status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                          const char* what, void* data, size_t size) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot rec %s at size %zu (too big)", what, size);
        terminate();
        return false;
        return BAD_VALUE;
    }

    if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
        status != OK) {
        if (status != -ECANCELED) {
            ALOGE("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
                  statusToString(status).c_str());
        return false;
        }
        return status;
    }

    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
    return true;
    return OK;
}

sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
@@ -253,8 +257,8 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSessi
    data.markForRpc(session);
    Parcel reply;

    status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data, session,
                               &reply, 0);
    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data,
                                      session, &reply, 0);
    if (status != OK) {
        ALOGE("Error getting root object: %s", statusToString(status).c_str());
        return nullptr;
@@ -269,8 +273,8 @@ status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>
    data.markForRpc(session);
    Parcel reply;

    status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, data,
                               session, &reply, 0);
    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
                                      data, session, &reply, 0);
    if (status != OK) {
        ALOGE("Error getting max threads: %s", statusToString(status).c_str());
        return status;
@@ -294,8 +298,8 @@ status_t RpcState::getSessionId(const base::unique_fd& fd, const sp<RpcSession>&
    data.markForRpc(session);
    Parcel reply;

    status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID, data,
                               session, &reply, 0);
    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
                                      data, session, &reply, 0);
    if (status != OK) {
        ALOGE("Error getting session ID: %s", statusToString(status).c_str());
        return status;
@@ -309,9 +313,31 @@ status_t RpcState::getSessionId(const base::unique_fd& fd, const sp<RpcSession>&
    return OK;
}

status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code,
status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder, uint32_t code,
                            const Parcel& data, const sp<RpcSession>& session, Parcel* reply,
                            uint32_t flags) {
    if (!data.isForRpc()) {
        ALOGE("Refusing to send RPC with parcel not crafted for RPC");
        return BAD_TYPE;
    }

    if (data.objectsCount() != 0) {
        ALOGE("Parcel at %p has attached objects but is being used in an RPC call", &data);
        return BAD_TYPE;
    }

    RpcAddress address = RpcAddress::zero();
    if (status_t status = onBinderLeaving(session, binder, &address); status != OK) return status;

    return transactAddress(fd, address, code, data, session, reply, flags);
}

status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& address,
                                   uint32_t code, const Parcel& data, const sp<RpcSession>& session,
                                   Parcel* reply, uint32_t flags) {
    LOG_ALWAYS_FATAL_IF(!data.isForRpc());
    LOG_ALWAYS_FATAL_IF(data.objectsCount() != 0);

    uint64_t asyncNumber = 0;

    if (!address.isZero()) {
@@ -326,16 +352,6 @@ status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address
        }
    }

    if (!data.isForRpc()) {
        ALOGE("Refusing to send RPC with parcel not crafted for RPC");
        return BAD_TYPE;
    }

    if (data.objectsCount() != 0) {
        ALOGE("Parcel at %p has attached objects but is being used in an RPC call", &data);
        return BAD_TYPE;
    }

    RpcWireTransaction transaction{
            .address = address.viewRawEmbedded(),
            .code = code,
@@ -361,12 +377,12 @@ status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address
            .bodySize = static_cast<uint32_t>(transactionData.size()),
    };

    if (!rpcSend(fd, "transact header", &command, sizeof(command))) {
        return DEAD_OBJECT;
    }
    if (!rpcSend(fd, "command body", transactionData.data(), transactionData.size())) {
        return DEAD_OBJECT;
    }
    if (status_t status = rpcSend(fd, "transact header", &command, sizeof(command)); status != OK)
        return status;
    if (status_t status =
                rpcSend(fd, "command body", transactionData.data(), transactionData.size());
        status != OK)
        return status;

    if (flags & IBinder::FLAG_ONEWAY) {
        return OK; // do not wait for result
@@ -390,24 +406,22 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
                                Parcel* reply) {
    RpcWireHeader command;
    while (true) {
        if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
            return DEAD_OBJECT;
        }
        if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
            status != OK)
            return status;

        if (command.command == RPC_COMMAND_REPLY) break;

        status_t status = processServerCommand(fd, session, command);
        if (status != OK) return status;
        if (status_t status = processServerCommand(fd, session, command); status != OK)
            return status;
    }

    CommandData data(command.bodySize);
    if (!data.valid()) {
        return NO_MEMORY;
    }
    if (!data.valid()) return NO_MEMORY;

    if (!rpcRec(fd, session, "reply body", data.data(), command.bodySize)) {
        return DEAD_OBJECT;
    }
    if (status_t status = rpcRec(fd, session, "reply body", data.data(), command.bodySize);
        status != OK)
        return status;

    if (command.bodySize < sizeof(RpcWireReply)) {
        ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
@@ -447,9 +461,12 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad
            .command = RPC_COMMAND_DEC_STRONG,
            .bodySize = sizeof(RpcWireAddress),
    };
    if (!rpcSend(fd, "dec ref header", &cmd, sizeof(cmd))) return DEAD_OBJECT;
    if (!rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress)))
        return DEAD_OBJECT;
    if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
        return status;
    if (status_t status =
                rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
        status != OK)
        return status;
    return OK;
}

@@ -457,9 +474,9 @@ status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcS
    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());

    RpcWireHeader command;
    if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
        return DEAD_OBJECT;
    }
    if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
        status != OK)
        return status;

    return processServerCommand(fd, session, command);
}
@@ -505,11 +522,12 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio
    if (!transactionData.valid()) {
        return NO_MEMORY;
    }
    if (!rpcRec(fd, session, "transaction body", transactionData.data(), transactionData.size())) {
        return DEAD_OBJECT;
    }
    if (status_t status = rpcRec(fd, session, "transaction body", transactionData.data(),
                                 transactionData.size());
        status != OK)
        return status;

    return processTransactInternal(fd, session, std::move(transactionData));
    return processTransactInternal(fd, session, std::move(transactionData), nullptr /*targetRef*/);
}

static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -522,7 +540,7 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d
}

status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
                                           CommandData transactionData) {
                                           CommandData transactionData, sp<IBinder>&& targetRef) {
    if (transactionData.size() < sizeof(RpcWireTransaction)) {
        ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
              sizeof(RpcWireTransaction), transactionData.size());
@@ -538,14 +556,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
    status_t replyStatus = OK;
    sp<IBinder> target;
    if (!addr.isZero()) {
        std::lock_guard<std::mutex> _l(mNodeMutex);

        auto it = mNodeForAddress.find(addr);
        if (it == mNodeForAddress.end()) {
            ALOGE("Unknown binder address %s.", addr.toString().c_str());
            replyStatus = BAD_VALUE;
        if (!targetRef) {
            target = onBinderEntering(session, addr);
        } else {
            target = it->second.binder.promote();
            target = targetRef;
        }

        if (target == nullptr) {
            // This can happen if the binder is remote in this process, and
            // another thread has called the last decStrong on this binder.
@@ -558,18 +574,25 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
            terminate();
            replyStatus = BAD_VALUE;
        } else if (target->localBinder() == nullptr) {
                ALOGE("Transactions can only go to local binders, not address %s. Terminating!",
            ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
                  addr.toString().c_str());
            terminate();
            replyStatus = BAD_VALUE;
        } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
                if (transaction->asyncNumber != it->second.asyncNumber) {
            std::lock_guard<std::mutex> _l(mNodeMutex);
            auto it = mNodeForAddress.find(addr);
            if (it->second.binder.promote() != target) {
                ALOGE("Binder became invalid during transaction. Bad client? %s",
                      addr.toString().c_str());
                replyStatus = BAD_VALUE;
            } else if (transaction->asyncNumber != it->second.asyncNumber) {
                // we need to process some other asynchronous transaction
                // first
                // TODO(b/183140903): limit enqueues/detect overfill for bad client
                // TODO(b/183140903): detect when an object is deleted when it still has
                //        pending async transactions
                it->second.asyncTodo.push(BinderNode::AsyncTodo{
                        .ref = target,
                        .data = std::move(transactionData),
                        .asyncNumber = transaction->asyncNumber,
                });
@@ -579,7 +602,6 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
            }
        }
    }
    }

    Parcel reply;
    reply.markForRpc(session);
@@ -670,13 +692,17 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
                               it->second.asyncNumber, addr.toString().c_str());

                // justification for const_cast (consider avoiding priority_queue):
                // - AsyncTodo operator< doesn't depend on 'data' object
                // - AsyncTodo operator< doesn't depend on 'data' or 'ref' objects
                // - gotta go fast
                CommandData data = std::move(
                        const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()).data);
                auto& todo = const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top());

                CommandData nextData = std::move(todo.data);
                sp<IBinder> nextRef = std::move(todo.ref);

                it->second.asyncTodo.pop();
                _l.unlock();
                return processTransactInternal(fd, session, std::move(data));
                return processTransactInternal(fd, session, std::move(nextData),
                                               std::move(nextRef));
            }
        }
        return OK;
@@ -704,12 +730,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
            .bodySize = static_cast<uint32_t>(replyData.size()),
    };

    if (!rpcSend(fd, "reply header", &cmdReply, sizeof(RpcWireHeader))) {
        return DEAD_OBJECT;
    }
    if (!rpcSend(fd, "reply body", replyData.data(), replyData.size())) {
        return DEAD_OBJECT;
    }
    if (status_t status = rpcSend(fd, "reply header", &cmdReply, sizeof(RpcWireHeader));
        status != OK)
        return status;
    if (status_t status = rpcSend(fd, "reply body", replyData.data(), replyData.size());
        status != OK)
        return status;
    return OK;
}

@@ -721,9 +747,10 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
    if (!commandData.valid()) {
        return NO_MEMORY;
    }
    if (!rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size())) {
        return DEAD_OBJECT;
    }
    if (status_t status =
                rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size());
        status != OK)
        return status;

    if (command.bodySize < sizeof(RpcWireAddress)) {
        ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
+12 −6
Original line number Diff line number Diff line
@@ -58,9 +58,13 @@ public:
    status_t getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session,
                          int32_t* sessionIdOut);

    [[nodiscard]] status_t transact(const base::unique_fd& fd, const RpcAddress& address,
    [[nodiscard]] status_t transact(const base::unique_fd& fd, const sp<IBinder>& address,
                                    uint32_t code, const Parcel& data,
                                    const sp<RpcSession>& session, Parcel* reply, uint32_t flags);
    [[nodiscard]] status_t transactAddress(const base::unique_fd& fd, const RpcAddress& address,
                                           uint32_t code, const Parcel& data,
                                           const sp<RpcSession>& session, Parcel* reply,
                                           uint32_t flags);
    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
    [[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd,
                                                const sp<RpcSession>& session);
@@ -115,9 +119,9 @@ private:
        size_t mSize;
    };

    [[nodiscard]] bool rpcSend(const base::unique_fd& fd, const char* what, const void* data,
    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
                                   size_t size);
    [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
    [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                                  const char* what, void* data, size_t size);

    [[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -129,7 +133,8 @@ private:
                                           const RpcWireHeader& command);
    [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
                                                   const sp<RpcSession>& session,
                                                   CommandData transactionData);
                                                   CommandData transactionData,
                                                   sp<IBinder>&& targetRef);
    [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
                                            const sp<RpcSession>& session,
                                            const RpcWireHeader& command);
@@ -165,6 +170,7 @@ private:

        // async transaction queue, _only_ for local binder
        struct AsyncTodo {
            sp<IBinder> ref;
            CommandData data;
            uint64_t asyncNumber = 0;

Loading