Loading services/incremental/IncrementalService.cpp +26 −6 Original line number Diff line number Diff line Loading @@ -1279,7 +1279,7 @@ bool IncrementalService::configureNativeBinaries(StorageId storage, std::string_ { std::lock_guard lock(mJobMutex); if (mRunning) { auto& existingJobs = mJobQueue[storage]; auto& existingJobs = mJobQueue[ifs->mountId]; if (existingJobs.empty()) { existingJobs = std::move(jobQueue); } else { Loading Loading @@ -1369,12 +1369,32 @@ void IncrementalService::extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle } bool IncrementalService::waitForNativeBinariesExtraction(StorageId storage) { struct WaitPrinter { const Clock::time_point startTs = Clock::now(); ~WaitPrinter() noexcept { if (sEnablePerfLogging) { const auto endTs = Clock::now(); LOG(INFO) << "incfs: waitForNativeBinariesExtraction() complete in " << elapsedMcs(startTs, endTs) << "mcs"; } } } waitPrinter; MountId mount; { auto ifs = getIfs(storage); if (!ifs) { return true; } mount = ifs->mountId; } std::unique_lock lock(mJobMutex); mJobCondition.wait(lock, [this, storage] { mJobCondition.wait(lock, [this, mount] { return !mRunning || (mPendingJobsStorage != storage && mJobQueue.find(storage) == mJobQueue.end()); (mPendingJobsMount != mount && mJobQueue.find(mount) == mJobQueue.end()); }); return mPendingJobsStorage != storage && mJobQueue.find(storage) == mJobQueue.end(); return mRunning; } void IncrementalService::runJobProcessing() { Loading @@ -1386,7 +1406,7 @@ void IncrementalService::runJobProcessing() { } auto it = mJobQueue.begin(); mPendingJobsStorage = it->first; mPendingJobsMount = it->first; auto queue = std::move(it->second); mJobQueue.erase(it); lock.unlock(); Loading @@ -1396,7 +1416,7 @@ void IncrementalService::runJobProcessing() { } lock.lock(); mPendingJobsStorage = kInvalidStorageId; mPendingJobsMount = kInvalidStorageId; lock.unlock(); mJobCondition.notify_all(); } Loading services/incremental/IncrementalService.h +2 −2 Original line number Diff line number Diff line Loading @@ -308,8 +308,8 @@ private: StorageId mNextId = 0; using Job = std::function<void()>; std::unordered_map<StorageId, std::vector<Job>> mJobQueue; StorageId mPendingJobsStorage = kInvalidStorageId; std::unordered_map<MountId, std::vector<Job>> mJobQueue; MountId mPendingJobsMount = kInvalidStorageId; std::condition_variable mJobCondition; std::mutex mJobMutex; std::thread mJobProcessor; Loading Loading
services/incremental/IncrementalService.cpp +26 −6 Original line number Diff line number Diff line Loading @@ -1279,7 +1279,7 @@ bool IncrementalService::configureNativeBinaries(StorageId storage, std::string_ { std::lock_guard lock(mJobMutex); if (mRunning) { auto& existingJobs = mJobQueue[storage]; auto& existingJobs = mJobQueue[ifs->mountId]; if (existingJobs.empty()) { existingJobs = std::move(jobQueue); } else { Loading Loading @@ -1369,12 +1369,32 @@ void IncrementalService::extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle } bool IncrementalService::waitForNativeBinariesExtraction(StorageId storage) { struct WaitPrinter { const Clock::time_point startTs = Clock::now(); ~WaitPrinter() noexcept { if (sEnablePerfLogging) { const auto endTs = Clock::now(); LOG(INFO) << "incfs: waitForNativeBinariesExtraction() complete in " << elapsedMcs(startTs, endTs) << "mcs"; } } } waitPrinter; MountId mount; { auto ifs = getIfs(storage); if (!ifs) { return true; } mount = ifs->mountId; } std::unique_lock lock(mJobMutex); mJobCondition.wait(lock, [this, storage] { mJobCondition.wait(lock, [this, mount] { return !mRunning || (mPendingJobsStorage != storage && mJobQueue.find(storage) == mJobQueue.end()); (mPendingJobsMount != mount && mJobQueue.find(mount) == mJobQueue.end()); }); return mPendingJobsStorage != storage && mJobQueue.find(storage) == mJobQueue.end(); return mRunning; } void IncrementalService::runJobProcessing() { Loading @@ -1386,7 +1406,7 @@ void IncrementalService::runJobProcessing() { } auto it = mJobQueue.begin(); mPendingJobsStorage = it->first; mPendingJobsMount = it->first; auto queue = std::move(it->second); mJobQueue.erase(it); lock.unlock(); Loading @@ -1396,7 +1416,7 @@ void IncrementalService::runJobProcessing() { } lock.lock(); mPendingJobsStorage = kInvalidStorageId; mPendingJobsMount = kInvalidStorageId; lock.unlock(); mJobCondition.notify_all(); } Loading
services/incremental/IncrementalService.h +2 −2 Original line number Diff line number Diff line Loading @@ -308,8 +308,8 @@ private: StorageId mNextId = 0; using Job = std::function<void()>; std::unordered_map<StorageId, std::vector<Job>> mJobQueue; StorageId mPendingJobsStorage = kInvalidStorageId; std::unordered_map<MountId, std::vector<Job>> mJobQueue; MountId mPendingJobsMount = kInvalidStorageId; std::condition_variable mJobCondition; std::mutex mJobMutex; std::thread mJobProcessor; Loading