Loading cmds/statsd/src/StatsService.h +4 −0 Original line number Diff line number Diff line Loading @@ -272,6 +272,10 @@ private: FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit); FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket); FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket); FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket); FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket); }; } // namespace statsd Loading cmds/statsd/src/metrics/GaugeMetricProducer.cpp +23 −6 Original line number Diff line number Diff line Loading @@ -47,6 +47,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_GAUGE_METRICS = 8; // for GaugeMetricDataWrapper const int FIELD_ID_DATA = 1; const int FIELD_ID_SKIPPED = 2; const int FIELD_ID_SKIPPED_START = 1; const int FIELD_ID_SKIPPED_END = 2; // for GaugeMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; Loading @@ -66,6 +69,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first Loading Loading @@ -174,6 +178,15 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); for (const auto& pair : mSkippedBuckets) { uint64_t wrapperToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); protoOutput->end(wrapperToken); } mSkippedBuckets.clear(); for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; Loading Loading @@ -440,6 +453,7 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { for (const auto& slice : *mCurrentSlicedBucket) { info.mGaugeAtoms = slice.second; auto& bucketList = mPastBuckets[slice.first]; Loading @@ -447,6 +461,9 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId, slice.first.toString().c_str()); } } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } // If we have anomaly trackers, we need to update the partial bucket values. if (mAnomalyTrackers.size() > 0) { Loading cmds/statsd/src/metrics/GaugeMetricProducer.h +5 −0 Original line number Diff line number Diff line Loading @@ -136,6 +136,11 @@ private: // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket). std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly; // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; const int64_t mMinBucketSizeNs; // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map // for each slice with the latest value. void updateCurrentSlicedBucketForAnomaly(); Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +29 −11 Original line number Diff line number Diff line Loading @@ -50,6 +50,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_VALUE_METRICS = 7; // for ValueMetricDataWrapper const int FIELD_ID_DATA = 1; const int FIELD_ID_SKIPPED = 2; const int FIELD_ID_SKIPPED_START = 1; const int FIELD_ID_SKIPPED_END = 2; // for ValueMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; Loading @@ -69,6 +72,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mValueField(metric.value_field()), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first Loading Loading @@ -156,12 +160,21 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } else { flushIfNeededLocked(dumpTimeNs); } if (mPastBuckets.empty()) { if (mPastBuckets.empty() && mSkippedBuckets.empty()) { return; } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS); for (const auto& pair : mSkippedBuckets) { uint64_t wrapperToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); protoOutput->end(wrapperToken); } mSkippedBuckets.clear(); for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; VLOG(" dimension key %s", dimensionKey.toString().c_str()); Loading Loading @@ -391,6 +404,8 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { // The current bucket is large enough to keep. int tainted = 0; for (const auto& slice : mCurrentSlicedBucket) { tainted += slice.second.tainted; Loading @@ -403,6 +418,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } } VLOG("%d tainted pairs in the bucket", tainted); } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. Loading cmds/statsd/src/metrics/ValueMetricProducer.h +5 −0 Original line number Diff line number Diff line Loading @@ -148,6 +148,11 @@ private: // TODO: Add a lock to mPastBuckets. std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets; // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; const int64_t mMinBucketSizeNs; // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); Loading Loading
cmds/statsd/src/StatsService.h +4 −0 Original line number Diff line number Diff line Loading @@ -272,6 +272,10 @@ private: FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit); FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket); FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket); FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket); FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket); }; } // namespace statsd Loading
cmds/statsd/src/metrics/GaugeMetricProducer.cpp +23 −6 Original line number Diff line number Diff line Loading @@ -47,6 +47,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_GAUGE_METRICS = 8; // for GaugeMetricDataWrapper const int FIELD_ID_DATA = 1; const int FIELD_ID_SKIPPED = 2; const int FIELD_ID_SKIPPED_START = 1; const int FIELD_ID_SKIPPED_END = 2; // for GaugeMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; Loading @@ -66,6 +69,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first Loading Loading @@ -174,6 +178,15 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); for (const auto& pair : mSkippedBuckets) { uint64_t wrapperToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); protoOutput->end(wrapperToken); } mSkippedBuckets.clear(); for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; Loading Loading @@ -440,6 +453,7 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { for (const auto& slice : *mCurrentSlicedBucket) { info.mGaugeAtoms = slice.second; auto& bucketList = mPastBuckets[slice.first]; Loading @@ -447,6 +461,9 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId, slice.first.toString().c_str()); } } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } // If we have anomaly trackers, we need to update the partial bucket values. if (mAnomalyTrackers.size() > 0) { Loading
cmds/statsd/src/metrics/GaugeMetricProducer.h +5 −0 Original line number Diff line number Diff line Loading @@ -136,6 +136,11 @@ private: // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket). std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly; // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; const int64_t mMinBucketSizeNs; // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map // for each slice with the latest value. void updateCurrentSlicedBucketForAnomaly(); Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +29 −11 Original line number Diff line number Diff line Loading @@ -50,6 +50,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_VALUE_METRICS = 7; // for ValueMetricDataWrapper const int FIELD_ID_DATA = 1; const int FIELD_ID_SKIPPED = 2; const int FIELD_ID_SKIPPED_START = 1; const int FIELD_ID_SKIPPED_END = 2; // for ValueMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; Loading @@ -69,6 +72,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mValueField(metric.value_field()), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first Loading Loading @@ -156,12 +160,21 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } else { flushIfNeededLocked(dumpTimeNs); } if (mPastBuckets.empty()) { if (mPastBuckets.empty() && mSkippedBuckets.empty()) { return; } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS); for (const auto& pair : mSkippedBuckets) { uint64_t wrapperToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); protoOutput->end(wrapperToken); } mSkippedBuckets.clear(); for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; VLOG(" dimension key %s", dimensionKey.toString().c_str()); Loading Loading @@ -391,6 +404,8 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { // The current bucket is large enough to keep. int tainted = 0; for (const auto& slice : mCurrentSlicedBucket) { tainted += slice.second.tainted; Loading @@ -403,6 +418,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } } VLOG("%d tainted pairs in the bucket", tainted); } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +5 −0 Original line number Diff line number Diff line Loading @@ -148,6 +148,11 @@ private: // TODO: Add a lock to mPastBuckets. std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets; // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; const int64_t mMinBucketSizeNs; // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); Loading