Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8c950421 authored by Yifan Hong's avatar Yifan Hong
Browse files

binder: Refactor: move FdTrigger to its own file / class.

Also move interruptable*Fully functions to RpcTransport so that we no
longer need pending() and pollSocket().

This also allows us to hide send() / recv(); callers should use
interruptableWriteFully / interruptableReadFully instead, because
those repsect the shutdown trigger.
- Fix one place to use interruptableWriteFully() instead of send() when
  sending header.

interruptable*Fully are marked as virtual functions because TLS will
need to poll with events dynamically adjusted. See follow-up CLs for
TLS implementation.

Test: TH
Bug: 190868302
Change-Id: I131eed3a637b3a30280b320966e466bbfac0fc45
parent 832521eb
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -104,6 +104,7 @@ cc_library {
        "BpBinder.cpp",
        "BufferedTextOutput.cpp",
        "Debug.cpp",
        "FdTrigger.cpp",
        "IInterface.cpp",
        "IMemory.cpp",
        "IPCThreadState.cpp",
+62 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "FdTrigger"
#include <log/log.h>

#include <poll.h>

#include <android-base/macros.h>

#include "FdTrigger.h"
namespace android {

std::unique_ptr<FdTrigger> FdTrigger::make() {
    auto ret = std::make_unique<FdTrigger>();
    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
        ALOGE("Could not create pipe %s", strerror(errno));
        return nullptr;
    }
    return ret;
}

void FdTrigger::trigger() {
    mWrite.reset();
}

bool FdTrigger::isTriggered() {
    return mWrite == -1;
}

status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
    while (true) {
        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
        if (ret < 0) {
            return -errno;
        }
        if (ret == 0) {
            continue;
        }
        if (pfd[1].revents & POLLHUP) {
            return -ECANCELED;
        }
        return pfd[0].revents & event ? OK : DEAD_OBJECT;
    }
}

} // namespace android
+56 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <memory>

#include <android-base/unique_fd.h>
#include <utils/Errors.h>

namespace android {

/** This is not a pipe. */
class FdTrigger {
public:
    /** Returns nullptr for error case */
    static std::unique_ptr<FdTrigger> make();

    /**
     * Close the write end of the pipe so that the read end receives POLLHUP.
     * Not threadsafe.
     */
    void trigger();

    /**
     * Whether this has been triggered.
     */
    bool isTriggered();

    /**
     * Poll for a read event.
     *
     * event - for pollfd
     *
     * Return:
     *   true - time to read!
     *   false - trigger happened
     */
    status_t triggerablePoll(base::borrowed_fd fd, int16_t event);

private:
    base::unique_fd mWrite;
    base::unique_fd mRead;
};
} // namespace android
+6 −5
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
#include <binder/RpcTransportRaw.h>
#include <log/log.h>

#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
#include "RpcWireFormat.h"
@@ -156,7 +157,7 @@ void RpcServer::join() {
        LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
        LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
        mJoinThreadRunning = true;
        mShutdownTrigger = RpcSession::FdTrigger::make();
        mShutdownTrigger = FdTrigger::make();
        LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");

        mCtx = mRpcTransportCtxFactory->newServerCtx();
@@ -270,7 +271,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie

    RpcConnectionHeader header;
    if (status == OK) {
        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
                                                sizeof(header));
        if (status != OK) {
            ALOGE("Failed to read ID for client connecting to RPC server: %s",
@@ -296,7 +297,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
                    .version = protocolVersion,
            };

            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
                                                     sizeof(response));
            if (status != OK) {
                ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
+7 −94
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@
#include <jni.h>
#include <utils/String8.h>

#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
#include "RpcWireFormat.h"
@@ -218,91 +219,6 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) {
    return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
}

std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
    auto ret = std::make_unique<RpcSession::FdTrigger>();
    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
        ALOGE("Could not create pipe %s", strerror(errno));
        return nullptr;
    }
    return ret;
}

void RpcSession::FdTrigger::trigger() {
    mWrite.reset();
}

bool RpcSession::FdTrigger::isTriggered() {
    return mWrite == -1;
}

status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
    return triggerablePoll(rpcTransport->pollSocket(), event);
}

status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
    while (true) {
        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
        if (ret < 0) {
            return -errno;
        }
        if (ret == 0) {
            continue;
        }
        if (pfd[1].revents & POLLHUP) {
            return -ECANCELED;
        }
        return pfd[0].revents & event ? OK : DEAD_OBJECT;
    }
}

status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
                                                        const void* data, size_t size) {
    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
    const uint8_t* end = buffer + size;

    MAYBE_WAIT_IN_FLAKE_MODE;

    status_t status;
    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
        auto writeSize = rpcTransport->send(buffer, end - buffer);
        if (!writeSize.ok()) {
            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
        }

        if (*writeSize == 0) return DEAD_OBJECT;

        buffer += *writeSize;
        if (buffer == end) return OK;
    }
    return status;
}

status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
                                                       size_t size) {
    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
    uint8_t* end = buffer + size;

    MAYBE_WAIT_IN_FLAKE_MODE;

    status_t status;
    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
        auto readSize = rpcTransport->recv(buffer, end - buffer);
        if (!readSize.ok()) {
            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
        }

        if (*readSize == 0) return DEAD_OBJECT; // EOF

        buffer += *readSize;
        if (buffer == end) return OK;
    }
    return status;
}

status_t RpcSession::readId() {
    {
        std::lock_guard<std::mutex> _l(mMutex);
@@ -584,6 +500,7 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,

status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
                                          bool incoming) {
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
    auto ctx = mRpcTransportCtxFactory->newClientCtx();
    if (ctx == nullptr) {
        ALOGE("Unable to create client RpcTransportCtx with %s sockets",
@@ -606,16 +523,12 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessio

    if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;

    auto sentHeader = server->send(&header, sizeof(header));
    if (!sentHeader.ok()) {
    auto sendHeaderStatus =
            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
    if (sendHeaderStatus != OK) {
        ALOGE("Could not write connection header to socket: %s",
              sentHeader.error().message().c_str());
        return -sentHeader.error().code();
    }
    if (*sentHeader != sizeof(header)) {
        ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
              *sentHeader, sizeof(header));
        return UNKNOWN_ERROR;
              statusToString(sendHeaderStatus).c_str());
        return sendHeaderStatus;
    }

    LOG_RPC_DETAIL("Socket at client: header sent");
Loading