Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 81245fd5 authored by David Chen's avatar David Chen
Browse files

Adds option to drop small buckets for statsd.

We notice that some of the pulled metrics have a ton of data, and
during app upgrades, we're forming partial buckets that represent
small periods of time but require many bytes of data. We now have an
option to drop these buckets that are too short. Note that we still
have to pull the data to keep the metrics for the next bucket
correct. We include a new field in the value and gauge metric outputs
so that it's easy to tell when a bucket was dropped.

We drop the partial buckets also from anomaly detection since we
should be computing anomalies from the same data that is reported.

Test: Added unit-tests for value and gauge metrics.
Bug: 77925710
Change-Id: Ic370496377c6afd380e02278a6c1ed8b521a2731
parent 34a0b18a
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -272,6 +272,10 @@ private:
    FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade);
    FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval);
    FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit);
    FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket);
    FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket);
    FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket);
    FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket);
};

}  // namespace statsd
+23 −6
Original line number Diff line number Diff line
@@ -47,6 +47,9 @@ const int FIELD_ID_ID = 1;
const int FIELD_ID_GAUGE_METRICS = 8;
// for GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
const int FIELD_ID_SKIPPED_START = 1;
const int FIELD_ID_SKIPPED_END = 2;
// for GaugeMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -66,6 +69,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
      mStatsPullerManager(statsPullerManager),
      mPullTagId(pullTagId),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -174,6 +178,15 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);

    for (const auto& pair : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
        protoOutput->end(wrapperToken);
    }
    mSkippedBuckets.clear();

    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;

@@ -440,6 +453,7 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
        info.mBucketEndNs = fullBucketEndTimeNs;
    }

    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
        for (const auto& slice : *mCurrentSlicedBucket) {
            info.mGaugeAtoms = slice.second;
            auto& bucketList = mPastBuckets[slice.first];
@@ -447,6 +461,9 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
            VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
                 slice.first.toString().c_str());
        }
    } else {
        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
    }

    // If we have anomaly trackers, we need to update the partial bucket values.
    if (mAnomalyTrackers.size() > 0) {
+5 −0
Original line number Diff line number Diff line
@@ -136,6 +136,11 @@ private:
    // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket).
    std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly;

    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;

    const int64_t mMinBucketSizeNs;

    // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map
    // for each slice with the latest value.
    void updateCurrentSlicedBucketForAnomaly();
+29 −11
Original line number Diff line number Diff line
@@ -50,6 +50,9 @@ const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
const int FIELD_ID_SKIPPED_START = 1;
const int FIELD_ID_SKIPPED_END = 2;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -69,6 +72,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
      mValueField(metric.value_field()),
      mStatsPullerManager(statsPullerManager),
      mPullTagId(pullTagId),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -156,12 +160,21 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
    } else {
        flushIfNeededLocked(dumpTimeNs);
    }
    if (mPastBuckets.empty()) {
    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    for (const auto& pair : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
        protoOutput->end(wrapperToken);
    }
    mSkippedBuckets.clear();

    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG("  dimension key %s", dimensionKey.toString().c_str());
@@ -391,6 +404,8 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
        info.mBucketEndNs = fullBucketEndTimeNs;
    }

    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
        // The current bucket is large enough to keep.
        int tainted = 0;
        for (const auto& slice : mCurrentSlicedBucket) {
            tainted += slice.second.tainted;
@@ -403,6 +418,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
            }
        }
        VLOG("%d tainted pairs in the bucket", tainted);
    } else {
        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
    }

    if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
+5 −0
Original line number Diff line number Diff line
@@ -148,6 +148,11 @@ private:
    // TODO: Add a lock to mPastBuckets.
    std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets;

    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;

    const int64_t mMinBucketSizeNs;

    // Util function to check whether the specified dimension hits the guardrail.
    bool hitGuardRailLocked(const MetricDimensionKey& newKey);

Loading