Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c4ac9d23 authored by Alex Buynytskyy's avatar Alex Buynytskyy Committed by Android (Google) Code Review
Browse files

Merge "Healthcheck: proper job allocation and test." into rvc-dev

parents 20534dcc 46d3ddb3
Loading
Loading
Loading
Loading
+36 −75
Original line number Diff line number Diff line
@@ -268,22 +268,14 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v
        mAppOpsManager(sm.getAppOpsManager()),
        mJni(sm.getJni()),
        mLooper(sm.getLooper()),
        mTimedQueue(sm.getTimedQueue()),
        mIncrementalDir(rootDir) {
    if (!mVold) {
        LOG(FATAL) << "Vold service is unavailable";
    }
    if (!mDataLoaderManager) {
        LOG(FATAL) << "DataLoaderManagerService is unavailable";
    }
    if (!mAppOpsManager) {
        LOG(FATAL) << "AppOpsManager is unavailable";
    }
    if (!mJni) {
        LOG(FATAL) << "JNI is unavailable";
    }
    if (!mLooper) {
        LOG(FATAL) << "Looper is unavailable";
    }
    CHECK(mVold) << "Vold service is unavailable";
    CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable";
    CHECK(mAppOpsManager) << "AppOpsManager is unavailable";
    CHECK(mJni) << "JNI is unavailable";
    CHECK(mLooper) << "Looper is unavailable";
    CHECK(mTimedQueue) << "TimedQueue is unavailable";

    mJobQueue.reserve(16);
    mJobProcessor = std::thread([this]() {
@@ -294,10 +286,6 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v
        mJni->initializeForCurrentThread();
        runCmdLooper();
    });
    mTimerThread = std::thread([this]() {
        mJni->initializeForCurrentThread();
        runTimers();
    });

    const auto mountedRootNames = adoptMountedInstances();
    mountExistingImages(mountedRootNames);
@@ -310,10 +298,8 @@ IncrementalService::~IncrementalService() {
    }
    mJobCondition.notify_all();
    mJobProcessor.join();
    mTimerCondition.notify_all();
    mTimerThread.join();
    mCmdLooperThread.join();
    mTimedJobs.clear();
    mTimedQueue->stop();
    // Ensure that mounts are destroyed while the service is still valid.
    mBindsByPath.clear();
    mMounts.clear();
@@ -1710,53 +1696,18 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) {
    }
}

void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) {
    if (id == kInvalidStorageId) {
        return;
    }
    {
        std::unique_lock lock(mTimerMutex);
        mTimedJobs.insert(TimedJob{id, when, std::move(what)});
    }
    mTimerCondition.notify_all();
    mTimedQueue->addJob(id, after, std::move(what));
}

void IncrementalService::removeTimedJobs(MountId id) {
    if (id == kInvalidStorageId) {
        return;
    }
    {
        std::unique_lock lock(mTimerMutex);
        std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
    }
}

void IncrementalService::runTimers() {
    static constexpr TimePoint kInfinityTs{Clock::duration::max()};
    TimePoint nextTaskTs = kInfinityTs;
    for (;;) {
        std::unique_lock lock(mTimerMutex);
        mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
            auto now = Clock::now();
            return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
        });
        if (!mRunning) {
            return;
        }

        auto now = Clock::now();
        auto it = mTimedJobs.begin();
        // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
        for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
            auto job = it->what;
            mTimedJobs.erase(it);

            lock.unlock();
            job();
            lock.lock();
        }
        nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
    }
    mTimedQueue->removeJobs(id);
}

IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
@@ -2029,8 +1980,8 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
            mHealthBase = {now, kernelTsUs};
        }

        if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
            mHealthBase.kernelTsUs > kernelTsUs) {
        if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs ||
            mHealthBase.userTs > now) {
            LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
            registerForPendingReads();
            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
@@ -2056,6 +2007,9 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
            return;
        }

        // Don't schedule timer job less than 500ms in advance.
        static constexpr auto kTolerance = 500ms;

        const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
        const auto unhealthyTimeout =
                std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
@@ -2065,31 +2019,28 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {

        const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
        const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
        const auto delta = now - userTs;
        const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs);

        TimePoint whenToCheckBack;
        if (delta < blockedTimeout) {
        Milliseconds checkBackAfter;
        if (delta + kTolerance < blockedTimeout) {
            LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
            whenToCheckBack = userTs + blockedTimeout;
            checkBackAfter = blockedTimeout - delta;
            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
        } else if (delta < unhealthyTimeout) {
        } else if (delta + kTolerance < unhealthyTimeout) {
            LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
            whenToCheckBack = userTs + unhealthyTimeout;
            checkBackAfter = unhealthyTimeout - delta;
            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
        } else {
            LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
            whenToCheckBack = now + unhealthyMonitoring;
            checkBackAfter = unhealthyMonitoring;
            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
        }
        LOG(DEBUG) << id() << ": updateHealthStatus in "
                   << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
                                                                                   now)
                                     .count()) /
                        1000.0
        LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
                   << "secs";
        mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
        mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); });
    }

    // With kTolerance we are expecting these to execute before the next update.
    if (healthStatusToReport != -1) {
        onHealthStatus(healthListener, healthStatusToReport);
    }
@@ -2178,6 +2129,16 @@ void IncrementalService::DataLoaderStub::onDump(int fd) {
    dprintf(fd, "      targetStatus: %d\n", mTargetStatus);
    dprintf(fd, "      targetStatusTs: %lldmcs\n",
            (long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
    dprintf(fd, "      health: {\n");
    dprintf(fd, "        path: %s\n", mHealthPath.c_str());
    dprintf(fd, "        base: %lldmcs (%lld)\n",
            (long long)(elapsedMcs(mHealthBase.userTs, Clock::now())),
            (long long)mHealthBase.kernelTsUs);
    dprintf(fd, "        blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs));
    dprintf(fd, "        unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs));
    dprintf(fd, "        unhealthyMonitoringMs: %d\n",
            int(mHealthCheckParams.unhealthyMonitoringMs));
    dprintf(fd, "      }\n");
    const auto& params = mParams;
    dprintf(fd, "      dataLoaderParams: {\n");
    dprintf(fd, "        type: %s\n", toString(params.type).c_str());
+2 −19
Original line number Diff line number Diff line
@@ -56,8 +56,6 @@ using StorageId = int;
using FileId = incfs::FileId;
using BlockIndex = incfs::BlockIndex;
using RawMetadata = incfs::RawMetadata;
using Clock = std::chrono::steady_clock;
using TimePoint = std::chrono::time_point<Clock>;
using Seconds = std::chrono::seconds;
using BootClockTsUs = uint64_t;

@@ -338,8 +336,6 @@ private:
    bool unregisterAppOpsCallback(const std::string& packageName);
    void onAppOpChanged(const std::string& packageName);

    using Job = std::function<void()>;

    void runJobProcessing();
    void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry,
                        const incfs::FileId& libFileId, std::string_view targetLibPath,
@@ -347,9 +343,8 @@ private:

    void runCmdLooper();

    void addTimedJob(MountId id, TimePoint when, Job what);
    void addTimedJob(MountId id, Milliseconds after, Job what);
    void removeTimedJobs(MountId id);
    void runTimers();

private:
    const std::unique_ptr<VoldServiceWrapper> mVold;
@@ -358,6 +353,7 @@ private:
    const std::unique_ptr<AppOpsManagerWrapper> mAppOpsManager;
    const std::unique_ptr<JniWrapper> mJni;
    const std::unique_ptr<LooperWrapper> mLooper;
    const std::unique_ptr<TimedQueueWrapper> mTimedQueue;
    const std::string mIncrementalDir;

    mutable std::mutex mLock;
@@ -380,19 +376,6 @@ private:
    std::thread mJobProcessor;

    std::thread mCmdLooperThread;

    struct TimedJob {
        MountId id;
        TimePoint when;
        Job what;
        friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
            return lhs.when < rhs.when;
        }
    };
    std::set<TimedJob> mTimedJobs;
    std::condition_variable mTimerCondition;
    std::mutex mTimerMutex;
    std::thread mTimerThread;
};

} // namespace android::incremental
+88 −0
Original line number Diff line number Diff line
@@ -25,6 +25,8 @@
#include <binder/AppOpsManager.h>
#include <utils/String16.h>

#include <thread>

#include "IncrementalServiceValidation.h"

using namespace std::literals;
@@ -181,6 +183,88 @@ public:
    }
};

static JNIEnv* getOrAttachJniEnv(JavaVM* jvm);

class RealTimedQueueWrapper : public TimedQueueWrapper {
public:
    RealTimedQueueWrapper(JavaVM* jvm) {
        mThread = std::thread([this, jvm]() {
            (void)getOrAttachJniEnv(jvm);
            runTimers();
        });
    }
    ~RealTimedQueueWrapper() final {
        CHECK(!mRunning) << "call stop first";
        CHECK(!mThread.joinable()) << "call stop first";
    }

    void addJob(MountId id, Milliseconds after, Job what) final {
        const auto now = Clock::now();
        {
            std::unique_lock lock(mMutex);
            mJobs.insert(TimedJob{id, now + after, std::move(what)});
        }
        mCondition.notify_all();
    }
    void removeJobs(MountId id) final {
        std::unique_lock lock(mMutex);
        std::erase_if(mJobs, [id](auto&& item) { return item.id == id; });
    }
    void stop() final {
        {
            std::unique_lock lock(mMutex);
            mRunning = false;
        }
        mCondition.notify_all();
        mThread.join();
        mJobs.clear();
    }

private:
    void runTimers() {
        static constexpr TimePoint kInfinityTs{Clock::duration::max()};
        TimePoint nextJobTs = kInfinityTs;
        std::unique_lock lock(mMutex);
        for (;;) {
            mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() {
                const auto now = Clock::now();
                const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
                return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs;
            });
            if (!mRunning) {
                return;
            }

            const auto now = Clock::now();
            auto it = mJobs.begin();
            // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
            for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) {
                auto job = std::move(it->what);
                mJobs.erase(it);

                lock.unlock();
                job();
                lock.lock();
            }
            nextJobTs = it != mJobs.end() ? it->when : kInfinityTs;
        }
    }

    struct TimedJob {
        MountId id;
        TimePoint when;
        Job what;
        friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
            return lhs.when < rhs.when;
        }
    };
    bool mRunning = true;
    std::set<TimedJob> mJobs;
    std::condition_variable mCondition;
    std::mutex mMutex;
    std::thread mThread;
};

RealServiceManager::RealServiceManager(sp<IServiceManager> serviceManager, JNIEnv* env)
      : mServiceManager(std::move(serviceManager)), mJvm(RealJniWrapper::getJvm(env)) {}

@@ -228,6 +312,10 @@ std::unique_ptr<LooperWrapper> RealServiceManager::getLooper() {
    return std::make_unique<RealLooperWrapper>();
}

std::unique_ptr<TimedQueueWrapper> RealServiceManager::getTimedQueue() {
    return std::make_unique<RealTimedQueueWrapper>(mJvm);
}

static JavaVM* getJavaVm(JNIEnv* env) {
    CHECK(env);
    JavaVM* jvm = nullptr;
+15 −0
Original line number Diff line number Diff line
@@ -35,6 +35,11 @@

namespace android::incremental {

using Clock = std::chrono::steady_clock;
using TimePoint = std::chrono::time_point<Clock>;
using Milliseconds = std::chrono::milliseconds;
using Job = std::function<void()>;

// --- Wrapper interfaces ---

using MountId = int32_t;
@@ -121,6 +126,14 @@ public:
    virtual int pollAll(int timeoutMillis) = 0;
};

class TimedQueueWrapper {
public:
    virtual ~TimedQueueWrapper() = default;
    virtual void addJob(MountId id, Milliseconds after, Job what) = 0;
    virtual void removeJobs(MountId id) = 0;
    virtual void stop() = 0;
};

class ServiceManagerWrapper {
public:
    virtual ~ServiceManagerWrapper() = default;
@@ -130,6 +143,7 @@ public:
    virtual std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() = 0;
    virtual std::unique_ptr<JniWrapper> getJni() = 0;
    virtual std::unique_ptr<LooperWrapper> getLooper() = 0;
    virtual std::unique_ptr<TimedQueueWrapper> getTimedQueue() = 0;
};

// --- Real stuff ---
@@ -144,6 +158,7 @@ public:
    std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final;
    std::unique_ptr<JniWrapper> getJni() final;
    std::unique_ptr<LooperWrapper> getLooper() final;
    std::unique_ptr<TimedQueueWrapper> getTimedQueue() final;

private:
    template <class INTERFACE>
+211 −16

File changed.

Preview size limit exceeded, changes collapsed.