Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e3dab2d9 authored by Yi Jin's avatar Yi Jin
Browse files

Use unique_fd with the clear ownership.

FdBuffer won't take ownership of the `main` fd.
It only enforces transfer ownership in readProcessedDataInStream.

Bug: 74021345
Test: atest incidentd_test
Change-Id: I6182730241c81c34b3be865b827a2d3e8c10c21c
parent ab34199e
Loading
Loading
Loading
Loading
+31 −31
Original line number Diff line number Diff line
@@ -34,11 +34,11 @@ FdBuffer::FdBuffer()

FdBuffer::~FdBuffer() {}

status_t FdBuffer::read(unique_fd* fd, int64_t timeout) {
    struct pollfd pfds = {.fd = fd->get(), .events = POLLIN};
status_t FdBuffer::read(int fd, int64_t timeout) {
    struct pollfd pfds = {.fd = fd, .events = POLLIN};
    mStartTime = uptimeMillis();

    fcntl(fd->get(), F_SETFL, fcntl(fd->get(), F_GETFL, 0) | O_NONBLOCK);
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

    while (true) {
        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
@@ -67,16 +67,16 @@ status_t FdBuffer::read(unique_fd* fd, int64_t timeout) {
                VLOG("return event has error %s", strerror(errno));
                return errno != 0 ? -errno : UNKNOWN_ERROR;
            } else {
                ssize_t amt = ::read(fd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite());
                ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
                if (amt < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        continue;
                    } else {
                        VLOG("Fail to read %d: %s", fd->get(), strerror(errno));
                        VLOG("Fail to read %d: %s", fd, strerror(errno));
                        return -errno;
                    }
                } else if (amt == 0) {
                    VLOG("Reached EOF of fd=%d", fd->get());
                    VLOG("Reached EOF of fd=%d", fd);
                    break;
                }
                mBuffer.wp()->move(amt);
@@ -87,7 +87,7 @@ status_t FdBuffer::read(unique_fd* fd, int64_t timeout) {
    return NO_ERROR;
}

status_t FdBuffer::readFully(unique_fd* fd) {
status_t FdBuffer::readFully(int fd) {
    mStartTime = uptimeMillis();

    while (true) {
@@ -99,10 +99,10 @@ status_t FdBuffer::readFully(unique_fd* fd) {
        }
        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;

        ssize_t amt = TEMP_FAILURE_RETRY(
                ::read(fd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()));
        ssize_t amt =
                TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()));
        if (amt < 0) {
            VLOG("Fail to read %d: %s", fd->get(), strerror(errno));
            VLOG("Fail to read %d: %s", fd, strerror(errno));
            return -errno;
        } else if (amt == 0) {
            VLOG("Done reading %zu bytes", mBuffer.size());
@@ -116,20 +116,20 @@ status_t FdBuffer::readFully(unique_fd* fd) {
    return NO_ERROR;
}

status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, unique_fd* fromFd,
status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd,
                                             int64_t timeoutMs, const bool isSysfs) {
    struct pollfd pfds[] = {
            {.fd = fd->get(), .events = POLLIN},
            {.fd = toFd->get(), .events = POLLOUT},
            {.fd = fromFd->get(), .events = POLLIN},
            {.fd = fd, .events = POLLIN},
            {.fd = toFd.get(), .events = POLLOUT},
            {.fd = fromFd.get(), .events = POLLIN},
    };

    mStartTime = uptimeMillis();

    // mark all fds non blocking
    fcntl(fd->get(), F_SETFL, fcntl(fd->get(), F_GETFL, 0) | O_NONBLOCK);
    fcntl(toFd->get(), F_SETFL, fcntl(toFd->get(), F_GETFL, 0) | O_NONBLOCK);
    fcntl(fromFd->get(), F_SETFL, fcntl(fromFd->get(), F_GETFL, 0) | O_NONBLOCK);
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK);
    fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK);

    // A circular buffer holds data read from fd and writes to parsing process
    uint8_t cirBuf[BUFFER_SIZE];
@@ -166,10 +166,10 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni
        for (int i = 0; i < 3; ++i) {
            if ((pfds[i].revents & POLLERR) != 0) {
                if (i == 0 && isSysfs) {
                    VLOG("fd %d is sysfs, ignore its POLLERR return value", fd->get());
                    VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
                    continue;
                }
                VLOG("fd[%d]=%d returns error events: %s", i, fd->get(), strerror(errno));
                VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
                return errno != 0 ? -errno : UNKNOWN_ERROR;
            }
        }
@@ -178,17 +178,17 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni
        if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
            ssize_t amt;
            if (rpos >= wpos) {
                amt = ::read(fd->get(), cirBuf + rpos, BUFFER_SIZE - rpos);
                amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
            } else {
                amt = ::read(fd->get(), cirBuf + rpos, wpos - rpos);
                amt = ::read(fd, cirBuf + rpos, wpos - rpos);
            }
            if (amt < 0) {
                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
                    VLOG("Fail to read fd %d: %s", fd->get(), strerror(errno));
                    VLOG("Fail to read fd %d: %s", fd, strerror(errno));
                    return -errno;
                }  // otherwise just continue
            } else if (amt == 0) {
                VLOG("Reached EOF of input file %d", fd->get());
                VLOG("Reached EOF of input file %d", fd);
                pfds[0].fd = -1;  // reach EOF so don't have to poll pfds[0].
            } else {
                rpos += amt;
@@ -200,13 +200,13 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni
        if (cirSize > 0 && pfds[1].fd != -1) {
            ssize_t amt;
            if (rpos > wpos) {
                amt = ::write(toFd->get(), cirBuf + wpos, rpos - wpos);
                amt = ::write(toFd.get(), cirBuf + wpos, rpos - wpos);
            } else {
                amt = ::write(toFd->get(), cirBuf + wpos, BUFFER_SIZE - wpos);
                amt = ::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos);
            }
            if (amt < 0) {
                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
                    VLOG("Fail to write toFd %d: %s", toFd->get(), strerror(errno));
                    VLOG("Fail to write toFd.get() %d: %s", toFd.get(), strerror(errno));
                    return -errno;
                }  // otherwise just continue
            } else {
@@ -217,8 +217,8 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni

        // if buffer is empty and fd is closed, close write fd.
        if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
            VLOG("Close write pipe %d", toFd->get());
            toFd->reset();
            VLOG("Close write pipe %d", toFd.get());
            toFd.reset();
            pfds[1].fd = -1;
        }

@@ -231,14 +231,14 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni
        }

        // read from parsing process
        ssize_t amt = ::read(fromFd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite());
        ssize_t amt = ::read(fromFd.get(), mBuffer.writeBuffer(), mBuffer.currentToWrite());
        if (amt < 0) {
            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
                VLOG("Fail to read fromFd %d: %s", fromFd->get(), strerror(errno));
                VLOG("Fail to read fromFd.get() %d: %s", fromFd.get(), strerror(errno));
                return -errno;
            }  // otherwise just continue
        } else if (amt == 0) {
            VLOG("Reached EOF of fromFd %d", fromFd->get());
            VLOG("Reached EOF of fromFd.get() %d", fromFd.get());
            break;
        } else {
            mBuffer.wp()->move(amt);
+4 −4
Original line number Diff line number Diff line
@@ -40,13 +40,13 @@ public:
     * Returns NO_ERROR if there were no errors or if we timed out.
     * Will mark the file O_NONBLOCK.
     */
    status_t read(unique_fd* fd, int64_t timeoutMs);
    status_t read(int fd, int64_t timeoutMs);

    /**
     * Read the data until we hit eof.
     * Returns NO_ERROR if there were no errors.
     */
    status_t readFully(unique_fd* fd);
    status_t readFully(int fd);

    /**
     * Read processed results by streaming data to a parsing process, e.g. incident helper.
@@ -58,8 +58,8 @@ public:
     *
     * Poll will return POLLERR if fd is from sysfs, handle this edge case.
     */
    status_t readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, unique_fd* fromFd,
                                       int64_t timeoutMs, const bool isSysfs = false);
    status_t readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd, int64_t timeoutMs,
                                       const bool isSysfs = false);

    /**
     * Whether we timed out.
+1 −2
Original line number Diff line number Diff line
@@ -352,8 +352,7 @@ status_t IncidentService::cmd_privacy(FILE* in, FILE* out, FILE* err, Vector<Str
            printPrivacy(p, out, String8(""));
        } else if (opt == "parse") {
            FdBuffer buf;
            unique_fd infd(fileno(in));
            status_t error = buf.read(&infd, 60000);
            status_t error = buf.read(fileno(in), 60000);
            if (error != NO_ERROR) {
                fprintf(err, "Error reading from stdin\n");
                return error;
+9 −8
Original line number Diff line number Diff line
@@ -264,8 +264,9 @@ status_t FileSection::Execute(ReportRequestSet* requests) const {
    }

    // parent process
    status_t readStatus = buffer.readProcessedDataInStream(
            &fd, &p2cPipe.writeFd(), &c2pPipe.readFd(), this->timeoutMs, mIsSysfs);
    status_t readStatus = buffer.readProcessedDataInStream(fd.get(), std::move(p2cPipe.writeFd()),
                                                           std::move(c2pPipe.readFd()),
                                                           this->timeoutMs, mIsSysfs);

    if (readStatus != NO_ERROR || buffer.timedOut()) {
        ALOGW("FileSection '%s' failed to read data from incident helper: %s, timedout: %s",
@@ -354,9 +355,9 @@ status_t GZipSection::Execute(ReportRequestSet* requests) const {
    VLOG("GZipSection '%s' editPos=%zd, dataBeginAt=%zd", this->name.string(), editPos,
         dataBeginAt);

    status_t readStatus =
            buffer.readProcessedDataInStream(&fd, &p2cPipe.writeFd(), &c2pPipe.readFd(),
                                             this->timeoutMs, isSysfs(mFilenames[index]));
    status_t readStatus = buffer.readProcessedDataInStream(
            fd.get(), std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), this->timeoutMs,
            isSysfs(mFilenames[index]));

    if (readStatus != NO_ERROR || buffer.timedOut()) {
        ALOGW("GZipSection '%s' failed to read data from gzip: %s, timedout: %s",
@@ -466,7 +467,7 @@ status_t WorkerThreadSection::Execute(ReportRequestSet* requests) const {
    pthread_attr_destroy(&attr);

    // Loop reading until either the timeout or the worker side is done (i.e. eof).
    err = buffer.read(&data->pipe.readFd(), this->timeoutMs);
    err = buffer.read(data->pipe.readFd().get(), this->timeoutMs);
    if (err != NO_ERROR) {
        // TODO: Log this error into the incident report.
        ALOGW("WorkerThreadSection '%s' reader failed with error '%s'", this->name.string(),
@@ -573,7 +574,7 @@ status_t CommandSection::Execute(ReportRequestSet* requests) const {
    }

    cmdPipe.writeFd().reset();
    status_t readStatus = buffer.read(&ihPipe.readFd(), this->timeoutMs);
    status_t readStatus = buffer.read(ihPipe.readFd().get(), this->timeoutMs);
    if (readStatus != NO_ERROR || buffer.timedOut()) {
        ALOGW("CommandSection '%s' failed to read data from incident helper: %s, timedout: %s",
              this->name.string(), strerror(-readStatus), buffer.timedOut() ? "true" : "false");
@@ -892,7 +893,7 @@ status_t TombstoneSection::BlockingCall(int pipeWriteFd) const {
        // Parent process.
        // Read from the pipe concurrently to avoid blocking the child.
        FdBuffer buffer;
        err = buffer.readFully(&dumpPipe.readFd());
        err = buffer.readFully(dumpPipe.readFd().get());
        if (err != NO_ERROR) {
            ALOGW("TombstoneSection '%s' failed to read stack dump: %d", this->name.string(), err);
            dumpPipe.readFd().reset();
+24 −21
Original line number Diff line number Diff line
@@ -37,7 +37,6 @@ class FdBufferTest : public Test {
public:
    virtual void SetUp() override {
        ASSERT_NE(tf.fd, -1);
        tffd.reset(tf.fd);
        ASSERT_NE(p2cPipe.init(), -1);
        ASSERT_NE(c2pPipe.init(), -1);
    }
@@ -57,13 +56,13 @@ public:
        EXPECT_EQ(expected[i], '\0');
    }

    bool DoDataStream(unique_fd* rFd, unique_fd* wFd) {
    bool DoDataStream(const unique_fd& rFd, const unique_fd& wFd) {
        char buf[BUFFER_SIZE];
        ssize_t nRead;
        while ((nRead = read(rFd->get(), buf, BUFFER_SIZE)) > 0) {
        while ((nRead = read(rFd.get(), buf, BUFFER_SIZE)) > 0) {
            ssize_t nWritten = 0;
            while (nWritten < nRead) {
                ssize_t amt = write(wFd->get(), buf + nWritten, nRead - nWritten);
                ssize_t amt = write(wFd.get(), buf + nWritten, nRead - nWritten);
                if (amt < 0) {
                    return false;
                }
@@ -76,7 +75,6 @@ public:
protected:
    FdBuffer buffer;
    TemporaryFile tf;
    unique_fd tffd;
    Fpipe p2cPipe;
    Fpipe c2pPipe;

@@ -87,7 +85,7 @@ protected:
TEST_F(FdBufferTest, ReadAndWrite) {
    std::string testdata = "FdBuffer test string";
    ASSERT_TRUE(WriteStringToFile(testdata, tf.path));
    ASSERT_EQ(NO_ERROR, buffer.read(&tffd, READ_TIMEOUT));
    ASSERT_EQ(NO_ERROR, buffer.read(tf.fd, READ_TIMEOUT));
    AssertBufferReadSuccessful(testdata.size());
    AssertBufferContent(testdata.c_str());
}
@@ -100,7 +98,7 @@ TEST_F(FdBufferTest, IterateEmpty) {
TEST_F(FdBufferTest, ReadAndIterate) {
    std::string testdata = "FdBuffer test string";
    ASSERT_TRUE(WriteStringToFile(testdata, tf.path));
    ASSERT_EQ(NO_ERROR, buffer.read(&tffd, READ_TIMEOUT));
    ASSERT_EQ(NO_ERROR, buffer.read(tf.fd, READ_TIMEOUT));

    int i = 0;
    EncodedBuffer::iterator it = buffer.data();
@@ -128,7 +126,7 @@ TEST_F(FdBufferTest, ReadTimeout) {
    } else {
        c2pPipe.writeFd().reset();

        status_t status = buffer.read(&c2pPipe.readFd(), QUICK_TIMEOUT_MS);
        status_t status = buffer.read(c2pPipe.readFd().get(), QUICK_TIMEOUT_MS);
        ASSERT_EQ(NO_ERROR, status);
        EXPECT_TRUE(buffer.timedOut());

@@ -148,7 +146,7 @@ TEST_F(FdBufferTest, ReadInStreamAndWrite) {
        p2cPipe.writeFd().reset();
        c2pPipe.readFd().reset();
        ASSERT_TRUE(WriteStringToFd(HEAD, c2pPipe.writeFd()));
        ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd()));
        ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd()));
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();
        // Must exit here otherwise the child process will continue executing the test binary.
@@ -157,8 +155,9 @@ TEST_F(FdBufferTest, ReadInStreamAndWrite) {
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();

        ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(),
                                                             &c2pPipe.readFd(), READ_TIMEOUT));
        ASSERT_EQ(NO_ERROR,
                  buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()),
                                                   std::move(c2pPipe.readFd()), READ_TIMEOUT));
        AssertBufferReadSuccessful(HEAD.size() + testdata.size());
        AssertBufferContent(expected.c_str());
        wait(&pid);
@@ -189,8 +188,9 @@ TEST_F(FdBufferTest, ReadInStreamAndWriteAllAtOnce) {
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();

        ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(),
                                                             &c2pPipe.readFd(), READ_TIMEOUT));
        ASSERT_EQ(NO_ERROR,
                  buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()),
                                                   std::move(c2pPipe.readFd()), READ_TIMEOUT));
        AssertBufferReadSuccessful(HEAD.size() + testdata.size());
        AssertBufferContent(expected.c_str());
        wait(&pid);
@@ -206,7 +206,7 @@ TEST_F(FdBufferTest, ReadInStreamEmpty) {
    if (pid == 0) {
        p2cPipe.writeFd().reset();
        c2pPipe.readFd().reset();
        ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd()));
        ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd()));
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();
        _exit(EXIT_SUCCESS);
@@ -214,8 +214,9 @@ TEST_F(FdBufferTest, ReadInStreamEmpty) {
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();

        ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(),
                                                             &c2pPipe.readFd(), READ_TIMEOUT));
        ASSERT_EQ(NO_ERROR,
                  buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()),
                                                   std::move(c2pPipe.readFd()), READ_TIMEOUT));
        AssertBufferReadSuccessful(0);
        AssertBufferContent("");
        wait(&pid);
@@ -233,7 +234,7 @@ TEST_F(FdBufferTest, ReadInStreamMoreThan4MB) {
    if (pid == 0) {
        p2cPipe.writeFd().reset();
        c2pPipe.readFd().reset();
        ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd()));
        ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd()));
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();
        _exit(EXIT_SUCCESS);
@@ -241,8 +242,9 @@ TEST_F(FdBufferTest, ReadInStreamMoreThan4MB) {
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();

        ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&fd, &p2cPipe.writeFd(),
                                                             &c2pPipe.readFd(), READ_TIMEOUT));
        ASSERT_EQ(NO_ERROR,
                  buffer.readProcessedDataInStream(fd, std::move(p2cPipe.writeFd()),
                                                   std::move(c2pPipe.readFd()), READ_TIMEOUT));
        EXPECT_EQ(buffer.size(), fourMB);
        EXPECT_FALSE(buffer.timedOut());
        EXPECT_TRUE(buffer.truncated());
@@ -278,8 +280,9 @@ TEST_F(FdBufferTest, ReadInStreamTimeOut) {
        p2cPipe.readFd().reset();
        c2pPipe.writeFd().reset();

        ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(),
                                                             &c2pPipe.readFd(), QUICK_TIMEOUT_MS));
        ASSERT_EQ(NO_ERROR,
                  buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()),
                                                   std::move(c2pPipe.readFd()), QUICK_TIMEOUT_MS));
        EXPECT_TRUE(buffer.timedOut());
        kill(pid, SIGKILL);  // reap the child process
    }
Loading