Loading services/incremental/IncrementalService.cpp +239 −49 Original line number Diff line number Diff line Loading @@ -294,6 +294,10 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mJni->initializeForCurrentThread(); runCmdLooper(); }); mTimerThread = std::thread([this]() { mJni->initializeForCurrentThread(); runTimers(); }); const auto mountedRootNames = adoptMountedInstances(); mountExistingImages(mountedRootNames); Loading @@ -306,7 +310,13 @@ IncrementalService::~IncrementalService() { } mJobCondition.notify_all(); mJobProcessor.join(); mTimerCondition.notify_all(); mTimerThread.join(); mCmdLooperThread.join(); mTimedJobs.clear(); // Ensure that mounts are destroyed while the service is still valid. mBindsByPath.clear(); mMounts.clear(); } static const char* toString(IncrementalService::BindKind kind) { Loading Loading @@ -1700,6 +1710,55 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) { } } void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) { if (id == kInvalidStorageId) { return; } { std::unique_lock lock(mTimerMutex); mTimedJobs.insert(TimedJob{id, when, std::move(what)}); } mTimerCondition.notify_all(); } void IncrementalService::removeTimedJobs(MountId id) { if (id == kInvalidStorageId) { return; } { std::unique_lock lock(mTimerMutex); std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; }); } } void IncrementalService::runTimers() { static constexpr TimePoint kInfinityTs{Clock::duration::max()}; TimePoint nextTaskTs = kInfinityTs; for (;;) { std::unique_lock lock(mTimerMutex); mTimerCondition.wait_until(lock, nextTaskTs, [this]() { auto now = Clock::now(); return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now); }); if (!mRunning) { return; } auto now = Clock::now(); auto it = mTimedJobs.begin(); // Always acquire begin(). We can't use it after unlock as mTimedJobs can change. for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) { auto job = it->what; mTimedJobs.erase(it); lock.unlock(); job(); lock.lock(); } nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs; } } IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id, DataLoaderParamsParcel&& params, FileSystemControlParcel&& control, Loading @@ -1713,10 +1772,17 @@ IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, mControl(std::move(control)), mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()), mHealthListener(healthListener ? *healthListener : StorageHealthListener()), mHealthPath(std::move(healthPath)) { // TODO(b/153874006): enable external health listener. mHealthPath(std::move(healthPath)), mHealthCheckParams(std::move(healthCheckParams)) { if (mHealthListener) { if (!isHealthParamsValid()) { mHealthListener = {}; healthStatusOk(); } } else { // Disable advanced health check statuses. mHealthCheckParams.blockedTimeoutMs = -1; } updateHealthStatus(); } IncrementalService::DataLoaderStub::~DataLoaderStub() { Loading @@ -1726,22 +1792,30 @@ IncrementalService::DataLoaderStub::~DataLoaderStub() { } void IncrementalService::DataLoaderStub::cleanupResources() { requestDestroy(); auto now = Clock::now(); { std::unique_lock lock(mMutex); mHealthPath.clear(); unregisterFromPendingReads(); resetHealthControl(); mService.removeTimedJobs(mId); } requestDestroy(); { std::unique_lock lock(mMutex); mParams = {}; mControl = {}; mHealthControl = {}; mHealthListener = {}; mStatusCondition.wait_until(lock, now + 60s, [this] { return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED; }); mStatusListener = {}; mHealthListener = {}; mId = kInvalidStorageId; } } sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader() { sp<IDataLoader> dataloader; Loading Loading @@ -1838,7 +1912,7 @@ bool IncrementalService::DataLoaderStub::fsmStep() { targetStatus = mTargetStatus; } LOG(DEBUG) << "fsmStep: " << mId << ": " << currentStatus << " -> " << targetStatus; LOG(DEBUG) << "fsmStep: " << id() << ": " << currentStatus << " -> " << targetStatus; if (currentStatus == targetStatus) { return true; Loading Loading @@ -1920,42 +1994,167 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount return binder::Status::ok(); } void IncrementalService::DataLoaderStub::healthStatusOk() { LOG(DEBUG) << "healthStatusOk: " << mId; std::unique_lock lock(mMutex); registerForPendingReads(); bool IncrementalService::DataLoaderStub::isHealthParamsValid() const { return mHealthCheckParams.blockedTimeoutMs > 0 && mHealthCheckParams.blockedTimeoutMs < mHealthCheckParams.unhealthyTimeoutMs; } void IncrementalService::DataLoaderStub::healthStatusReadsPending() { LOG(DEBUG) << "healthStatusReadsPending: " << mId; requestStart(); void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener healthListener, int healthStatus) { LOG(DEBUG) << id() << ": healthStatus: " << healthStatus; if (healthListener) { healthListener->onHealthStatus(id(), healthStatus); } } void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : ""); int healthStatusToReport = -1; StorageHealthListener healthListener; { std::unique_lock lock(mMutex); unregisterFromPendingReads(); healthListener = mHealthListener; // Healthcheck depends on timestamp of the oldest pending read. // To get it, we need to re-open a pendingReads FD to get a full list of reads. // Additionally we need to re-register for epoll with fresh FDs in case there are no reads. const auto now = Clock::now(); const auto kernelTsUs = getOldestPendingReadTs(); if (baseline) { // Updating baseline only on looper/epoll callback, i.e. on new set of pending reads. mHealthBase = {now, kernelTsUs}; } void IncrementalService::DataLoaderStub::healthStatusBlocked() {} if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now || mHealthBase.kernelTsUs > kernelTsUs) { LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait."; registerForPendingReads(); healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK; lock.unlock(); onHealthStatus(healthListener, healthStatusToReport); return; } void IncrementalService::DataLoaderStub::healthStatusUnhealthy() {} resetHealthControl(); void IncrementalService::DataLoaderStub::registerForPendingReads() { auto pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { // Always make sure the data loader is started. setTargetStatusLocked(IDataLoaderStatusListener::DATA_LOADER_STARTED); // Skip any further processing if health check params are invalid. if (!isHealthParamsValid()) { LOG(DEBUG) << id() << ": Skip any further processing if health check params are invalid."; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; lock.unlock(); onHealthStatus(healthListener, healthStatusToReport); // Triggering data loader start. This is a one-time action. fsmStep(); return; } const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs); const auto unhealthyTimeout = std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs); const auto unhealthyMonitoring = std::max(1000ms, std::chrono::milliseconds(mHealthCheckParams.unhealthyMonitoringMs)); const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs; const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs); const auto delta = now - userTs; TimePoint whenToCheckBack; if (delta < blockedTimeout) { LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status."; whenToCheckBack = userTs + blockedTimeout; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; } else if (delta < unhealthyTimeout) { LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy."; whenToCheckBack = userTs + unhealthyTimeout; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED; } else { LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring."; whenToCheckBack = now + unhealthyMonitoring; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY; } LOG(DEBUG) << id() << ": updateHealthStatus in " << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack - now) .count()) / 1000.0 << "secs"; mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); }); } if (healthStatusToReport != -1) { onHealthStatus(healthListener, healthStatusToReport); } fsmStep(); } const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl() { if (mHealthPath.empty()) { resetHealthControl(); return mHealthControl; } if (mHealthControl.pendingReads() < 0) { mHealthControl = mService.mIncFs->openMount(mHealthPath); pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { LOG(ERROR) << "Failed to open health control for: " << mId << ", path: " << mHealthPath } if (mHealthControl.pendingReads() < 0) { LOG(ERROR) << "Failed to open health control for: " << id() << ", path: " << mHealthPath << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":" << mHealthControl.logs() << ")"; return; } return mHealthControl; } void IncrementalService::DataLoaderStub::resetHealthControl() { mHealthControl = {}; } BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs() { auto result = kMaxBootClockTsUs; const auto& control = initializeHealthControl(); if (control.pendingReads() < 0) { return result; } std::vector<incfs::ReadInfo> pendingReads; if (mService.mIncFs->waitForPendingReads(control, 0ms, &pendingReads) != android::incfs::WaitResult::HaveData || pendingReads.empty()) { return result; } LOG(DEBUG) << id() << ": pendingReads: " << control.pendingReads() << ", " << pendingReads.size() << ": " << pendingReads.front().bootClockTsUs; for (auto&& pendingRead : pendingReads) { result = std::min(result, pendingRead.bootClockTsUs); } return result; } void IncrementalService::DataLoaderStub::registerForPendingReads() { const auto pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { return; } LOG(DEBUG) << id() << ": addFd(pendingReadsFd): " << pendingReadsFd; mService.mLooper->addFd( pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT, [](int, int, void* data) -> int { auto&& self = (DataLoaderStub*)data; return self->onPendingReads(); self->updateHealthStatus(/*baseline=*/true); return 0; }, this); mService.mLooper->wake(); Loading @@ -1967,19 +2166,10 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() { return; } LOG(DEBUG) << id() << ": removeFd(pendingReadsFd): " << pendingReadsFd; mService.mLooper->removeFd(pendingReadsFd); mService.mLooper->wake(); mHealthControl = {}; } int IncrementalService::DataLoaderStub::onPendingReads() { if (!mService.mRunning.load(std::memory_order_relaxed)) { return 0; } healthStatusReadsPending(); return 0; } void IncrementalService::DataLoaderStub::onDump(int fd) { Loading services/incremental/IncrementalService.h +42 −17 Original line number Diff line number Diff line Loading @@ -35,6 +35,7 @@ #include <limits> #include <map> #include <mutex> #include <set> #include <span> #include <string> #include <string_view> Loading Loading @@ -186,17 +187,12 @@ private: void onDump(int fd); MountId id() const { return mId; } MountId id() const { return mId.load(std::memory_order_relaxed); } const content::pm::DataLoaderParamsParcel& params() const { return mParams; } private: binder::Status onStatusChanged(MountId mount, int newStatus) final; void registerForPendingReads(); void unregisterFromPendingReads(); int onPendingReads(); bool isValid() const { return mId != kInvalidStorageId; } sp<content::pm::IDataLoader> getDataLoader(); bool bind(); Loading @@ -208,21 +204,27 @@ private: void setTargetStatusLocked(int status); bool fsmStep(); bool fsmStep(int currentStatus, int targetStatus); void onHealthStatus(StorageHealthListener healthListener, int healthStatus); void updateHealthStatus(bool baseline = false); bool isValid() const { return id() != kInvalidStorageId; } // Watching for pending reads. void healthStatusOk(); // Pending reads detected, waiting for Xsecs to confirm blocked state. void healthStatusReadsPending(); // There are reads pending for X+secs, waiting for additional Ysecs to confirm unhealthy // state. void healthStatusBlocked(); // There are reads pending for X+Ysecs, marking storage as unhealthy. void healthStatusUnhealthy(); bool isHealthParamsValid() const; const incfs::UniqueControl& initializeHealthControl(); void resetHealthControl(); BootClockTsUs getOldestPendingReadTs(); void registerForPendingReads(); void unregisterFromPendingReads(); IncrementalService& mService; std::mutex mMutex; MountId mId = kInvalidStorageId; std::atomic<MountId> mId = kInvalidStorageId; content::pm::DataLoaderParamsParcel mParams; content::pm::FileSystemControlParcel mControl; DataLoaderStatusListener mStatusListener; Loading @@ -235,6 +237,11 @@ private: std::string mHealthPath; incfs::UniqueControl mHealthControl; struct { TimePoint userTs; BootClockTsUs kernelTsUs; } mHealthBase = {TimePoint::max(), kMaxBootClockTsUs}; StorageHealthCheckParams mHealthCheckParams; }; using DataLoaderStubPtr = sp<DataLoaderStub>; Loading Loading @@ -331,6 +338,8 @@ private: bool unregisterAppOpsCallback(const std::string& packageName); void onAppOpChanged(const std::string& packageName); using Job = std::function<void()>; void runJobProcessing(); void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry, const incfs::FileId& libFileId, std::string_view targetLibPath, Loading @@ -338,6 +347,10 @@ private: void runCmdLooper(); void addTimedJob(MountId id, TimePoint when, Job what); void removeTimedJobs(MountId id); void runTimers(); private: const std::unique_ptr<VoldServiceWrapper> mVold; const std::unique_ptr<DataLoaderManagerWrapper> mDataLoaderManager; Loading @@ -360,7 +373,6 @@ private: std::atomic_bool mRunning{true}; using Job = std::function<void()>; std::unordered_map<MountId, std::vector<Job>> mJobQueue; MountId mPendingJobsMount = kInvalidStorageId; std::condition_variable mJobCondition; Loading @@ -368,6 +380,19 @@ private: std::thread mJobProcessor; std::thread mCmdLooperThread; struct TimedJob { MountId id; TimePoint when; Job what; friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) { return lhs.when < rhs.when; } }; std::set<TimedJob> mTimedJobs; std::condition_variable mTimerCondition; std::mutex mTimerMutex; std::thread mTimerThread; }; } // namespace android::incremental services/incremental/test/IncrementalServiceTest.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -371,7 +371,7 @@ class MockJniWrapper : public JniWrapper { public: MOCK_CONST_METHOD0(initializeForCurrentThread, void()); MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); } MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); } }; class MockLooperWrapper : public LooperWrapper { Loading Loading
services/incremental/IncrementalService.cpp +239 −49 Original line number Diff line number Diff line Loading @@ -294,6 +294,10 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v mJni->initializeForCurrentThread(); runCmdLooper(); }); mTimerThread = std::thread([this]() { mJni->initializeForCurrentThread(); runTimers(); }); const auto mountedRootNames = adoptMountedInstances(); mountExistingImages(mountedRootNames); Loading @@ -306,7 +310,13 @@ IncrementalService::~IncrementalService() { } mJobCondition.notify_all(); mJobProcessor.join(); mTimerCondition.notify_all(); mTimerThread.join(); mCmdLooperThread.join(); mTimedJobs.clear(); // Ensure that mounts are destroyed while the service is still valid. mBindsByPath.clear(); mMounts.clear(); } static const char* toString(IncrementalService::BindKind kind) { Loading Loading @@ -1700,6 +1710,55 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) { } } void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) { if (id == kInvalidStorageId) { return; } { std::unique_lock lock(mTimerMutex); mTimedJobs.insert(TimedJob{id, when, std::move(what)}); } mTimerCondition.notify_all(); } void IncrementalService::removeTimedJobs(MountId id) { if (id == kInvalidStorageId) { return; } { std::unique_lock lock(mTimerMutex); std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; }); } } void IncrementalService::runTimers() { static constexpr TimePoint kInfinityTs{Clock::duration::max()}; TimePoint nextTaskTs = kInfinityTs; for (;;) { std::unique_lock lock(mTimerMutex); mTimerCondition.wait_until(lock, nextTaskTs, [this]() { auto now = Clock::now(); return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now); }); if (!mRunning) { return; } auto now = Clock::now(); auto it = mTimedJobs.begin(); // Always acquire begin(). We can't use it after unlock as mTimedJobs can change. for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) { auto job = it->what; mTimedJobs.erase(it); lock.unlock(); job(); lock.lock(); } nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs; } } IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id, DataLoaderParamsParcel&& params, FileSystemControlParcel&& control, Loading @@ -1713,10 +1772,17 @@ IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, mControl(std::move(control)), mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()), mHealthListener(healthListener ? *healthListener : StorageHealthListener()), mHealthPath(std::move(healthPath)) { // TODO(b/153874006): enable external health listener. mHealthPath(std::move(healthPath)), mHealthCheckParams(std::move(healthCheckParams)) { if (mHealthListener) { if (!isHealthParamsValid()) { mHealthListener = {}; healthStatusOk(); } } else { // Disable advanced health check statuses. mHealthCheckParams.blockedTimeoutMs = -1; } updateHealthStatus(); } IncrementalService::DataLoaderStub::~DataLoaderStub() { Loading @@ -1726,22 +1792,30 @@ IncrementalService::DataLoaderStub::~DataLoaderStub() { } void IncrementalService::DataLoaderStub::cleanupResources() { requestDestroy(); auto now = Clock::now(); { std::unique_lock lock(mMutex); mHealthPath.clear(); unregisterFromPendingReads(); resetHealthControl(); mService.removeTimedJobs(mId); } requestDestroy(); { std::unique_lock lock(mMutex); mParams = {}; mControl = {}; mHealthControl = {}; mHealthListener = {}; mStatusCondition.wait_until(lock, now + 60s, [this] { return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED; }); mStatusListener = {}; mHealthListener = {}; mId = kInvalidStorageId; } } sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader() { sp<IDataLoader> dataloader; Loading Loading @@ -1838,7 +1912,7 @@ bool IncrementalService::DataLoaderStub::fsmStep() { targetStatus = mTargetStatus; } LOG(DEBUG) << "fsmStep: " << mId << ": " << currentStatus << " -> " << targetStatus; LOG(DEBUG) << "fsmStep: " << id() << ": " << currentStatus << " -> " << targetStatus; if (currentStatus == targetStatus) { return true; Loading Loading @@ -1920,42 +1994,167 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount return binder::Status::ok(); } void IncrementalService::DataLoaderStub::healthStatusOk() { LOG(DEBUG) << "healthStatusOk: " << mId; std::unique_lock lock(mMutex); registerForPendingReads(); bool IncrementalService::DataLoaderStub::isHealthParamsValid() const { return mHealthCheckParams.blockedTimeoutMs > 0 && mHealthCheckParams.blockedTimeoutMs < mHealthCheckParams.unhealthyTimeoutMs; } void IncrementalService::DataLoaderStub::healthStatusReadsPending() { LOG(DEBUG) << "healthStatusReadsPending: " << mId; requestStart(); void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener healthListener, int healthStatus) { LOG(DEBUG) << id() << ": healthStatus: " << healthStatus; if (healthListener) { healthListener->onHealthStatus(id(), healthStatus); } } void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) { LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : ""); int healthStatusToReport = -1; StorageHealthListener healthListener; { std::unique_lock lock(mMutex); unregisterFromPendingReads(); healthListener = mHealthListener; // Healthcheck depends on timestamp of the oldest pending read. // To get it, we need to re-open a pendingReads FD to get a full list of reads. // Additionally we need to re-register for epoll with fresh FDs in case there are no reads. const auto now = Clock::now(); const auto kernelTsUs = getOldestPendingReadTs(); if (baseline) { // Updating baseline only on looper/epoll callback, i.e. on new set of pending reads. mHealthBase = {now, kernelTsUs}; } void IncrementalService::DataLoaderStub::healthStatusBlocked() {} if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now || mHealthBase.kernelTsUs > kernelTsUs) { LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait."; registerForPendingReads(); healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK; lock.unlock(); onHealthStatus(healthListener, healthStatusToReport); return; } void IncrementalService::DataLoaderStub::healthStatusUnhealthy() {} resetHealthControl(); void IncrementalService::DataLoaderStub::registerForPendingReads() { auto pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { // Always make sure the data loader is started. setTargetStatusLocked(IDataLoaderStatusListener::DATA_LOADER_STARTED); // Skip any further processing if health check params are invalid. if (!isHealthParamsValid()) { LOG(DEBUG) << id() << ": Skip any further processing if health check params are invalid."; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; lock.unlock(); onHealthStatus(healthListener, healthStatusToReport); // Triggering data loader start. This is a one-time action. fsmStep(); return; } const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs); const auto unhealthyTimeout = std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs); const auto unhealthyMonitoring = std::max(1000ms, std::chrono::milliseconds(mHealthCheckParams.unhealthyMonitoringMs)); const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs; const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs); const auto delta = now - userTs; TimePoint whenToCheckBack; if (delta < blockedTimeout) { LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status."; whenToCheckBack = userTs + blockedTimeout; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING; } else if (delta < unhealthyTimeout) { LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy."; whenToCheckBack = userTs + unhealthyTimeout; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED; } else { LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring."; whenToCheckBack = now + unhealthyMonitoring; healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY; } LOG(DEBUG) << id() << ": updateHealthStatus in " << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack - now) .count()) / 1000.0 << "secs"; mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); }); } if (healthStatusToReport != -1) { onHealthStatus(healthListener, healthStatusToReport); } fsmStep(); } const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl() { if (mHealthPath.empty()) { resetHealthControl(); return mHealthControl; } if (mHealthControl.pendingReads() < 0) { mHealthControl = mService.mIncFs->openMount(mHealthPath); pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { LOG(ERROR) << "Failed to open health control for: " << mId << ", path: " << mHealthPath } if (mHealthControl.pendingReads() < 0) { LOG(ERROR) << "Failed to open health control for: " << id() << ", path: " << mHealthPath << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":" << mHealthControl.logs() << ")"; return; } return mHealthControl; } void IncrementalService::DataLoaderStub::resetHealthControl() { mHealthControl = {}; } BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs() { auto result = kMaxBootClockTsUs; const auto& control = initializeHealthControl(); if (control.pendingReads() < 0) { return result; } std::vector<incfs::ReadInfo> pendingReads; if (mService.mIncFs->waitForPendingReads(control, 0ms, &pendingReads) != android::incfs::WaitResult::HaveData || pendingReads.empty()) { return result; } LOG(DEBUG) << id() << ": pendingReads: " << control.pendingReads() << ", " << pendingReads.size() << ": " << pendingReads.front().bootClockTsUs; for (auto&& pendingRead : pendingReads) { result = std::min(result, pendingRead.bootClockTsUs); } return result; } void IncrementalService::DataLoaderStub::registerForPendingReads() { const auto pendingReadsFd = mHealthControl.pendingReads(); if (pendingReadsFd < 0) { return; } LOG(DEBUG) << id() << ": addFd(pendingReadsFd): " << pendingReadsFd; mService.mLooper->addFd( pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT, [](int, int, void* data) -> int { auto&& self = (DataLoaderStub*)data; return self->onPendingReads(); self->updateHealthStatus(/*baseline=*/true); return 0; }, this); mService.mLooper->wake(); Loading @@ -1967,19 +2166,10 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() { return; } LOG(DEBUG) << id() << ": removeFd(pendingReadsFd): " << pendingReadsFd; mService.mLooper->removeFd(pendingReadsFd); mService.mLooper->wake(); mHealthControl = {}; } int IncrementalService::DataLoaderStub::onPendingReads() { if (!mService.mRunning.load(std::memory_order_relaxed)) { return 0; } healthStatusReadsPending(); return 0; } void IncrementalService::DataLoaderStub::onDump(int fd) { Loading
services/incremental/IncrementalService.h +42 −17 Original line number Diff line number Diff line Loading @@ -35,6 +35,7 @@ #include <limits> #include <map> #include <mutex> #include <set> #include <span> #include <string> #include <string_view> Loading Loading @@ -186,17 +187,12 @@ private: void onDump(int fd); MountId id() const { return mId; } MountId id() const { return mId.load(std::memory_order_relaxed); } const content::pm::DataLoaderParamsParcel& params() const { return mParams; } private: binder::Status onStatusChanged(MountId mount, int newStatus) final; void registerForPendingReads(); void unregisterFromPendingReads(); int onPendingReads(); bool isValid() const { return mId != kInvalidStorageId; } sp<content::pm::IDataLoader> getDataLoader(); bool bind(); Loading @@ -208,21 +204,27 @@ private: void setTargetStatusLocked(int status); bool fsmStep(); bool fsmStep(int currentStatus, int targetStatus); void onHealthStatus(StorageHealthListener healthListener, int healthStatus); void updateHealthStatus(bool baseline = false); bool isValid() const { return id() != kInvalidStorageId; } // Watching for pending reads. void healthStatusOk(); // Pending reads detected, waiting for Xsecs to confirm blocked state. void healthStatusReadsPending(); // There are reads pending for X+secs, waiting for additional Ysecs to confirm unhealthy // state. void healthStatusBlocked(); // There are reads pending for X+Ysecs, marking storage as unhealthy. void healthStatusUnhealthy(); bool isHealthParamsValid() const; const incfs::UniqueControl& initializeHealthControl(); void resetHealthControl(); BootClockTsUs getOldestPendingReadTs(); void registerForPendingReads(); void unregisterFromPendingReads(); IncrementalService& mService; std::mutex mMutex; MountId mId = kInvalidStorageId; std::atomic<MountId> mId = kInvalidStorageId; content::pm::DataLoaderParamsParcel mParams; content::pm::FileSystemControlParcel mControl; DataLoaderStatusListener mStatusListener; Loading @@ -235,6 +237,11 @@ private: std::string mHealthPath; incfs::UniqueControl mHealthControl; struct { TimePoint userTs; BootClockTsUs kernelTsUs; } mHealthBase = {TimePoint::max(), kMaxBootClockTsUs}; StorageHealthCheckParams mHealthCheckParams; }; using DataLoaderStubPtr = sp<DataLoaderStub>; Loading Loading @@ -331,6 +338,8 @@ private: bool unregisterAppOpsCallback(const std::string& packageName); void onAppOpChanged(const std::string& packageName); using Job = std::function<void()>; void runJobProcessing(); void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry, const incfs::FileId& libFileId, std::string_view targetLibPath, Loading @@ -338,6 +347,10 @@ private: void runCmdLooper(); void addTimedJob(MountId id, TimePoint when, Job what); void removeTimedJobs(MountId id); void runTimers(); private: const std::unique_ptr<VoldServiceWrapper> mVold; const std::unique_ptr<DataLoaderManagerWrapper> mDataLoaderManager; Loading @@ -360,7 +373,6 @@ private: std::atomic_bool mRunning{true}; using Job = std::function<void()>; std::unordered_map<MountId, std::vector<Job>> mJobQueue; MountId mPendingJobsMount = kInvalidStorageId; std::condition_variable mJobCondition; Loading @@ -368,6 +380,19 @@ private: std::thread mJobProcessor; std::thread mCmdLooperThread; struct TimedJob { MountId id; TimePoint when; Job what; friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) { return lhs.when < rhs.when; } }; std::set<TimedJob> mTimedJobs; std::condition_variable mTimerCondition; std::mutex mTimerMutex; std::thread mTimerThread; }; } // namespace android::incremental
services/incremental/test/IncrementalServiceTest.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -371,7 +371,7 @@ class MockJniWrapper : public JniWrapper { public: MOCK_CONST_METHOD0(initializeForCurrentThread, void()); MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); } MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); } }; class MockLooperWrapper : public LooperWrapper { Loading