Loading cmds/incidentd/src/FdBuffer.cpp +31 −31 Original line number Original line Diff line number Diff line Loading @@ -34,11 +34,11 @@ FdBuffer::FdBuffer() FdBuffer::~FdBuffer() {} FdBuffer::~FdBuffer() {} status_t FdBuffer::read(unique_fd* fd, int64_t timeout) { status_t FdBuffer::read(int fd, int64_t timeout) { struct pollfd pfds = {.fd = fd->get(), .events = POLLIN}; struct pollfd pfds = {.fd = fd, .events = POLLIN}; mStartTime = uptimeMillis(); mStartTime = uptimeMillis(); fcntl(fd->get(), F_SETFL, fcntl(fd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); while (true) { while (true) { if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { Loading Loading @@ -67,16 +67,16 @@ status_t FdBuffer::read(unique_fd* fd, int64_t timeout) { VLOG("return event has error %s", strerror(errno)); VLOG("return event has error %s", strerror(errno)); return errno != 0 ? -errno : UNKNOWN_ERROR; return errno != 0 ? -errno : UNKNOWN_ERROR; } else { } else { ssize_t amt = ::read(fd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()); ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); if (amt < 0) { if (amt < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) { continue; continue; } else { } else { VLOG("Fail to read %d: %s", fd->get(), strerror(errno)); VLOG("Fail to read %d: %s", fd, strerror(errno)); return -errno; return -errno; } } } else if (amt == 0) { } else if (amt == 0) { VLOG("Reached EOF of fd=%d", fd->get()); VLOG("Reached EOF of fd=%d", fd); break; break; } } mBuffer.wp()->move(amt); mBuffer.wp()->move(amt); Loading @@ -87,7 +87,7 @@ status_t FdBuffer::read(unique_fd* fd, int64_t timeout) { return NO_ERROR; return NO_ERROR; } } status_t FdBuffer::readFully(unique_fd* fd) { status_t FdBuffer::readFully(int fd) { mStartTime = uptimeMillis(); mStartTime = uptimeMillis(); while (true) { while (true) { Loading @@ -99,10 +99,10 @@ status_t FdBuffer::readFully(unique_fd* fd) { } } if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; ssize_t amt = TEMP_FAILURE_RETRY( ssize_t amt = ::read(fd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite())); TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite())); if (amt < 0) { if (amt < 0) { VLOG("Fail to read %d: %s", fd->get(), strerror(errno)); VLOG("Fail to read %d: %s", fd, strerror(errno)); return -errno; return -errno; } else if (amt == 0) { } else if (amt == 0) { VLOG("Done reading %zu bytes", mBuffer.size()); VLOG("Done reading %zu bytes", mBuffer.size()); Loading @@ -116,20 +116,20 @@ status_t FdBuffer::readFully(unique_fd* fd) { return NO_ERROR; return NO_ERROR; } } status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, unique_fd* fromFd, status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd, int64_t timeoutMs, const bool isSysfs) { int64_t timeoutMs, const bool isSysfs) { struct pollfd pfds[] = { struct pollfd pfds[] = { {.fd = fd->get(), .events = POLLIN}, {.fd = fd, .events = POLLIN}, {.fd = toFd->get(), .events = POLLOUT}, {.fd = toFd.get(), .events = POLLOUT}, {.fd = fromFd->get(), .events = POLLIN}, {.fd = fromFd.get(), .events = POLLIN}, }; }; mStartTime = uptimeMillis(); mStartTime = uptimeMillis(); // mark all fds non blocking // mark all fds non blocking fcntl(fd->get(), F_SETFL, fcntl(fd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); fcntl(toFd->get(), F_SETFL, fcntl(toFd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fromFd->get(), F_SETFL, fcntl(fromFd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK); // A circular buffer holds data read from fd and writes to parsing process // A circular buffer holds data read from fd and writes to parsing process uint8_t cirBuf[BUFFER_SIZE]; uint8_t cirBuf[BUFFER_SIZE]; Loading Loading @@ -166,10 +166,10 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) { if ((pfds[i].revents & POLLERR) != 0) { if ((pfds[i].revents & POLLERR) != 0) { if (i == 0 && isSysfs) { if (i == 0 && isSysfs) { VLOG("fd %d is sysfs, ignore its POLLERR return value", fd->get()); VLOG("fd %d is sysfs, ignore its POLLERR return value", fd); continue; continue; } } VLOG("fd[%d]=%d returns error events: %s", i, fd->get(), strerror(errno)); VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno)); return errno != 0 ? -errno : UNKNOWN_ERROR; return errno != 0 ? -errno : UNKNOWN_ERROR; } } } } Loading @@ -178,17 +178,17 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { ssize_t amt; ssize_t amt; if (rpos >= wpos) { if (rpos >= wpos) { amt = ::read(fd->get(), cirBuf + rpos, BUFFER_SIZE - rpos); amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos); } else { } else { amt = ::read(fd->get(), cirBuf + rpos, wpos - rpos); amt = ::read(fd, cirBuf + rpos, wpos - rpos); } } if (amt < 0) { if (amt < 0) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { VLOG("Fail to read fd %d: %s", fd->get(), strerror(errno)); VLOG("Fail to read fd %d: %s", fd, strerror(errno)); return -errno; return -errno; } // otherwise just continue } // otherwise just continue } else if (amt == 0) { } else if (amt == 0) { VLOG("Reached EOF of input file %d", fd->get()); VLOG("Reached EOF of input file %d", fd); pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0]. pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0]. } else { } else { rpos += amt; rpos += amt; Loading @@ -200,13 +200,13 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni if (cirSize > 0 && pfds[1].fd != -1) { if (cirSize > 0 && pfds[1].fd != -1) { ssize_t amt; ssize_t amt; if (rpos > wpos) { if (rpos > wpos) { amt = ::write(toFd->get(), cirBuf + wpos, rpos - wpos); amt = ::write(toFd.get(), cirBuf + wpos, rpos - wpos); } else { } else { amt = ::write(toFd->get(), cirBuf + wpos, BUFFER_SIZE - wpos); amt = ::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos); } } if (amt < 0) { if (amt < 0) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { VLOG("Fail to write toFd %d: %s", toFd->get(), strerror(errno)); VLOG("Fail to write toFd.get() %d: %s", toFd.get(), strerror(errno)); return -errno; return -errno; } // otherwise just continue } // otherwise just continue } else { } else { Loading @@ -217,8 +217,8 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni // if buffer is empty and fd is closed, close write fd. // if buffer is empty and fd is closed, close write fd. if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { VLOG("Close write pipe %d", toFd->get()); VLOG("Close write pipe %d", toFd.get()); toFd->reset(); toFd.reset(); pfds[1].fd = -1; pfds[1].fd = -1; } } Loading @@ -231,14 +231,14 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni } } // read from parsing process // read from parsing process ssize_t amt = ::read(fromFd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()); ssize_t amt = ::read(fromFd.get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()); if (amt < 0) { if (amt < 0) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { VLOG("Fail to read fromFd %d: %s", fromFd->get(), strerror(errno)); VLOG("Fail to read fromFd.get() %d: %s", fromFd.get(), strerror(errno)); return -errno; return -errno; } // otherwise just continue } // otherwise just continue } else if (amt == 0) { } else if (amt == 0) { VLOG("Reached EOF of fromFd %d", fromFd->get()); VLOG("Reached EOF of fromFd.get() %d", fromFd.get()); break; break; } else { } else { mBuffer.wp()->move(amt); mBuffer.wp()->move(amt); Loading cmds/incidentd/src/FdBuffer.h +4 −4 Original line number Original line Diff line number Diff line Loading @@ -40,13 +40,13 @@ public: * Returns NO_ERROR if there were no errors or if we timed out. * Returns NO_ERROR if there were no errors or if we timed out. * Will mark the file O_NONBLOCK. * Will mark the file O_NONBLOCK. */ */ status_t read(unique_fd* fd, int64_t timeoutMs); status_t read(int fd, int64_t timeoutMs); /** /** * Read the data until we hit eof. * Read the data until we hit eof. * Returns NO_ERROR if there were no errors. * Returns NO_ERROR if there were no errors. */ */ status_t readFully(unique_fd* fd); status_t readFully(int fd); /** /** * Read processed results by streaming data to a parsing process, e.g. incident helper. * Read processed results by streaming data to a parsing process, e.g. incident helper. Loading @@ -58,8 +58,8 @@ public: * * * Poll will return POLLERR if fd is from sysfs, handle this edge case. * Poll will return POLLERR if fd is from sysfs, handle this edge case. */ */ status_t readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, unique_fd* fromFd, status_t readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd, int64_t timeoutMs, int64_t timeoutMs, const bool isSysfs = false); const bool isSysfs = false); /** /** * Whether we timed out. * Whether we timed out. Loading cmds/incidentd/src/IncidentService.cpp +1 −2 Original line number Original line Diff line number Diff line Loading @@ -352,8 +352,7 @@ status_t IncidentService::cmd_privacy(FILE* in, FILE* out, FILE* err, Vector<Str printPrivacy(p, out, String8("")); printPrivacy(p, out, String8("")); } else if (opt == "parse") { } else if (opt == "parse") { FdBuffer buf; FdBuffer buf; unique_fd infd(fileno(in)); status_t error = buf.read(fileno(in), 60000); status_t error = buf.read(&infd, 60000); if (error != NO_ERROR) { if (error != NO_ERROR) { fprintf(err, "Error reading from stdin\n"); fprintf(err, "Error reading from stdin\n"); return error; return error; Loading cmds/incidentd/src/Section.cpp +9 −8 Original line number Original line Diff line number Diff line Loading @@ -264,8 +264,9 @@ status_t FileSection::Execute(ReportRequestSet* requests) const { } } // parent process // parent process status_t readStatus = buffer.readProcessedDataInStream( status_t readStatus = buffer.readProcessedDataInStream(fd.get(), std::move(p2cPipe.writeFd()), &fd, &p2cPipe.writeFd(), &c2pPipe.readFd(), this->timeoutMs, mIsSysfs); std::move(c2pPipe.readFd()), this->timeoutMs, mIsSysfs); if (readStatus != NO_ERROR || buffer.timedOut()) { if (readStatus != NO_ERROR || buffer.timedOut()) { ALOGW("FileSection '%s' failed to read data from incident helper: %s, timedout: %s", ALOGW("FileSection '%s' failed to read data from incident helper: %s, timedout: %s", Loading Loading @@ -356,9 +357,9 @@ status_t GZipSection::Execute(ReportRequestSet* requests) const { VLOG("GZipSection '%s' editPos=%zd, dataBeginAt=%zd", this->name.string(), editPos, VLOG("GZipSection '%s' editPos=%zd, dataBeginAt=%zd", this->name.string(), editPos, dataBeginAt); dataBeginAt); status_t readStatus = status_t readStatus = buffer.readProcessedDataInStream( buffer.readProcessedDataInStream(&fd, &p2cPipe.writeFd(), &c2pPipe.readFd(), fd.get(), std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), this->timeoutMs, this->timeoutMs, isSysfs(mFilenames[index])); isSysfs(mFilenames[index])); if (readStatus != NO_ERROR || buffer.timedOut()) { if (readStatus != NO_ERROR || buffer.timedOut()) { ALOGW("GZipSection '%s' failed to read data from gzip: %s, timedout: %s", ALOGW("GZipSection '%s' failed to read data from gzip: %s, timedout: %s", Loading Loading @@ -468,7 +469,7 @@ status_t WorkerThreadSection::Execute(ReportRequestSet* requests) const { pthread_attr_destroy(&attr); pthread_attr_destroy(&attr); // Loop reading until either the timeout or the worker side is done (i.e. eof). // Loop reading until either the timeout or the worker side is done (i.e. eof). err = buffer.read(&data->pipe.readFd(), this->timeoutMs); err = buffer.read(data->pipe.readFd().get(), this->timeoutMs); if (err != NO_ERROR) { if (err != NO_ERROR) { // TODO: Log this error into the incident report. // TODO: Log this error into the incident report. ALOGW("WorkerThreadSection '%s' reader failed with error '%s'", this->name.string(), ALOGW("WorkerThreadSection '%s' reader failed with error '%s'", this->name.string(), Loading Loading @@ -575,7 +576,7 @@ status_t CommandSection::Execute(ReportRequestSet* requests) const { } } cmdPipe.writeFd().reset(); cmdPipe.writeFd().reset(); status_t readStatus = buffer.read(&ihPipe.readFd(), this->timeoutMs); status_t readStatus = buffer.read(ihPipe.readFd().get(), this->timeoutMs); if (readStatus != NO_ERROR || buffer.timedOut()) { if (readStatus != NO_ERROR || buffer.timedOut()) { ALOGW("CommandSection '%s' failed to read data from incident helper: %s, timedout: %s", ALOGW("CommandSection '%s' failed to read data from incident helper: %s, timedout: %s", this->name.string(), strerror(-readStatus), buffer.timedOut() ? "true" : "false"); this->name.string(), strerror(-readStatus), buffer.timedOut() ? "true" : "false"); Loading Loading @@ -894,7 +895,7 @@ status_t TombstoneSection::BlockingCall(int pipeWriteFd) const { // Parent process. // Parent process. // Read from the pipe concurrently to avoid blocking the child. // Read from the pipe concurrently to avoid blocking the child. FdBuffer buffer; FdBuffer buffer; err = buffer.readFully(&dumpPipe.readFd()); err = buffer.readFully(dumpPipe.readFd().get()); if (err != NO_ERROR) { if (err != NO_ERROR) { ALOGW("TombstoneSection '%s' failed to read stack dump: %d", this->name.string(), err); ALOGW("TombstoneSection '%s' failed to read stack dump: %d", this->name.string(), err); dumpPipe.readFd().reset(); dumpPipe.readFd().reset(); Loading cmds/incidentd/tests/FdBuffer_test.cpp +24 −21 Original line number Original line Diff line number Diff line Loading @@ -37,7 +37,6 @@ class FdBufferTest : public Test { public: public: virtual void SetUp() override { virtual void SetUp() override { ASSERT_NE(tf.fd, -1); ASSERT_NE(tf.fd, -1); tffd.reset(tf.fd); ASSERT_NE(p2cPipe.init(), -1); ASSERT_NE(p2cPipe.init(), -1); ASSERT_NE(c2pPipe.init(), -1); ASSERT_NE(c2pPipe.init(), -1); } } Loading @@ -57,13 +56,13 @@ public: EXPECT_EQ(expected[i], '\0'); EXPECT_EQ(expected[i], '\0'); } } bool DoDataStream(unique_fd* rFd, unique_fd* wFd) { bool DoDataStream(const unique_fd& rFd, const unique_fd& wFd) { char buf[BUFFER_SIZE]; char buf[BUFFER_SIZE]; ssize_t nRead; ssize_t nRead; while ((nRead = read(rFd->get(), buf, BUFFER_SIZE)) > 0) { while ((nRead = read(rFd.get(), buf, BUFFER_SIZE)) > 0) { ssize_t nWritten = 0; ssize_t nWritten = 0; while (nWritten < nRead) { while (nWritten < nRead) { ssize_t amt = write(wFd->get(), buf + nWritten, nRead - nWritten); ssize_t amt = write(wFd.get(), buf + nWritten, nRead - nWritten); if (amt < 0) { if (amt < 0) { return false; return false; } } Loading @@ -76,7 +75,6 @@ public: protected: protected: FdBuffer buffer; FdBuffer buffer; TemporaryFile tf; TemporaryFile tf; unique_fd tffd; Fpipe p2cPipe; Fpipe p2cPipe; Fpipe c2pPipe; Fpipe c2pPipe; Loading @@ -87,7 +85,7 @@ protected: TEST_F(FdBufferTest, ReadAndWrite) { TEST_F(FdBufferTest, ReadAndWrite) { std::string testdata = "FdBuffer test string"; std::string testdata = "FdBuffer test string"; ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_EQ(NO_ERROR, buffer.read(&tffd, READ_TIMEOUT)); ASSERT_EQ(NO_ERROR, buffer.read(tf.fd, READ_TIMEOUT)); AssertBufferReadSuccessful(testdata.size()); AssertBufferReadSuccessful(testdata.size()); AssertBufferContent(testdata.c_str()); AssertBufferContent(testdata.c_str()); } } Loading @@ -100,7 +98,7 @@ TEST_F(FdBufferTest, IterateEmpty) { TEST_F(FdBufferTest, ReadAndIterate) { TEST_F(FdBufferTest, ReadAndIterate) { std::string testdata = "FdBuffer test string"; std::string testdata = "FdBuffer test string"; ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_EQ(NO_ERROR, buffer.read(&tffd, READ_TIMEOUT)); ASSERT_EQ(NO_ERROR, buffer.read(tf.fd, READ_TIMEOUT)); int i = 0; int i = 0; EncodedBuffer::iterator it = buffer.data(); EncodedBuffer::iterator it = buffer.data(); Loading Loading @@ -128,7 +126,7 @@ TEST_F(FdBufferTest, ReadTimeout) { } else { } else { c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); status_t status = buffer.read(&c2pPipe.readFd(), QUICK_TIMEOUT_MS); status_t status = buffer.read(c2pPipe.readFd().get(), QUICK_TIMEOUT_MS); ASSERT_EQ(NO_ERROR, status); ASSERT_EQ(NO_ERROR, status); EXPECT_TRUE(buffer.timedOut()); EXPECT_TRUE(buffer.timedOut()); Loading @@ -148,7 +146,7 @@ TEST_F(FdBufferTest, ReadInStreamAndWrite) { p2cPipe.writeFd().reset(); p2cPipe.writeFd().reset(); c2pPipe.readFd().reset(); c2pPipe.readFd().reset(); ASSERT_TRUE(WriteStringToFd(HEAD, c2pPipe.writeFd())); ASSERT_TRUE(WriteStringToFd(HEAD, c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd())); p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); // Must exit here otherwise the child process will continue executing the test binary. // Must exit here otherwise the child process will continue executing the test binary. Loading @@ -157,8 +155,9 @@ TEST_F(FdBufferTest, ReadInStreamAndWrite) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferContent(expected.c_str()); AssertBufferContent(expected.c_str()); wait(&pid); wait(&pid); Loading Loading @@ -189,8 +188,9 @@ TEST_F(FdBufferTest, ReadInStreamAndWriteAllAtOnce) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferContent(expected.c_str()); AssertBufferContent(expected.c_str()); wait(&pid); wait(&pid); Loading @@ -206,7 +206,7 @@ TEST_F(FdBufferTest, ReadInStreamEmpty) { if (pid == 0) { if (pid == 0) { p2cPipe.writeFd().reset(); p2cPipe.writeFd().reset(); c2pPipe.readFd().reset(); c2pPipe.readFd().reset(); ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd())); p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); _exit(EXIT_SUCCESS); _exit(EXIT_SUCCESS); Loading @@ -214,8 +214,9 @@ TEST_F(FdBufferTest, ReadInStreamEmpty) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); AssertBufferReadSuccessful(0); AssertBufferReadSuccessful(0); AssertBufferContent(""); AssertBufferContent(""); wait(&pid); wait(&pid); Loading @@ -233,7 +234,7 @@ TEST_F(FdBufferTest, ReadInStreamMoreThan4MB) { if (pid == 0) { if (pid == 0) { p2cPipe.writeFd().reset(); p2cPipe.writeFd().reset(); c2pPipe.readFd().reset(); c2pPipe.readFd().reset(); ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd())); p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); _exit(EXIT_SUCCESS); _exit(EXIT_SUCCESS); Loading @@ -241,8 +242,9 @@ TEST_F(FdBufferTest, ReadInStreamMoreThan4MB) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&fd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); EXPECT_EQ(buffer.size(), fourMB); EXPECT_EQ(buffer.size(), fourMB); EXPECT_FALSE(buffer.timedOut()); EXPECT_FALSE(buffer.timedOut()); EXPECT_TRUE(buffer.truncated()); EXPECT_TRUE(buffer.truncated()); Loading Loading @@ -278,8 +280,9 @@ TEST_F(FdBufferTest, ReadInStreamTimeOut) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), QUICK_TIMEOUT_MS)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), QUICK_TIMEOUT_MS)); EXPECT_TRUE(buffer.timedOut()); EXPECT_TRUE(buffer.timedOut()); kill(pid, SIGKILL); // reap the child process kill(pid, SIGKILL); // reap the child process } } Loading Loading
cmds/incidentd/src/FdBuffer.cpp +31 −31 Original line number Original line Diff line number Diff line Loading @@ -34,11 +34,11 @@ FdBuffer::FdBuffer() FdBuffer::~FdBuffer() {} FdBuffer::~FdBuffer() {} status_t FdBuffer::read(unique_fd* fd, int64_t timeout) { status_t FdBuffer::read(int fd, int64_t timeout) { struct pollfd pfds = {.fd = fd->get(), .events = POLLIN}; struct pollfd pfds = {.fd = fd, .events = POLLIN}; mStartTime = uptimeMillis(); mStartTime = uptimeMillis(); fcntl(fd->get(), F_SETFL, fcntl(fd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); while (true) { while (true) { if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { Loading Loading @@ -67,16 +67,16 @@ status_t FdBuffer::read(unique_fd* fd, int64_t timeout) { VLOG("return event has error %s", strerror(errno)); VLOG("return event has error %s", strerror(errno)); return errno != 0 ? -errno : UNKNOWN_ERROR; return errno != 0 ? -errno : UNKNOWN_ERROR; } else { } else { ssize_t amt = ::read(fd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()); ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); if (amt < 0) { if (amt < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) { continue; continue; } else { } else { VLOG("Fail to read %d: %s", fd->get(), strerror(errno)); VLOG("Fail to read %d: %s", fd, strerror(errno)); return -errno; return -errno; } } } else if (amt == 0) { } else if (amt == 0) { VLOG("Reached EOF of fd=%d", fd->get()); VLOG("Reached EOF of fd=%d", fd); break; break; } } mBuffer.wp()->move(amt); mBuffer.wp()->move(amt); Loading @@ -87,7 +87,7 @@ status_t FdBuffer::read(unique_fd* fd, int64_t timeout) { return NO_ERROR; return NO_ERROR; } } status_t FdBuffer::readFully(unique_fd* fd) { status_t FdBuffer::readFully(int fd) { mStartTime = uptimeMillis(); mStartTime = uptimeMillis(); while (true) { while (true) { Loading @@ -99,10 +99,10 @@ status_t FdBuffer::readFully(unique_fd* fd) { } } if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; ssize_t amt = TEMP_FAILURE_RETRY( ssize_t amt = ::read(fd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite())); TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite())); if (amt < 0) { if (amt < 0) { VLOG("Fail to read %d: %s", fd->get(), strerror(errno)); VLOG("Fail to read %d: %s", fd, strerror(errno)); return -errno; return -errno; } else if (amt == 0) { } else if (amt == 0) { VLOG("Done reading %zu bytes", mBuffer.size()); VLOG("Done reading %zu bytes", mBuffer.size()); Loading @@ -116,20 +116,20 @@ status_t FdBuffer::readFully(unique_fd* fd) { return NO_ERROR; return NO_ERROR; } } status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, unique_fd* fromFd, status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd, int64_t timeoutMs, const bool isSysfs) { int64_t timeoutMs, const bool isSysfs) { struct pollfd pfds[] = { struct pollfd pfds[] = { {.fd = fd->get(), .events = POLLIN}, {.fd = fd, .events = POLLIN}, {.fd = toFd->get(), .events = POLLOUT}, {.fd = toFd.get(), .events = POLLOUT}, {.fd = fromFd->get(), .events = POLLIN}, {.fd = fromFd.get(), .events = POLLIN}, }; }; mStartTime = uptimeMillis(); mStartTime = uptimeMillis(); // mark all fds non blocking // mark all fds non blocking fcntl(fd->get(), F_SETFL, fcntl(fd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); fcntl(toFd->get(), F_SETFL, fcntl(toFd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fromFd->get(), F_SETFL, fcntl(fromFd->get(), F_GETFL, 0) | O_NONBLOCK); fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK); // A circular buffer holds data read from fd and writes to parsing process // A circular buffer holds data read from fd and writes to parsing process uint8_t cirBuf[BUFFER_SIZE]; uint8_t cirBuf[BUFFER_SIZE]; Loading Loading @@ -166,10 +166,10 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) { if ((pfds[i].revents & POLLERR) != 0) { if ((pfds[i].revents & POLLERR) != 0) { if (i == 0 && isSysfs) { if (i == 0 && isSysfs) { VLOG("fd %d is sysfs, ignore its POLLERR return value", fd->get()); VLOG("fd %d is sysfs, ignore its POLLERR return value", fd); continue; continue; } } VLOG("fd[%d]=%d returns error events: %s", i, fd->get(), strerror(errno)); VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno)); return errno != 0 ? -errno : UNKNOWN_ERROR; return errno != 0 ? -errno : UNKNOWN_ERROR; } } } } Loading @@ -178,17 +178,17 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { ssize_t amt; ssize_t amt; if (rpos >= wpos) { if (rpos >= wpos) { amt = ::read(fd->get(), cirBuf + rpos, BUFFER_SIZE - rpos); amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos); } else { } else { amt = ::read(fd->get(), cirBuf + rpos, wpos - rpos); amt = ::read(fd, cirBuf + rpos, wpos - rpos); } } if (amt < 0) { if (amt < 0) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { VLOG("Fail to read fd %d: %s", fd->get(), strerror(errno)); VLOG("Fail to read fd %d: %s", fd, strerror(errno)); return -errno; return -errno; } // otherwise just continue } // otherwise just continue } else if (amt == 0) { } else if (amt == 0) { VLOG("Reached EOF of input file %d", fd->get()); VLOG("Reached EOF of input file %d", fd); pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0]. pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0]. } else { } else { rpos += amt; rpos += amt; Loading @@ -200,13 +200,13 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni if (cirSize > 0 && pfds[1].fd != -1) { if (cirSize > 0 && pfds[1].fd != -1) { ssize_t amt; ssize_t amt; if (rpos > wpos) { if (rpos > wpos) { amt = ::write(toFd->get(), cirBuf + wpos, rpos - wpos); amt = ::write(toFd.get(), cirBuf + wpos, rpos - wpos); } else { } else { amt = ::write(toFd->get(), cirBuf + wpos, BUFFER_SIZE - wpos); amt = ::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos); } } if (amt < 0) { if (amt < 0) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { VLOG("Fail to write toFd %d: %s", toFd->get(), strerror(errno)); VLOG("Fail to write toFd.get() %d: %s", toFd.get(), strerror(errno)); return -errno; return -errno; } // otherwise just continue } // otherwise just continue } else { } else { Loading @@ -217,8 +217,8 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni // if buffer is empty and fd is closed, close write fd. // if buffer is empty and fd is closed, close write fd. if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { VLOG("Close write pipe %d", toFd->get()); VLOG("Close write pipe %d", toFd.get()); toFd->reset(); toFd.reset(); pfds[1].fd = -1; pfds[1].fd = -1; } } Loading @@ -231,14 +231,14 @@ status_t FdBuffer::readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, uni } } // read from parsing process // read from parsing process ssize_t amt = ::read(fromFd->get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()); ssize_t amt = ::read(fromFd.get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()); if (amt < 0) { if (amt < 0) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { VLOG("Fail to read fromFd %d: %s", fromFd->get(), strerror(errno)); VLOG("Fail to read fromFd.get() %d: %s", fromFd.get(), strerror(errno)); return -errno; return -errno; } // otherwise just continue } // otherwise just continue } else if (amt == 0) { } else if (amt == 0) { VLOG("Reached EOF of fromFd %d", fromFd->get()); VLOG("Reached EOF of fromFd.get() %d", fromFd.get()); break; break; } else { } else { mBuffer.wp()->move(amt); mBuffer.wp()->move(amt); Loading
cmds/incidentd/src/FdBuffer.h +4 −4 Original line number Original line Diff line number Diff line Loading @@ -40,13 +40,13 @@ public: * Returns NO_ERROR if there were no errors or if we timed out. * Returns NO_ERROR if there were no errors or if we timed out. * Will mark the file O_NONBLOCK. * Will mark the file O_NONBLOCK. */ */ status_t read(unique_fd* fd, int64_t timeoutMs); status_t read(int fd, int64_t timeoutMs); /** /** * Read the data until we hit eof. * Read the data until we hit eof. * Returns NO_ERROR if there were no errors. * Returns NO_ERROR if there were no errors. */ */ status_t readFully(unique_fd* fd); status_t readFully(int fd); /** /** * Read processed results by streaming data to a parsing process, e.g. incident helper. * Read processed results by streaming data to a parsing process, e.g. incident helper. Loading @@ -58,8 +58,8 @@ public: * * * Poll will return POLLERR if fd is from sysfs, handle this edge case. * Poll will return POLLERR if fd is from sysfs, handle this edge case. */ */ status_t readProcessedDataInStream(unique_fd* fd, unique_fd* toFd, unique_fd* fromFd, status_t readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd, int64_t timeoutMs, int64_t timeoutMs, const bool isSysfs = false); const bool isSysfs = false); /** /** * Whether we timed out. * Whether we timed out. Loading
cmds/incidentd/src/IncidentService.cpp +1 −2 Original line number Original line Diff line number Diff line Loading @@ -352,8 +352,7 @@ status_t IncidentService::cmd_privacy(FILE* in, FILE* out, FILE* err, Vector<Str printPrivacy(p, out, String8("")); printPrivacy(p, out, String8("")); } else if (opt == "parse") { } else if (opt == "parse") { FdBuffer buf; FdBuffer buf; unique_fd infd(fileno(in)); status_t error = buf.read(fileno(in), 60000); status_t error = buf.read(&infd, 60000); if (error != NO_ERROR) { if (error != NO_ERROR) { fprintf(err, "Error reading from stdin\n"); fprintf(err, "Error reading from stdin\n"); return error; return error; Loading
cmds/incidentd/src/Section.cpp +9 −8 Original line number Original line Diff line number Diff line Loading @@ -264,8 +264,9 @@ status_t FileSection::Execute(ReportRequestSet* requests) const { } } // parent process // parent process status_t readStatus = buffer.readProcessedDataInStream( status_t readStatus = buffer.readProcessedDataInStream(fd.get(), std::move(p2cPipe.writeFd()), &fd, &p2cPipe.writeFd(), &c2pPipe.readFd(), this->timeoutMs, mIsSysfs); std::move(c2pPipe.readFd()), this->timeoutMs, mIsSysfs); if (readStatus != NO_ERROR || buffer.timedOut()) { if (readStatus != NO_ERROR || buffer.timedOut()) { ALOGW("FileSection '%s' failed to read data from incident helper: %s, timedout: %s", ALOGW("FileSection '%s' failed to read data from incident helper: %s, timedout: %s", Loading Loading @@ -356,9 +357,9 @@ status_t GZipSection::Execute(ReportRequestSet* requests) const { VLOG("GZipSection '%s' editPos=%zd, dataBeginAt=%zd", this->name.string(), editPos, VLOG("GZipSection '%s' editPos=%zd, dataBeginAt=%zd", this->name.string(), editPos, dataBeginAt); dataBeginAt); status_t readStatus = status_t readStatus = buffer.readProcessedDataInStream( buffer.readProcessedDataInStream(&fd, &p2cPipe.writeFd(), &c2pPipe.readFd(), fd.get(), std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), this->timeoutMs, this->timeoutMs, isSysfs(mFilenames[index])); isSysfs(mFilenames[index])); if (readStatus != NO_ERROR || buffer.timedOut()) { if (readStatus != NO_ERROR || buffer.timedOut()) { ALOGW("GZipSection '%s' failed to read data from gzip: %s, timedout: %s", ALOGW("GZipSection '%s' failed to read data from gzip: %s, timedout: %s", Loading Loading @@ -468,7 +469,7 @@ status_t WorkerThreadSection::Execute(ReportRequestSet* requests) const { pthread_attr_destroy(&attr); pthread_attr_destroy(&attr); // Loop reading until either the timeout or the worker side is done (i.e. eof). // Loop reading until either the timeout or the worker side is done (i.e. eof). err = buffer.read(&data->pipe.readFd(), this->timeoutMs); err = buffer.read(data->pipe.readFd().get(), this->timeoutMs); if (err != NO_ERROR) { if (err != NO_ERROR) { // TODO: Log this error into the incident report. // TODO: Log this error into the incident report. ALOGW("WorkerThreadSection '%s' reader failed with error '%s'", this->name.string(), ALOGW("WorkerThreadSection '%s' reader failed with error '%s'", this->name.string(), Loading Loading @@ -575,7 +576,7 @@ status_t CommandSection::Execute(ReportRequestSet* requests) const { } } cmdPipe.writeFd().reset(); cmdPipe.writeFd().reset(); status_t readStatus = buffer.read(&ihPipe.readFd(), this->timeoutMs); status_t readStatus = buffer.read(ihPipe.readFd().get(), this->timeoutMs); if (readStatus != NO_ERROR || buffer.timedOut()) { if (readStatus != NO_ERROR || buffer.timedOut()) { ALOGW("CommandSection '%s' failed to read data from incident helper: %s, timedout: %s", ALOGW("CommandSection '%s' failed to read data from incident helper: %s, timedout: %s", this->name.string(), strerror(-readStatus), buffer.timedOut() ? "true" : "false"); this->name.string(), strerror(-readStatus), buffer.timedOut() ? "true" : "false"); Loading Loading @@ -894,7 +895,7 @@ status_t TombstoneSection::BlockingCall(int pipeWriteFd) const { // Parent process. // Parent process. // Read from the pipe concurrently to avoid blocking the child. // Read from the pipe concurrently to avoid blocking the child. FdBuffer buffer; FdBuffer buffer; err = buffer.readFully(&dumpPipe.readFd()); err = buffer.readFully(dumpPipe.readFd().get()); if (err != NO_ERROR) { if (err != NO_ERROR) { ALOGW("TombstoneSection '%s' failed to read stack dump: %d", this->name.string(), err); ALOGW("TombstoneSection '%s' failed to read stack dump: %d", this->name.string(), err); dumpPipe.readFd().reset(); dumpPipe.readFd().reset(); Loading
cmds/incidentd/tests/FdBuffer_test.cpp +24 −21 Original line number Original line Diff line number Diff line Loading @@ -37,7 +37,6 @@ class FdBufferTest : public Test { public: public: virtual void SetUp() override { virtual void SetUp() override { ASSERT_NE(tf.fd, -1); ASSERT_NE(tf.fd, -1); tffd.reset(tf.fd); ASSERT_NE(p2cPipe.init(), -1); ASSERT_NE(p2cPipe.init(), -1); ASSERT_NE(c2pPipe.init(), -1); ASSERT_NE(c2pPipe.init(), -1); } } Loading @@ -57,13 +56,13 @@ public: EXPECT_EQ(expected[i], '\0'); EXPECT_EQ(expected[i], '\0'); } } bool DoDataStream(unique_fd* rFd, unique_fd* wFd) { bool DoDataStream(const unique_fd& rFd, const unique_fd& wFd) { char buf[BUFFER_SIZE]; char buf[BUFFER_SIZE]; ssize_t nRead; ssize_t nRead; while ((nRead = read(rFd->get(), buf, BUFFER_SIZE)) > 0) { while ((nRead = read(rFd.get(), buf, BUFFER_SIZE)) > 0) { ssize_t nWritten = 0; ssize_t nWritten = 0; while (nWritten < nRead) { while (nWritten < nRead) { ssize_t amt = write(wFd->get(), buf + nWritten, nRead - nWritten); ssize_t amt = write(wFd.get(), buf + nWritten, nRead - nWritten); if (amt < 0) { if (amt < 0) { return false; return false; } } Loading @@ -76,7 +75,6 @@ public: protected: protected: FdBuffer buffer; FdBuffer buffer; TemporaryFile tf; TemporaryFile tf; unique_fd tffd; Fpipe p2cPipe; Fpipe p2cPipe; Fpipe c2pPipe; Fpipe c2pPipe; Loading @@ -87,7 +85,7 @@ protected: TEST_F(FdBufferTest, ReadAndWrite) { TEST_F(FdBufferTest, ReadAndWrite) { std::string testdata = "FdBuffer test string"; std::string testdata = "FdBuffer test string"; ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_EQ(NO_ERROR, buffer.read(&tffd, READ_TIMEOUT)); ASSERT_EQ(NO_ERROR, buffer.read(tf.fd, READ_TIMEOUT)); AssertBufferReadSuccessful(testdata.size()); AssertBufferReadSuccessful(testdata.size()); AssertBufferContent(testdata.c_str()); AssertBufferContent(testdata.c_str()); } } Loading @@ -100,7 +98,7 @@ TEST_F(FdBufferTest, IterateEmpty) { TEST_F(FdBufferTest, ReadAndIterate) { TEST_F(FdBufferTest, ReadAndIterate) { std::string testdata = "FdBuffer test string"; std::string testdata = "FdBuffer test string"; ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_TRUE(WriteStringToFile(testdata, tf.path)); ASSERT_EQ(NO_ERROR, buffer.read(&tffd, READ_TIMEOUT)); ASSERT_EQ(NO_ERROR, buffer.read(tf.fd, READ_TIMEOUT)); int i = 0; int i = 0; EncodedBuffer::iterator it = buffer.data(); EncodedBuffer::iterator it = buffer.data(); Loading Loading @@ -128,7 +126,7 @@ TEST_F(FdBufferTest, ReadTimeout) { } else { } else { c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); status_t status = buffer.read(&c2pPipe.readFd(), QUICK_TIMEOUT_MS); status_t status = buffer.read(c2pPipe.readFd().get(), QUICK_TIMEOUT_MS); ASSERT_EQ(NO_ERROR, status); ASSERT_EQ(NO_ERROR, status); EXPECT_TRUE(buffer.timedOut()); EXPECT_TRUE(buffer.timedOut()); Loading @@ -148,7 +146,7 @@ TEST_F(FdBufferTest, ReadInStreamAndWrite) { p2cPipe.writeFd().reset(); p2cPipe.writeFd().reset(); c2pPipe.readFd().reset(); c2pPipe.readFd().reset(); ASSERT_TRUE(WriteStringToFd(HEAD, c2pPipe.writeFd())); ASSERT_TRUE(WriteStringToFd(HEAD, c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd())); p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); // Must exit here otherwise the child process will continue executing the test binary. // Must exit here otherwise the child process will continue executing the test binary. Loading @@ -157,8 +155,9 @@ TEST_F(FdBufferTest, ReadInStreamAndWrite) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferContent(expected.c_str()); AssertBufferContent(expected.c_str()); wait(&pid); wait(&pid); Loading Loading @@ -189,8 +188,9 @@ TEST_F(FdBufferTest, ReadInStreamAndWriteAllAtOnce) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferReadSuccessful(HEAD.size() + testdata.size()); AssertBufferContent(expected.c_str()); AssertBufferContent(expected.c_str()); wait(&pid); wait(&pid); Loading @@ -206,7 +206,7 @@ TEST_F(FdBufferTest, ReadInStreamEmpty) { if (pid == 0) { if (pid == 0) { p2cPipe.writeFd().reset(); p2cPipe.writeFd().reset(); c2pPipe.readFd().reset(); c2pPipe.readFd().reset(); ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd())); p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); _exit(EXIT_SUCCESS); _exit(EXIT_SUCCESS); Loading @@ -214,8 +214,9 @@ TEST_F(FdBufferTest, ReadInStreamEmpty) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); AssertBufferReadSuccessful(0); AssertBufferReadSuccessful(0); AssertBufferContent(""); AssertBufferContent(""); wait(&pid); wait(&pid); Loading @@ -233,7 +234,7 @@ TEST_F(FdBufferTest, ReadInStreamMoreThan4MB) { if (pid == 0) { if (pid == 0) { p2cPipe.writeFd().reset(); p2cPipe.writeFd().reset(); c2pPipe.readFd().reset(); c2pPipe.readFd().reset(); ASSERT_TRUE(DoDataStream(&p2cPipe.readFd(), &c2pPipe.writeFd())); ASSERT_TRUE(DoDataStream(p2cPipe.readFd(), c2pPipe.writeFd())); p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); _exit(EXIT_SUCCESS); _exit(EXIT_SUCCESS); Loading @@ -241,8 +242,9 @@ TEST_F(FdBufferTest, ReadInStreamMoreThan4MB) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&fd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), READ_TIMEOUT)); buffer.readProcessedDataInStream(fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), READ_TIMEOUT)); EXPECT_EQ(buffer.size(), fourMB); EXPECT_EQ(buffer.size(), fourMB); EXPECT_FALSE(buffer.timedOut()); EXPECT_FALSE(buffer.timedOut()); EXPECT_TRUE(buffer.truncated()); EXPECT_TRUE(buffer.truncated()); Loading Loading @@ -278,8 +280,9 @@ TEST_F(FdBufferTest, ReadInStreamTimeOut) { p2cPipe.readFd().reset(); p2cPipe.readFd().reset(); c2pPipe.writeFd().reset(); c2pPipe.writeFd().reset(); ASSERT_EQ(NO_ERROR, buffer.readProcessedDataInStream(&tffd, &p2cPipe.writeFd(), ASSERT_EQ(NO_ERROR, &c2pPipe.readFd(), QUICK_TIMEOUT_MS)); buffer.readProcessedDataInStream(tf.fd, std::move(p2cPipe.writeFd()), std::move(c2pPipe.readFd()), QUICK_TIMEOUT_MS)); EXPECT_TRUE(buffer.timedOut()); EXPECT_TRUE(buffer.timedOut()); kill(pid, SIGKILL); // reap the child process kill(pid, SIGKILL); // reap the child process } } Loading