Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7b975a85 authored by Tej Singh's avatar Tej Singh
Browse files

Statsd: pull once per event time

If a pull happens at the same event time, we should reuse the existing
data, regardless of whether the cool-down period has elapsed. For
example, if an app upgrade happens at event time t, and two metrics need
to pull atom a: if metric one pulls at time t, but metric two does not
initiate its pull until elapsed time t+2, we should still reuse the data
from the pull at event time t, since that is when the app upgrade
happened.

Bug: 156294650
Test: atest statsd_test

Change-Id: I4efc49545093f6683bf6dd89ed68c5dfa5b44d8f
parent 1ecd4528
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -826,7 +826,7 @@ status_t StatsService::cmd_print_pulled_metrics(int out, const Vector<String8>&
        uids.push_back(AID_SYSTEM);
    }
    vector<shared_ptr<LogEvent>> stats;
    if (mPullerManager->Pull(s, uids, &stats)) {
    if (mPullerManager->Pull(s, uids, getElapsedRealtimeNs(), &stats)) {
        for (const auto& it : stats) {
            dprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
        }
+8 −5
Original line number Diff line number Diff line
@@ -38,14 +38,16 @@ StatsPuller::StatsPuller(const int tagId, const int64_t coolDownNs, const int64_
      mPullTimeoutNs(pullTimeoutNs),
      mCoolDownNs(coolDownNs),
      mAdditiveFields(additiveFields),
      mLastPullTimeNs(0) {
      mLastPullTimeNs(0),
      mLastEventTimeNs(0) {
}

bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) {
bool StatsPuller::Pull(const int64_t eventTimeNs, std::vector<std::shared_ptr<LogEvent>>* data) {
    lock_guard<std::mutex> lock(mLock);
    int64_t elapsedTimeNs = getElapsedRealtimeNs();
    StatsdStats::getInstance().notePull(mTagId);
    const bool shouldUseCache = elapsedTimeNs - mLastPullTimeNs < mCoolDownNs;
    const bool shouldUseCache =
            (mLastEventTimeNs == eventTimeNs) || (elapsedTimeNs - mLastPullTimeNs < mCoolDownNs);
    if (shouldUseCache) {
        if (mHasGoodData) {
            (*data) = mCachedData;
@@ -54,13 +56,13 @@ bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) {
        }
        return mHasGoodData;
    }

    if (mLastPullTimeNs > 0) {
        StatsdStats::getInstance().updateMinPullIntervalSec(
                mTagId, (elapsedTimeNs - mLastPullTimeNs) / NS_PER_SEC);
    }
    mCachedData.clear();
    mLastPullTimeNs = elapsedTimeNs;
    mLastEventTimeNs = eventTimeNs;
    mHasGoodData = PullInternal(&mCachedData);
    if (!mHasGoodData) {
        return mHasGoodData;
@@ -70,7 +72,7 @@ bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) {
    const bool pullTimeOut = pullDurationNs > mPullTimeoutNs;
    if (pullTimeOut) {
        // Something went wrong. Discard the data.
        clearCacheLocked();
        mCachedData.clear();
        mHasGoodData = false;
        StatsdStats::getInstance().notePullTimeout(mTagId);
        ALOGW("Pull for atom %d exceeds timeout %lld nano seconds.", mTagId,
@@ -104,6 +106,7 @@ int StatsPuller::clearCacheLocked() {
    int ret = mCachedData.size();
    mCachedData.clear();
    mLastPullTimeNs = 0;
    mLastEventTimeNs = 0;
    return ret;
}

+6 −1
Original line number Diff line number Diff line
@@ -51,7 +51,7 @@ public:
    //   2) pull takes longer than mPullTimeoutNs (intrinsic to puller)
    // If a metric wants to make any change to the data, like timestamps, it
    // should make a copy as this data may be shared with multiple metrics.
    bool Pull(std::vector<std::shared_ptr<LogEvent>>* data);
    bool Pull(const int64_t eventTimeNs, std::vector<std::shared_ptr<LogEvent>>* data);

    // Clear cache immediately
    int ForceClearCache();
@@ -94,6 +94,11 @@ private:

    int64_t mLastPullTimeNs;

    // All pulls happen due to an event (app upgrade, bucket boundary, condition change, etc).
    // If multiple pulls need to be done at the same event time, we will always use the cache after
    // the first pull.
    int64_t mLastEventTimeNs;

    // Cache of data from last pull. If next request comes before cool down finishes,
    // cached data will be returned.
    // Cached data is cleared when
+13 −10
Original line number Diff line number Diff line
@@ -91,20 +91,21 @@ StatsPullerManager::StatsPullerManager()
      mPullAtomCallbackDeathRecipient(AIBinder_DeathRecipient_new(pullAtomCallbackDied)) {
}

bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey,
bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
                              vector<shared_ptr<LogEvent>>* data, bool useUids) {
    std::lock_guard<std::mutex> _l(mLock);
    return PullLocked(tagId, configKey, data, useUids);
    return PullLocked(tagId, configKey, eventTimeNs, data, useUids);
}

bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids,
bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
                              vector<std::shared_ptr<LogEvent>>* data, bool useUids) {
    std::lock_guard<std::mutex> _l(mLock);
    return PullLocked(tagId, uids, data, useUids);
    return PullLocked(tagId, uids, eventTimeNs, data, useUids);
}

bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
                                    vector<shared_ptr<LogEvent>>* data, bool useUids) {
                                    const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data,
                                    bool useUids) {
    vector<int32_t> uids;
    if (useUids) {
        auto uidProviderIt = mPullUidProviders.find(configKey);
@@ -123,18 +124,19 @@ bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
        }
        uids = pullUidProvider->getPullAtomUids(tagId);
    }
    return PullLocked(tagId, uids, data, useUids);
    return PullLocked(tagId, uids, eventTimeNs, data, useUids);
}

bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
                                    vector<shared_ptr<LogEvent>>* data, bool useUids) {
                                    const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data,
                                    bool useUids) {
    VLOG("Initiating pulling %d", tagId);
    if (useUids) {
        for (int32_t uid : uids) {
            PullerKey key = {.atomTag = tagId, .uid = uid};
            auto pullerIt = kAllPullAtomInfo.find(key);
            if (pullerIt != kAllPullAtomInfo.end()) {
                bool ret = pullerIt->second->Pull(data);
                bool ret = pullerIt->second->Pull(eventTimeNs, data);
                VLOG("pulled %zu items", data->size());
                if (!ret) {
                    StatsdStats::getInstance().notePullFailed(tagId);
@@ -149,7 +151,7 @@ bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
        PullerKey key = {.atomTag = tagId, .uid = -1};
        auto pullerIt = kAllPullAtomInfo.find(key);
        if (pullerIt != kAllPullAtomInfo.end()) {
            bool ret = pullerIt->second->Pull(data);
            bool ret = pullerIt->second->Pull(eventTimeNs, data);
            VLOG("pulled %zu items", data->size());
            if (!ret) {
                StatsdStats::getInstance().notePullFailed(tagId);
@@ -290,7 +292,8 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
    }
    for (const auto& pullInfo : needToPull) {
        vector<shared_ptr<LogEvent>> data;
        bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, &data);
        bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey,
                                      elapsedTimeNs, &data);
        if (!pullSuccess) {
            VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
        }
+6 −6
Original line number Diff line number Diff line
@@ -101,11 +101,11 @@ public:
    //      registered for any of the uids for this atom.
    // If the metric wants to make any change to the data, like timestamps, they
    // should make a copy as this data may be shared with multiple metrics.
    virtual bool Pull(int tagId, const ConfigKey& configKey,
    virtual bool Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
                      vector<std::shared_ptr<LogEvent>>* data, bool useUids = true);

    // Same as above, but directly specify the allowed uids to pull from.
    virtual bool Pull(int tagId, const vector<int32_t>& uids,
    virtual bool Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
                      vector<std::shared_ptr<LogEvent>>* data, bool useUids = true);

    // Clear pull data cache immediately.
@@ -152,11 +152,11 @@ private:
    // mapping from Config Key to the PullUidProvider for that config
    std::map<ConfigKey, wp<PullUidProvider>> mPullUidProviders;

    bool PullLocked(int tagId, const ConfigKey& configKey, vector<std::shared_ptr<LogEvent>>* data,
                    bool useUids = true);
    bool PullLocked(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
                    vector<std::shared_ptr<LogEvent>>* data, bool useUids = true);

    bool PullLocked(int tagId, const vector<int32_t>& uids, vector<std::shared_ptr<LogEvent>>* data,
                    bool useUids);
    bool PullLocked(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
                    vector<std::shared_ptr<LogEvent>>* data, bool useUids);

    // locks for data receiver and StatsCompanionService changes
    std::mutex mLock;
Loading