Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9a5d3598 authored by Olivier Gaillard's avatar Olivier Gaillard
Browse files

Adds the concept of invalid bucket.

Whenever a pull fails, we cannot compute a correct metric for this
bucket. Whether the failure happens at the beginning, in the middle, or
at the end of the bucket, we cannot trust the data inside it.

Let's say we have a metric with a screen on condition, a bucket size of
4 hours and we have the following events:
- h+0 bucket start -- screen is on -- data pull failed
- h+2 screen off -- pull succeeded
- h+3 screen on -- pull succeeded
- h+4 bucket end -- screen on -- pull succeeded

The current logic is wrong: it ignores any data between h+0 and h+2.
That timespan might be a huge contributor to the total bucket value,
so it is wrong to keep any data from this bucket.

We also extend the concept of invalid buckets to other problems, such
as a pull taking too long.

Bug: 123866830
Test: atest statsd_test
Change-Id: I300ab05cd7582cd2d7af9167de8d99b349071f0d
parent c5f11c40
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -448,6 +448,11 @@ void StatsdStats::noteConditionChangeInNextBucket(int metricId) {
    getAtomMetricStats(metricId).conditionChangeInNextBucket++;
}

// Records that one bucket was invalidated for the given metric.
// Thread-safe: serializes access to the stats map via mLock.
void StatsdStats::noteInvalidatedBucket(int metricId) {
    lock_guard<std::mutex> lock(mLock);
    auto& metricStats = getAtomMetricStats(metricId);
    ++metricStats.invalidatedBucket;
}

StatsdStats::AtomMetricStats& StatsdStats::getAtomMetricStats(int metricId) {
    auto atomMetricStatsIter = mAtomMetricStats.find(metricId);
    if (atomMetricStatsIter != mAtomMetricStats.end()) {
+6 −0
Original line number Diff line number Diff line
@@ -364,6 +364,11 @@ public:
     */
    void noteConditionChangeInNextBucket(int atomId);

    /**
     * A bucket has been tagged as invalid.
     */
    void noteInvalidatedBucket(int metricId);

    /**
     * Reset the historical stats. Including all stats in icebox, and the tracked stats about
     * metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
@@ -408,6 +413,7 @@ public:
        long skippedForwardBuckets = 0;
        long badValueType = 0;
        long conditionChangeInNextBucket = 0;
        long invalidatedBucket = 0;
    } AtomMetricStats;

private:
+72 −44
Original line number Diff line number Diff line
@@ -104,6 +104,7 @@ ValueMetricProducer::ValueMetricProducer(
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsInvalid(false),
      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()) {
@@ -308,6 +309,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
    }
}

// Marks the in-progress bucket as untrustworthy (e.g. after a failed or
// late pull) and resets the diff base. The invalidation is reported to
// StatsdStats at most once per bucket, however many failures occur in it.
void ValueMetricProducer::invalidateCurrentBucket() {
    const bool alreadyInvalid = mCurrentBucketIsInvalid;
    mCurrentBucketIsInvalid = true;
    if (!alreadyInvalid) {
        // First invalidation of this bucket: record it in the guardrail stats.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }
    resetBase();
}

void ValueMetricProducer::resetBase() {
    for (auto& slice : mCurrentSlicedBucket) {
        for (auto& interval : slice.second) {
@@ -323,6 +333,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket();
        return;
    }

@@ -346,19 +357,20 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<std::shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullTagId, &allData)) {
        ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
        resetBase();
        invalidateCurrentBucket();
        return;
    }
    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
        resetBase();
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket();
        return;
    }
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);

    if (timestampNs < mCurrentBucketStartTimeNs) {
        // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report
@@ -388,7 +400,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
    if (mCondition) {
        if (!pullSuccess) {
            // If the pull failed, we won't be able to compute a diff.
            resetBase();
            invalidateCurrentBucket();
            return;
        }

@@ -687,16 +699,36 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();

    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;

    if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
        // The current bucket is large enough to keep.
        for (const auto& slice : mCurrentSlicedBucket) {
            ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
            // it will auto create new vector of ValuebucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[slice.first];
                bucketList.push_back(bucket);
            }
        }
    } else {
        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
    }

    if (!mCurrentBucketIsInvalid) {
        appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
    }
    initCurrentSlicedBucket();
    mCurrentBucketIsInvalid = false;
}

ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
                                                    const std::vector<Interval>& intervals) {
    ValueBucket bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTime;
            for (const auto& interval : slice.second) {
    for (const auto& interval : intervals) {
        if (interval.hasValue) {
            // skip the output if the diff is zero
            if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
@@ -712,16 +744,30 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
            }
        }
    }
            // it will auto create new vector of ValuebucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[slice.first];
                bucketList.push_back(bucket);
    return bucket;
}

// Prepares mCurrentSlicedBucket for the next bucket: clears per-interval
// accumulation state and drops slices that saw no new data this bucket.
//
// Fix: removed a stray `mSkippedBuckets.emplace_back(...)` line that was a
// diff-rendering artifact (a removed line from the old flushCurrentBucketLocked
// cleanup loop); it does not belong in this function and would have appended a
// bogus skipped-bucket entry on every retained slice.
void ValueMetricProducer::initCurrentSlicedBucket() {
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second) {
            // Reset per-bucket accumulation; the diff base is handled separately.
            interval.hasValue = false;
            interval.sampleSize = 0;
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete) {
            // No new data for this slice key: drop it so the map doesn't grow
            // unboundedly. erase() returns the iterator to the next element.
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
    }
}

void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
    if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
@@ -759,24 +805,6 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
            mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
        }
    }

    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second) {
            interval.hasValue = false;
            interval.sampleSize = 0;
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete) {
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
    }
}

size_t ValueMetricProducer::byteSizeLocked() const {
+17 −0
Original line number Diff line number Diff line
@@ -103,6 +103,9 @@ private:
    // Calculate previous bucket end time based on current time.
    int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs);

    // Mark the data as invalid.
    void invalidateCurrentBucket();

    const int mWhatMatcherIndex;

    sp<EventMatcherWizard> mEventMatcherWizard;
@@ -156,6 +159,11 @@ private:

    void pullAndMatchEventsLocked(const int64_t timestampNs);

    ValueBucket buildPartialBucket(int64_t bucketEndTime,
                                   const std::vector<Interval>& intervals);
    void initCurrentSlicedBucket();
    void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);

    // Reset diff base and mHasGlobalBase
    void resetBase();

@@ -187,6 +195,12 @@ private:
    // diff against.
    bool mHasGlobalBase;

    // Invalid bucket. There was a problem in collecting data in the current bucket so we cannot
    // trust any of the data in this bucket.
    //
    // For instance, one pull failed.
    bool mCurrentBucketIsInvalid;

    const int64_t mMaxPullDelayNs;

    const bool mSplitBucketForAppUpgrade;
@@ -221,6 +235,9 @@ private:
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
};

}  // namespace statsd
+1 −0
Original line number Diff line number Diff line
@@ -417,6 +417,7 @@ message StatsdStatsReport {
      optional int64 skipped_forward_buckets = 4;
      optional int64 bad_value_type = 5;
      optional int64 condition_change_in_next_bucket = 6;
      optional int64 invalidated_bucket = 7;
    }
    repeated AtomMetricStats atom_metric_stats = 17;

Loading