Loading services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp +77 −34 Original line number Diff line number Diff line Loading @@ -131,28 +131,6 @@ static bool sendRequest(int fd, RequestType requestType, FileIdx fileIdx = -1, return android::base::WriteFully(fd, &command, sizeof(command)); } static int waitForDataOrSignal(int fd, int event_fd) { struct pollfd pfds[2] = {{fd, POLLIN, 0}, {event_fd, POLLIN, 0}}; // Wait until either data is ready or stop signal is received int res = poll(pfds, 2, PollTimeoutMs); if (res == -1 && errno == EINTR) { // Treat it the same as timeout and allow the caller to retry. return 0; } if (res <= 0) { return res; } // First check if there is a stop signal if (pfds[1].revents == POLLIN) { return event_fd; } // Otherwise check if incoming data is ready if (pfds[0].revents == POLLIN) { return fd; } return -1; } static bool readChunk(int fd, std::vector<uint8_t>& data) { int32_t size; if (!android::base::ReadFully(fd, &size, sizeof(size))) { Loading Loading @@ -382,7 +360,12 @@ static OnTraceChanged& onTraceChanged() { class PMSCDataLoader : public android::dataloader::DataLoader { public: PMSCDataLoader(JavaVM* jvm) : mJvm(jvm) { CHECK(mJvm); } ~PMSCDataLoader() { onTraceChanged().unregisterCallback(this); } ~PMSCDataLoader() { onTraceChanged().unregisterCallback(this); if (mReceiverThread.joinable()) { mReceiverThread.join(); } } void updateReadLogsState(const bool enabled) { if (enabled != mReadLogsEnabled.exchange(enabled)) { Loading Loading @@ -417,11 +400,7 @@ private: mReceiverThread.join(); } } void onDestroy() final { onTraceChanged().unregisterCallback(this); // Make sure the receiver thread stopped. CHECK(!mReceiverThread.joinable()); } void onDestroy() final {} // Installation. bool onPrepareImage(dataloader::DataLoaderInstallationFiles addedFiles) final { Loading Loading @@ -573,6 +552,60 @@ private: return true; } enum class WaitResult { DataAvailable, Timeout, Failure, StopRequested, }; WaitResult waitForData(int fd) { using Clock = std::chrono::steady_clock; using Milliseconds = std::chrono::milliseconds; auto pollTimeoutMs = PollTimeoutMs; const auto waitEnd = Clock::now() + Milliseconds(pollTimeoutMs); while (!mStopReceiving) { struct pollfd pfds[2] = {{fd, POLLIN, 0}, {mEventFd, POLLIN, 0}}; // Wait until either data is ready or stop signal is received int res = poll(pfds, std::size(pfds), pollTimeoutMs); if (res < 0) { if (errno == EINTR) { pollTimeoutMs = std::chrono::duration_cast<Milliseconds>(waitEnd - Clock::now()) .count(); if (pollTimeoutMs < 0) { return WaitResult::Timeout; } continue; } ALOGE("Failed to poll. Error %d", errno); return WaitResult::Failure; } if (res == 0) { return WaitResult::Timeout; } // First check if there is a stop signal if (pfds[1].revents == POLLIN) { ALOGE("DataLoader requested to stop."); return WaitResult::StopRequested; } // Otherwise check if incoming data is ready if (pfds[0].revents == POLLIN) { return WaitResult::DataAvailable; } // Invalid case, just fail. ALOGE("Failed to poll. Result %d", res); return WaitResult::Failure; } ALOGE("DataLoader requested to stop."); return WaitResult::StopRequested; } // Streaming. bool initStreaming(unique_fd inout, MetadataMode mode) { mEventFd.reset(eventfd(0, EFD_CLOEXEC)); Loading @@ -582,6 +615,11 @@ private: } // Awaiting adb handshake. if (waitForData(inout) != WaitResult::DataAvailable) { ALOGE("Failure waiting for the handshake."); return false; } char okay_buf[OKAY.size()]; if (!android::base::ReadFully(inout, okay_buf, OKAY.size())) { ALOGE("Failed to receive OKAY. Abort. Error %d", errno); Loading @@ -601,8 +639,14 @@ private: } } if (mStopReceiving) { ALOGE("DataLoader requested to stop."); return false; } mReceiverThread = std::thread( [this, io = std::move(inout), mode]() mutable { receiver(std::move(io), mode); }); ALOGI("Started streaming..."); return true; } Loading Loading @@ -750,17 +794,16 @@ private: std::vector<IncFsDataBlock> instructions; std::unordered_map<FileIdx, unique_fd> writeFds; while (!mStopReceiving) { const int res = waitForDataOrSignal(inout, mEventFd); if (res == 0) { const auto res = waitForData(inout); if (res == WaitResult::Timeout) { continue; } if (res < 0) { ALOGE("Failed to poll. Abort. Error %d", res); if (res == WaitResult::Failure) { mStatusListener->reportStatus(DATA_LOADER_UNRECOVERABLE); break; } if (res == mEventFd) { ALOGE("DataLoader requested to stop. Sending EXIT to server."); if (res == WaitResult::StopRequested) { ALOGE("Sending EXIT to server."); sendRequest(inout, EXIT); break; } Loading Loading
services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp +77 −34 Original line number Diff line number Diff line Loading @@ -131,28 +131,6 @@ static bool sendRequest(int fd, RequestType requestType, FileIdx fileIdx = -1, return android::base::WriteFully(fd, &command, sizeof(command)); } static int waitForDataOrSignal(int fd, int event_fd) { struct pollfd pfds[2] = {{fd, POLLIN, 0}, {event_fd, POLLIN, 0}}; // Wait until either data is ready or stop signal is received int res = poll(pfds, 2, PollTimeoutMs); if (res == -1 && errno == EINTR) { // Treat it the same as timeout and allow the caller to retry. return 0; } if (res <= 0) { return res; } // First check if there is a stop signal if (pfds[1].revents == POLLIN) { return event_fd; } // Otherwise check if incoming data is ready if (pfds[0].revents == POLLIN) { return fd; } return -1; } static bool readChunk(int fd, std::vector<uint8_t>& data) { int32_t size; if (!android::base::ReadFully(fd, &size, sizeof(size))) { Loading Loading @@ -382,7 +360,12 @@ static OnTraceChanged& onTraceChanged() { class PMSCDataLoader : public android::dataloader::DataLoader { public: PMSCDataLoader(JavaVM* jvm) : mJvm(jvm) { CHECK(mJvm); } ~PMSCDataLoader() { onTraceChanged().unregisterCallback(this); } ~PMSCDataLoader() { onTraceChanged().unregisterCallback(this); if (mReceiverThread.joinable()) { mReceiverThread.join(); } } void updateReadLogsState(const bool enabled) { if (enabled != mReadLogsEnabled.exchange(enabled)) { Loading Loading @@ -417,11 +400,7 @@ private: mReceiverThread.join(); } } void onDestroy() final { onTraceChanged().unregisterCallback(this); // Make sure the receiver thread stopped. CHECK(!mReceiverThread.joinable()); } void onDestroy() final {} // Installation. bool onPrepareImage(dataloader::DataLoaderInstallationFiles addedFiles) final { Loading Loading @@ -573,6 +552,60 @@ private: return true; } enum class WaitResult { DataAvailable, Timeout, Failure, StopRequested, }; WaitResult waitForData(int fd) { using Clock = std::chrono::steady_clock; using Milliseconds = std::chrono::milliseconds; auto pollTimeoutMs = PollTimeoutMs; const auto waitEnd = Clock::now() + Milliseconds(pollTimeoutMs); while (!mStopReceiving) { struct pollfd pfds[2] = {{fd, POLLIN, 0}, {mEventFd, POLLIN, 0}}; // Wait until either data is ready or stop signal is received int res = poll(pfds, std::size(pfds), pollTimeoutMs); if (res < 0) { if (errno == EINTR) { pollTimeoutMs = std::chrono::duration_cast<Milliseconds>(waitEnd - Clock::now()) .count(); if (pollTimeoutMs < 0) { return WaitResult::Timeout; } continue; } ALOGE("Failed to poll. Error %d", errno); return WaitResult::Failure; } if (res == 0) { return WaitResult::Timeout; } // First check if there is a stop signal if (pfds[1].revents == POLLIN) { ALOGE("DataLoader requested to stop."); return WaitResult::StopRequested; } // Otherwise check if incoming data is ready if (pfds[0].revents == POLLIN) { return WaitResult::DataAvailable; } // Invalid case, just fail. ALOGE("Failed to poll. Result %d", res); return WaitResult::Failure; } ALOGE("DataLoader requested to stop."); return WaitResult::StopRequested; } // Streaming. bool initStreaming(unique_fd inout, MetadataMode mode) { mEventFd.reset(eventfd(0, EFD_CLOEXEC)); Loading @@ -582,6 +615,11 @@ private: } // Awaiting adb handshake. if (waitForData(inout) != WaitResult::DataAvailable) { ALOGE("Failure waiting for the handshake."); return false; } char okay_buf[OKAY.size()]; if (!android::base::ReadFully(inout, okay_buf, OKAY.size())) { ALOGE("Failed to receive OKAY. Abort. Error %d", errno); Loading @@ -601,8 +639,14 @@ private: } } if (mStopReceiving) { ALOGE("DataLoader requested to stop."); return false; } mReceiverThread = std::thread( [this, io = std::move(inout), mode]() mutable { receiver(std::move(io), mode); }); ALOGI("Started streaming..."); return true; } Loading Loading @@ -750,17 +794,16 @@ private: std::vector<IncFsDataBlock> instructions; std::unordered_map<FileIdx, unique_fd> writeFds; while (!mStopReceiving) { const int res = waitForDataOrSignal(inout, mEventFd); if (res == 0) { const auto res = waitForData(inout); if (res == WaitResult::Timeout) { continue; } if (res < 0) { ALOGE("Failed to poll. Abort. Error %d", res); if (res == WaitResult::Failure) { mStatusListener->reportStatus(DATA_LOADER_UNRECOVERABLE); break; } if (res == mEventFd) { ALOGE("DataLoader requested to stop. Sending EXIT to server."); if (res == WaitResult::StopRequested) { ALOGE("Sending EXIT to server."); sendRequest(inout, EXIT); break; } Loading