Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit bd3eee40 authored by Yifan Hong's avatar Yifan Hong Committed by Automerger Merge Worker
Browse files

Merge changes from topic "binder_rpc_non_blocking" am: 93b2cc96

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1789069

Change-Id: Icfe84de041d8e670b8283187d5035d1cb73f19ab
parents 81809fac 93b2cc96
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -104,6 +104,7 @@ cc_library {
        "BpBinder.cpp",
        "BufferedTextOutput.cpp",
        "Debug.cpp",
        "FdTrigger.cpp",
        "IInterface.cpp",
        "IMemory.cpp",
        "IPCThreadState.cpp",
+62 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "FdTrigger"
#include <log/log.h>

#include <poll.h>

#include <android-base/macros.h>

#include "FdTrigger.h"
namespace android {

std::unique_ptr<FdTrigger> FdTrigger::make() {
    auto ret = std::make_unique<FdTrigger>();
    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
        ALOGE("Could not create pipe %s", strerror(errno));
        return nullptr;
    }
    return ret;
}

void FdTrigger::trigger() {
    mWrite.reset();
}

bool FdTrigger::isTriggered() {
    return mWrite == -1;
}

status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
    while (true) {
        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
        if (ret < 0) {
            return -errno;
        }
        if (ret == 0) {
            continue;
        }
        if (pfd[1].revents & POLLHUP) {
            return -ECANCELED;
        }
        return pfd[0].revents & event ? OK : DEAD_OBJECT;
    }
}

} // namespace android
+56 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <memory>

#include <android-base/unique_fd.h>
#include <utils/Errors.h>

namespace android {

/** This is not a pipe. */
class FdTrigger {
public:
    /** Returns nullptr for error case */
    static std::unique_ptr<FdTrigger> make();

    /**
     * Close the write end of the pipe so that the read end receives POLLHUP.
     * Not threadsafe.
     */
    void trigger();

    /**
     * Whether this has been triggered.
     */
    bool isTriggered();

    /**
     * Poll for a read event.
     *
     * event - for pollfd
     *
     * Return:
     *   true - time to read!
     *   false - trigger happened
     */
    status_t triggerablePoll(base::borrowed_fd fd, int16_t event);

private:
    base::unique_fd mWrite;
    base::unique_fd mRead;
};
} // namespace android
+10 −9
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
#include <binder/RpcTransportRaw.h>
#include <log/log.h>

#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
#include "RpcWireFormat.h"
@@ -156,7 +157,7 @@ void RpcServer::join() {
        LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
        LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
        mJoinThreadRunning = true;
        mShutdownTrigger = RpcSession::FdTrigger::make();
        mShutdownTrigger = FdTrigger::make();
        LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");

        mCtx = mRpcTransportCtxFactory->newServerCtx();
@@ -167,7 +168,7 @@ void RpcServer::join() {
    status_t status;
    while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
        unique_fd clientFd(TEMP_FAILURE_RETRY(
                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC | SOCK_NONBLOCK)));

        if (clientFd < 0) {
            ALOGE("Could not accept4 socket: %s", strerror(errno));
@@ -259,7 +260,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
    status_t status = OK;

    int clientFdForLog = clientFd.get();
    auto client = server->mCtx->newTransport(std::move(clientFd));
    auto client = server->mCtx->newTransport(std::move(clientFd), server->mShutdownTrigger.get());
    if (client == nullptr) {
        ALOGE("Dropping accept4()-ed socket because sslAccept fails");
        status = DEAD_OBJECT;
@@ -270,7 +271,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie

    RpcConnectionHeader header;
    if (status == OK) {
        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
                                                sizeof(header));
        if (status != OK) {
            ALOGE("Failed to read ID for client connecting to RPC server: %s",
@@ -296,7 +297,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
                    .version = protocolVersion,
            };

            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
                                                     sizeof(response));
            if (status != OK) {
                ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
@@ -387,8 +388,8 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
    LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str());
    LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server.");

    unique_fd serverFd(
            TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
    unique_fd serverFd(TEMP_FAILURE_RETRY(
            socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)));
    if (serverFd == -1) {
        int savedErrno = errno;
        ALOGE("Could not create socket: %s", strerror(savedErrno));
+58 −111
Original line number Diff line number Diff line
@@ -35,9 +35,11 @@
#include <jni.h>
#include <utils/String8.h>

#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
#include "RpcWireFormat.h"
#include "Utils.h"

#ifdef __GLIBC__
extern "C" pid_t gettid();
@@ -133,12 +135,18 @@ status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_
            fd = request();
            if (!fd.ok()) return BAD_VALUE;
        }
        if (auto res = setNonBlocking(fd); !res.ok()) {
            ALOGE("setupPreconnectedClient: %s", res.error().message().c_str());
            return res.error().code() == 0 ? UNKNOWN_ERROR : -res.error().code();
        }
        return initAndAddConnection(std::move(fd), sessionId, incoming);
    });
}

status_t RpcSession::addNullDebuggingClient() {
    // Note: only works on raw sockets.
    if (auto status = initShutdownTrigger(); status != OK) return status;

    unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC)));

    if (serverFd == -1) {
@@ -152,7 +160,7 @@ status_t RpcSession::addNullDebuggingClient() {
        ALOGE("Unable to create RpcTransportCtx for null debugging client");
        return NO_MEMORY;
    }
    auto server = ctx->newTransport(std::move(serverFd));
    auto server = ctx->newTransport(std::move(serverFd), mShutdownTrigger.get());
    if (server == nullptr) {
        ALOGE("Unable to set up RpcTransport");
        return UNKNOWN_ERROR;
@@ -216,91 +224,6 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) {
    return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
}

std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
    auto ret = std::make_unique<RpcSession::FdTrigger>();
    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
        ALOGE("Could not create pipe %s", strerror(errno));
        return nullptr;
    }
    return ret;
}

void RpcSession::FdTrigger::trigger() {
    mWrite.reset();
}

bool RpcSession::FdTrigger::isTriggered() {
    return mWrite == -1;
}

status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
    return triggerablePoll(rpcTransport->pollSocket(), event);
}

status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
    while (true) {
        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
        if (ret < 0) {
            return -errno;
        }
        if (ret == 0) {
            continue;
        }
        if (pfd[1].revents & POLLHUP) {
            return -ECANCELED;
        }
        return pfd[0].revents & event ? OK : DEAD_OBJECT;
    }
}

status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
                                                        const void* data, size_t size) {
    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
    const uint8_t* end = buffer + size;

    MAYBE_WAIT_IN_FLAKE_MODE;

    status_t status;
    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
        auto writeSize = rpcTransport->send(buffer, end - buffer);
        if (!writeSize.ok()) {
            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
        }

        if (*writeSize == 0) return DEAD_OBJECT;

        buffer += *writeSize;
        if (buffer == end) return OK;
    }
    return status;
}

status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
                                                       size_t size) {
    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
    uint8_t* end = buffer + size;

    MAYBE_WAIT_IN_FLAKE_MODE;

    status_t status;
    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
        auto readSize = rpcTransport->recv(buffer, end - buffer);
        if (!readSize.ok()) {
            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
        }

        if (*readSize == 0) return DEAD_OBJECT; // EOF

        buffer += *readSize;
        if (buffer == end) return OK;
    }
    return status;
}

status_t RpcSession::readId() {
    {
        std::lock_guard<std::mutex> _l(mMutex);
@@ -484,6 +407,7 @@ status_t RpcSession::setupClient(
                            "Must only setup session once, but already has %zu clients",
                            mOutgoingConnections.size());
    }
    if (auto status = initShutdownTrigger(); status != OK) return status;

    if (status_t status = connectAndInit(RpcAddress::zero(), false /*incoming*/); status != OK)
        return status;
@@ -550,8 +474,8 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,
    for (size_t tries = 0; tries < 5; tries++) {
        if (tries > 0) usleep(10000);

        unique_fd serverFd(
                TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
        unique_fd serverFd(TEMP_FAILURE_RETRY(
                socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)));
        if (serverFd == -1) {
            int savedErrno = errno;
            ALOGE("Could not create socket at %s: %s", addr.toString().c_str(),
@@ -564,11 +488,35 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,
                ALOGW("Connection reset on %s", addr.toString().c_str());
                continue;
            }
            if (errno != EAGAIN && errno != EINPROGRESS) {
                int savedErrno = errno;
                ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(),
                      strerror(savedErrno));
                return -savedErrno;
            }
            // For non-blocking sockets, connect() may return EAGAIN (for unix domain socket) or
            // EINPROGRESS (for others). Call poll() and getsockopt() to get the error.
            status_t pollStatus = mShutdownTrigger->triggerablePoll(serverFd, POLLOUT);
            if (pollStatus != OK) {
                ALOGE("Could not POLLOUT after connect() on non-blocking socket: %s",
                      statusToString(pollStatus).c_str());
                return pollStatus;
            }
            int soError;
            socklen_t soErrorLen = sizeof(soError);
            int ret = getsockopt(serverFd.get(), SOL_SOCKET, SO_ERROR, &soError, &soErrorLen);
            if (ret == -1) {
                int savedErrno = errno;
                ALOGE("Could not getsockopt() after connect() on non-blocking socket: %s",
                      strerror(savedErrno));
                return -savedErrno;
            }
            if (soError != 0) {
                ALOGE("After connect(), getsockopt() returns error for socket at %s: %s",
                      addr.toString().c_str(), strerror(soError));
                return -soError;
            }
        }
        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());

        return initAndAddConnection(std::move(serverFd), sessionId, incoming);
@@ -580,13 +528,14 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,

status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
                                          bool incoming) {
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
    auto ctx = mRpcTransportCtxFactory->newClientCtx();
    if (ctx == nullptr) {
        ALOGE("Unable to create client RpcTransportCtx with %s sockets",
              mRpcTransportCtxFactory->toCString());
        return NO_MEMORY;
    }
    auto server = ctx->newTransport(std::move(fd));
    auto server = ctx->newTransport(std::move(fd), mShutdownTrigger.get());
    if (server == nullptr) {
        ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString());
        return UNKNOWN_ERROR;
@@ -602,16 +551,12 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessio

    if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;

    auto sentHeader = server->send(&header, sizeof(header));
    if (!sentHeader.ok()) {
    auto sendHeaderStatus =
            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
    if (sendHeaderStatus != OK) {
        ALOGE("Could not write connection header to socket: %s",
              sentHeader.error().message().c_str());
        return -sentHeader.error().code();
    }
    if (*sentHeader != sizeof(header)) {
        ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
              *sentHeader, sizeof(header));
        return UNKNOWN_ERROR;
              statusToString(sendHeaderStatus).c_str());
        return sendHeaderStatus;
    }

    LOG_RPC_DETAIL("Socket at client: header sent");
@@ -652,11 +597,7 @@ status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTran
    return OK;
}

status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
    sp<RpcConnection> connection = sp<RpcConnection>::make();
    {
        std::lock_guard<std::mutex> _l(mMutex);

status_t RpcSession::initShutdownTrigger() {
    // first client connection added, but setForServer not called, so
    // initializaing for a client.
    if (mShutdownTrigger == nullptr) {
@@ -664,7 +605,13 @@ status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTran
        mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
        if (mShutdownTrigger == nullptr) return INVALID_OPERATION;
    }
    return OK;
}

status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
    sp<RpcConnection> connection = sp<RpcConnection>::make();
    {
        std::lock_guard<std::mutex> _l(mMutex);
        connection->rpcTransport = std::move(rpcTransport);
        connection->exclusiveTid = gettid();
        mOutgoingConnections.push_back(connection);
Loading