Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 891989f8 authored by Yifan Hong's avatar Yifan Hong
Browse files

lshal: PipeRelay: use modern classes.

- Use android::base helpers
- Use std::thread instead of utils/Thread
- Use poll() instead of select()
- Use a separate fd trigger instead of an atomic_bool
  so that poll() finishes sooner.
- Also removes useless READ_TIMEOUT and error message
  for select() timeout and ~PipeRelay.

Test: lshal_test
Test: manually run lshal debug repeatedly to see if it
      is stuck or output is not complete

Bug: 182306776

Change-Id: Ie623bf1b979654a30b360819c9a787c88fd8d91d
parent 936fc19a
Loading
Loading
Loading
Loading
+5 −7
Original line number Diff line number Diff line
@@ -142,12 +142,10 @@ Status Lshal::emitDebugInfo(
        }
    }

    PipeRelay relay(out, err, interfaceName, instanceName);

    if (relay.initCheck() != OK) {
        std::string msg = "PipeRelay::initCheck() FAILED w/ " + std::to_string(relay.initCheck());
        err << msg << std::endl;
        LOG(ERROR) << msg;
    auto relay = PipeRelay::create(out, err, interfaceName + "/" + instanceName);
    if (!relay.ok()) {
        err << "Unable to create PipeRelay: " << relay.error() << std::endl;
        LOG(ERROR) << "Unable to create PipeRelay: " << relay.error();
        return IO_ERROR;
    }

@@ -155,7 +153,7 @@ Status Lshal::emitDebugInfo(
        native_handle_create(1 /* numFds */, 0 /* numInts */),
        native_handle_delete);

    fdHandle->data[0] = relay.fd();
    fdHandle->data[0] = relay.value()->fd().get();

    hardware::Return<void> ret = base->debug(fdHandle.get(), convert(options));

+70 −120
Original line number Diff line number Diff line
@@ -16,142 +16,92 @@

#include "PipeRelay.h"

#include <sys/select.h>
#include <sys/time.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <optional>

#include <utils/Thread.h>
#include <android-base/unique_fd.h>

using android::base::borrowed_fd;
using android::base::Result;
using android::base::unique_fd;
using std::chrono_literals::operator""ms;

namespace android {
namespace lshal {

static constexpr struct timeval READ_TIMEOUT { .tv_sec = 1, .tv_usec = 0 };

static std::string getThreadName(std::string interfaceName, const std::string &instanceName) {
    auto dot = interfaceName.rfind(".");
    if (dot != std::string::npos) interfaceName = interfaceName.substr(dot + 1);
    return "RelayThread_" + interfaceName + "_" + instanceName;
}

struct PipeRelay::RelayThread : public Thread {
    explicit RelayThread(int fd, std::ostream &os, const NullableOStream<std::ostream> &err,
                         const std::string &fqName);

    bool threadLoop() override;
    void setFinished();

private:
    int mFd;
    std::ostream &mOutStream;
    NullableOStream<std::ostream> mErrStream;

    // If we were to use requestExit() and exitPending() instead, threadLoop()
    // may not run at all by the time ~PipeRelay is called (i.e. debug() has
    // returned from HAL). By using our own flag, we ensure that select() and
    // read() are executed until data are drained.
    std::atomic_bool mFinished;

    std::string mFqName;

    DISALLOW_COPY_AND_ASSIGN(RelayThread);
};

////////////////////////////////////////////////////////////////////////////////

PipeRelay::RelayThread::RelayThread(int fd, std::ostream &os,
Result<std::unique_ptr<PipeRelay>> PipeRelay::create(std::ostream& os,
                                                     const NullableOStream<std::ostream>& err,
                                    const std::string &fqName)
      : mFd(fd), mOutStream(os), mErrStream(err), mFinished(false), mFqName(fqName) {}

bool PipeRelay::RelayThread::threadLoop() {
                                                     const std::string& fqName) {
    auto pipeRelay = std::unique_ptr<PipeRelay>(new PipeRelay());
    unique_fd rfd;
    if (!android::base::Pipe(&rfd, &pipeRelay->mWrite)) {
        return android::base::ErrnoError() << "pipe()";
    }
    // Workaround for b/111997867: need a separate FD trigger because rfd can't receive POLLHUP
    // when the write end is closed after the write end was sent through hwbinder.
    unique_fd rfdTrigger;
    if (!android::base::Pipe(&rfdTrigger, &pipeRelay->mWriteTrigger)) {
        return android::base::ErrnoError() << "pipe() for trigger";
    }
    pipeRelay->mThread =
            std::make_unique<std::thread>(&PipeRelay::thread, std::move(rfd), std::move(rfdTrigger),
                                          &os, &err, fqName);
    return pipeRelay;
}

void PipeRelay::thread(unique_fd rfd, unique_fd rfdTrigger, std::ostream* out,
                       const NullableOStream<std::ostream>* err, std::string fqName) {
    while (true) {
        pollfd pfd[2];
        pfd[0] = {.fd = rfd.get(), .events = POLLIN};
        pfd[1] = {.fd = rfdTrigger.get(), .events = 0};

        int pollRes = poll(pfd, arraysize(pfd), -1 /* infinite timeout */);
        if (pollRes < 0) {
            int savedErrno = errno;
            (*err) << "debug " << fqName << ": poll() failed: " << strerror(savedErrno)
                   << std::endl;
            break;
        }

        if (pfd[0].revents & POLLIN) {
            char buffer[1024];

    fd_set set;
    FD_ZERO(&set);
    FD_SET(mFd, &set);

    struct timeval timeout = READ_TIMEOUT;

    int res = TEMP_FAILURE_RETRY(select(mFd + 1, &set, nullptr, nullptr, &timeout));
    if (res < 0) {
        mErrStream << "debug " << mFqName << ": select() failed";
        return false;
    }

    if (res == 0 || !FD_ISSET(mFd, &set)) {
        if (mFinished) {
            mErrStream << "debug " << mFqName
                       << ": timeout reading from pipe, output may be truncated.";
            return false;
        }
        // timeout, but debug() has not returned, so wait for HAL to finish.
        return true;
    }

    // FD_ISSET(mFd, &set) == true. Data available, start reading
    ssize_t n = TEMP_FAILURE_RETRY(read(mFd, buffer, sizeof(buffer)));

            ssize_t n = TEMP_FAILURE_RETRY(read(rfd.get(), buffer, sizeof(buffer)));
            if (n < 0) {
        mErrStream << "debug " << mFqName << ": read() failed";
                int savedErrno = errno;
                (*err) << "debug " << fqName << ": read() failed: " << strerror(savedErrno)
                       << std::endl;
                break;
            }

    if (n <= 0) {
        return false;
            if (n == 0) {
                (*err) << "Warning: debug " << fqName << ": poll() indicates POLLIN but no data"
                       << std::endl;
                continue;
            }

    mOutStream.write(buffer, n);

    return true;
            out->write(buffer, n);
        }

void PipeRelay::RelayThread::setFinished() {
    mFinished = true;
        if (pfd[0].revents & POLLHUP) {
            break;
        }

////////////////////////////////////////////////////////////////////////////////

PipeRelay::PipeRelay(std::ostream &os, const NullableOStream<std::ostream> &err,
                     const std::string &interfaceName, const std::string &instanceName)
      : mInitCheck(NO_INIT) {
    int res = pipe(mFds);

    if (res < 0) {
        mInitCheck = -errno;
        return;
        if (pfd[1].revents & POLLHUP) {
            // ~PipeRelay is called on the main thread. |mWrite| has been flushed and closed.
            // Ensure that our read end of the pipe doesn't have pending data, then exit.
            if ((pfd[0].revents & POLLIN) == 0) {
                break;
            }

    mThread = new RelayThread(mFds[0], os, err, interfaceName + "/" + instanceName);
    mInitCheck = mThread->run(getThreadName(interfaceName, instanceName).c_str());
        }

void PipeRelay::CloseFd(int *fd) {
    if (*fd >= 0) {
        close(*fd);
        *fd = -1;
    }
}

PipeRelay::~PipeRelay() {
    CloseFd(&mFds[1]);

    if (mThread != nullptr) {
        mThread->setFinished();
    mWrite.reset();
    mWriteTrigger.reset();
    if (mThread != nullptr && mThread->joinable()) {
        mThread->join();
        mThread.clear();
    }

    CloseFd(&mFds[0]);
}

status_t PipeRelay::initCheck() const {
    return mInitCheck;
}

int PipeRelay::fd() const {
    return mFds[1];
}

} // namespace lshal
+18 −17
Original line number Diff line number Diff line
@@ -16,42 +16,43 @@

#pragma once

#include <thread>

#include <android-base/macros.h>
#include <ostream>
#include <android-base/result.h>
#include <android-base/unique_fd.h>
#include <utils/Errors.h>
#include <utils/RefBase.h>
#include <ostream>

#include "NullableOStream.h"

namespace android {
namespace lshal {

/* Creates an AF_UNIX socketpair and spawns a thread that relays any data
/**
 * Creates a pipe and spawns a thread that relays any data
 * written to the "write"-end of the pair to the specified output stream "os".
 */
struct PipeRelay {
    explicit PipeRelay(std::ostream& os,
                       const NullableOStream<std::ostream>& err,
                       const std::string& interfaceName,
                       const std::string& instanceName);
    static android::base::Result<std::unique_ptr<PipeRelay>> create(
            std::ostream& os, const NullableOStream<std::ostream>& err, const std::string& fqName);
    ~PipeRelay();

    status_t initCheck() const;

    // Returns the file descriptor corresponding to the "write"-end of the
    // connection.
    int fd() const;
    android::base::borrowed_fd fd() const { return mWrite; }

private:
    struct RelayThread;

    status_t mInitCheck;
    int mFds[2];
    sp<RelayThread> mThread;

    static void CloseFd(int *fd);

    PipeRelay() = default;
    DISALLOW_COPY_AND_ASSIGN(PipeRelay);
    static void thread(android::base::unique_fd rfd, android::base::unique_fd rfdTrigger,
                       std::ostream* out, const NullableOStream<std::ostream>* err,
                       std::string fqName);

    android::base::unique_fd mWrite;
    android::base::unique_fd mWriteTrigger;
    std::unique_ptr<std::thread> mThread;
};

}  // namespace lshal