Loading services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp +24 −20 Original line number Diff line number Diff line Loading @@ -364,13 +364,8 @@ private: } } void onDestroy() final { ALOGE("Sending EXIT to server."); sendRequest(mOutFd, EXIT); // Make sure the receiver thread stopped. CHECK(!mReceiverThread.joinable()); mInFd.reset(); mOutFd.reset(); } // Installation. Loading Loading @@ -568,13 +563,6 @@ private: // Streaming. bool initStreaming(unique_fd inout) { mInFd.reset(dup(inout)); mOutFd.reset(dup(inout)); if (mInFd < 0 || mOutFd < 0) { ALOGE("Failed to dup FDs."); return false; } mEventFd.reset(eventfd(0, EFD_CLOEXEC)); if (mEventFd < 0) { ALOGE("Failed to create eventfd."); Loading @@ -583,7 +571,7 @@ private: // Awaiting adb handshake. char okay_buf[OKAY.size()]; if (!android::base::ReadFully(mInFd, okay_buf, OKAY.size())) { if (!android::base::ReadFully(inout, okay_buf, OKAY.size())) { ALOGE("Failed to receive OKAY. Abort."); return false; } Loading @@ -593,13 +581,23 @@ private: return false; } mReceiverThread = std::thread([this]() { receiver(); }); { std::lock_guard lock{mOutFdLock}; mOutFd.reset(::dup(inout)); if (mOutFd < 0) { ALOGE("Failed to create streaming fd."); } } mReceiverThread = std::thread([this, io = std::move(inout)]() mutable { receiver(std::move(io)); }); ALOGI("Started streaming..."); return true; } // IFS callbacks. void onPendingReads(dataloader::PendingReads pendingReads) final { std::lock_guard lock{mOutFdLock}; CHECK(mIfs); for (auto&& pendingRead : pendingReads) { const android::dataloader::FileId& fileId = pendingRead.id; Loading @@ -625,12 +623,12 @@ private: } } void receiver() { void receiver(unique_fd inout) { std::vector<uint8_t> data; std::vector<IncFsDataBlock> instructions; std::unordered_map<FileIdx, unique_fd> writeFds; while (!mStopReceiving) { const int res = waitForDataOrSignal(mInFd, mEventFd); const int res = waitForDataOrSignal(inout, mEventFd); if (res == 0) { continue; } Loading @@ -640,10 +638,11 @@ private: break; } if (res == mEventFd) { ALOGE("Received stop signal. Exit."); ALOGE("Received stop signal. Sending EXIT to server."); sendRequest(inout, EXIT); break; } if (!readChunk(mInFd, data)) { if (!readChunk(inout, data)) { ALOGE("Failed to read a message. Abort."); mStatusListener->reportStatus(DATA_LOADER_NO_CONNECTION); break; Loading @@ -656,7 +655,7 @@ private: ALOGI("Stop signal received. Sending exit command (remaining bytes: %d).", int(remainingData.size())); sendRequest(mOutFd, EXIT); sendRequest(inout, EXIT); mStopReceiving = true; break; } Loading Loading @@ -699,6 +698,11 @@ private: writeInstructions(instructions); } writeInstructions(instructions); { std::lock_guard lock{mOutFdLock}; mOutFd.reset(); } } void writeInstructions(std::vector<IncFsDataBlock>& instructions) { Loading Loading @@ -742,7 +746,7 @@ private: std::string mArgs; android::dataloader::FilesystemConnectorPtr mIfs = nullptr; android::dataloader::StatusListenerPtr mStatusListener = nullptr; android::base::unique_fd mInFd; std::mutex mOutFdLock; android::base::unique_fd mOutFd; android::base::unique_fd mEventFd; std::thread mReceiverThread; Loading Loading
services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp +24 −20 Original line number Diff line number Diff line Loading @@ -364,13 +364,8 @@ private: } } void onDestroy() final { ALOGE("Sending EXIT to server."); sendRequest(mOutFd, EXIT); // Make sure the receiver thread stopped. CHECK(!mReceiverThread.joinable()); mInFd.reset(); mOutFd.reset(); } // Installation. Loading Loading @@ -568,13 +563,6 @@ private: // Streaming. bool initStreaming(unique_fd inout) { mInFd.reset(dup(inout)); mOutFd.reset(dup(inout)); if (mInFd < 0 || mOutFd < 0) { ALOGE("Failed to dup FDs."); return false; } mEventFd.reset(eventfd(0, EFD_CLOEXEC)); if (mEventFd < 0) { ALOGE("Failed to create eventfd."); Loading @@ -583,7 +571,7 @@ private: // Awaiting adb handshake. char okay_buf[OKAY.size()]; if (!android::base::ReadFully(mInFd, okay_buf, OKAY.size())) { if (!android::base::ReadFully(inout, okay_buf, OKAY.size())) { ALOGE("Failed to receive OKAY. Abort."); return false; } Loading @@ -593,13 +581,23 @@ private: return false; } mReceiverThread = std::thread([this]() { receiver(); }); { std::lock_guard lock{mOutFdLock}; mOutFd.reset(::dup(inout)); if (mOutFd < 0) { ALOGE("Failed to create streaming fd."); } } mReceiverThread = std::thread([this, io = std::move(inout)]() mutable { receiver(std::move(io)); }); ALOGI("Started streaming..."); return true; } // IFS callbacks. void onPendingReads(dataloader::PendingReads pendingReads) final { std::lock_guard lock{mOutFdLock}; CHECK(mIfs); for (auto&& pendingRead : pendingReads) { const android::dataloader::FileId& fileId = pendingRead.id; Loading @@ -625,12 +623,12 @@ private: } } void receiver() { void receiver(unique_fd inout) { std::vector<uint8_t> data; std::vector<IncFsDataBlock> instructions; std::unordered_map<FileIdx, unique_fd> writeFds; while (!mStopReceiving) { const int res = waitForDataOrSignal(mInFd, mEventFd); const int res = waitForDataOrSignal(inout, mEventFd); if (res == 0) { continue; } Loading @@ -640,10 +638,11 @@ private: break; } if (res == mEventFd) { ALOGE("Received stop signal. Exit."); ALOGE("Received stop signal. Sending EXIT to server."); sendRequest(inout, EXIT); break; } if (!readChunk(mInFd, data)) { if (!readChunk(inout, data)) { ALOGE("Failed to read a message. Abort."); mStatusListener->reportStatus(DATA_LOADER_NO_CONNECTION); break; Loading @@ -656,7 +655,7 @@ private: ALOGI("Stop signal received. Sending exit command (remaining bytes: %d).", int(remainingData.size())); sendRequest(mOutFd, EXIT); sendRequest(inout, EXIT); mStopReceiving = true; break; } Loading Loading @@ -699,6 +698,11 @@ private: writeInstructions(instructions); } writeInstructions(instructions); { std::lock_guard lock{mOutFdLock}; mOutFd.reset(); } } void writeInstructions(std::vector<IncFsDataBlock>& instructions) { Loading Loading @@ -742,7 +746,7 @@ private: std::string mArgs; android::dataloader::FilesystemConnectorPtr mIfs = nullptr; android::dataloader::StatusListenerPtr mStatusListener = nullptr; android::base::unique_fd mInFd; std::mutex mOutFdLock; android::base::unique_fd mOutFd; android::base::unique_fd mEventFd; std::thread mReceiverThread; Loading