Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ae0418a1 authored by Alex Buynytskyy's avatar Alex Buynytskyy Committed by Automerger Merge Worker
Browse files

Merge "Wait for data to be available before reading." into sc-dev am: 527d6d6a

Original change: https://googleplex-android-review.googlesource.com/c/platform/frameworks/base/+/15182761

Change-Id: I94255bd217aa65e0f739b5797c90e3f0aac96ab3
parents c56fa470 527d6d6a
Loading
Loading
Loading
Loading
+77 −34
Original line number Diff line number Diff line
@@ -131,28 +131,6 @@ static bool sendRequest(int fd, RequestType requestType, FileIdx fileIdx = -1,
    return android::base::WriteFully(fd, &command, sizeof(command));
}

static int waitForDataOrSignal(int fd, int event_fd) {
    struct pollfd pfds[2] = {{fd, POLLIN, 0}, {event_fd, POLLIN, 0}};
    // Wait until either data is ready or stop signal is received
    int res = poll(pfds, 2, PollTimeoutMs);
    if (res == -1 && errno == EINTR) {
        // Treat it the same as timeout and allow the caller to retry.
        return 0;
    }
    if (res <= 0) {
        return res;
    }
    // First check if there is a stop signal
    if (pfds[1].revents == POLLIN) {
        return event_fd;
    }
    // Otherwise check if incoming data is ready
    if (pfds[0].revents == POLLIN) {
        return fd;
    }
    return -1;
}

static bool readChunk(int fd, std::vector<uint8_t>& data) {
    int32_t size;
    if (!android::base::ReadFully(fd, &size, sizeof(size))) {
@@ -382,7 +360,12 @@ static OnTraceChanged& onTraceChanged() {
class PMSCDataLoader : public android::dataloader::DataLoader {
public:
    PMSCDataLoader(JavaVM* jvm) : mJvm(jvm) { CHECK(mJvm); }
    ~PMSCDataLoader() { onTraceChanged().unregisterCallback(this); }
    ~PMSCDataLoader() {
        onTraceChanged().unregisterCallback(this);
        if (mReceiverThread.joinable()) {
            mReceiverThread.join();
        }
    }

    void updateReadLogsState(const bool enabled) {
        if (enabled != mReadLogsEnabled.exchange(enabled)) {
@@ -417,11 +400,7 @@ private:
            mReceiverThread.join();
        }
    }
    void onDestroy() final {
        onTraceChanged().unregisterCallback(this);
        // Make sure the receiver thread stopped.
        CHECK(!mReceiverThread.joinable());
    }
    void onDestroy() final {}

    // Installation.
    bool onPrepareImage(dataloader::DataLoaderInstallationFiles addedFiles) final {
@@ -573,6 +552,60 @@ private:
        return true;
    }

    enum class WaitResult {
        DataAvailable,
        Timeout,
        Failure,
        StopRequested,
    };

    WaitResult waitForData(int fd) {
        using Clock = std::chrono::steady_clock;
        using Milliseconds = std::chrono::milliseconds;

        auto pollTimeoutMs = PollTimeoutMs;
        const auto waitEnd = Clock::now() + Milliseconds(pollTimeoutMs);
        while (!mStopReceiving) {
            struct pollfd pfds[2] = {{fd, POLLIN, 0}, {mEventFd, POLLIN, 0}};
            // Wait until either data is ready or stop signal is received
            int res = poll(pfds, std::size(pfds), pollTimeoutMs);

            if (res < 0) {
                if (errno == EINTR) {
                    pollTimeoutMs = std::chrono::duration_cast<Milliseconds>(waitEnd - Clock::now())
                                            .count();
                    if (pollTimeoutMs < 0) {
                        return WaitResult::Timeout;
                    }
                    continue;
                }
                ALOGE("Failed to poll. Error %d", errno);
                return WaitResult::Failure;
            }

            if (res == 0) {
                return WaitResult::Timeout;
            }

            // First check if there is a stop signal
            if (pfds[1].revents == POLLIN) {
                ALOGE("DataLoader requested to stop.");
                return WaitResult::StopRequested;
            }
            // Otherwise check if incoming data is ready
            if (pfds[0].revents == POLLIN) {
                return WaitResult::DataAvailable;
            }

            // Invalid case, just fail.
            ALOGE("Failed to poll. Result %d", res);
            return WaitResult::Failure;
        }

        ALOGE("DataLoader requested to stop.");
        return WaitResult::StopRequested;
    }

    // Streaming.
    bool initStreaming(unique_fd inout, MetadataMode mode) {
        mEventFd.reset(eventfd(0, EFD_CLOEXEC));
@@ -582,6 +615,11 @@ private:
        }

        // Awaiting adb handshake.
        if (waitForData(inout) != WaitResult::DataAvailable) {
            ALOGE("Failure waiting for the handshake.");
            return false;
        }

        char okay_buf[OKAY.size()];
        if (!android::base::ReadFully(inout, okay_buf, OKAY.size())) {
            ALOGE("Failed to receive OKAY. Abort. Error %d", errno);
@@ -601,8 +639,14 @@ private:
            }
        }

        if (mStopReceiving) {
            ALOGE("DataLoader requested to stop.");
            return false;
        }

        mReceiverThread = std::thread(
                [this, io = std::move(inout), mode]() mutable { receiver(std::move(io), mode); });

        ALOGI("Started streaming...");
        return true;
    }
@@ -750,17 +794,16 @@ private:
        std::vector<IncFsDataBlock> instructions;
        std::unordered_map<FileIdx, unique_fd> writeFds;
        while (!mStopReceiving) {
            const int res = waitForDataOrSignal(inout, mEventFd);
            if (res == 0) {
            const auto res = waitForData(inout);
            if (res == WaitResult::Timeout) {
                continue;
            }
            if (res < 0) {
                ALOGE("Failed to poll. Abort. Error %d", res);
            if (res == WaitResult::Failure) {
                mStatusListener->reportStatus(DATA_LOADER_UNRECOVERABLE);
                break;
            }
            if (res == mEventFd) {
                ALOGE("DataLoader requested to stop. Sending EXIT to server.");
            if (res == WaitResult::StopRequested) {
                ALOGE("Sending EXIT to server.");
                sendRequest(inout, EXIT);
                break;
            }