Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +9 −5 Original line number Diff line number Diff line Loading @@ -733,6 +733,11 @@ bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) return false; } bool ValueMetricProducer::multipleBucketsSkipped(const int64_t numBucketsForward) { // Skip buckets if this is a pulled metric or a pushed metric that is diffed. return numBucketsForward > 1 && (mIsPulled || mUseDiff); } void ValueMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, const LogEvent& event, Loading Loading @@ -910,8 +915,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( interval.sampleSize += 1; } // Only trigger the tracker if all intervals are correct if (useAnomalyDetection) { // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due // to MULTIPLE_BUCKETS_SKIPPED. if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) { // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = intervals[0].value.long_value; auto prev = mCurrentFullBucket.find(eventKey); Loading Loading @@ -961,9 +967,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, int64_t bucketEndTime = fullBucketEndTimeNs; int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); // Skip buckets if this is a pulled metric or a pushed metric that is diffed. if (numBucketsForward > 1 && (mIsPulled || mUseDiff)) { if (multipleBucketsSkipped(numBucketsForward)) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId); // Something went wrong. Maybe the device was sleeping for a long time. It is better Loading cmds/statsd/src/metrics/ValueMetricProducer.h +2 −0 Original line number Diff line number Diff line Loading @@ -219,6 +219,8 @@ private: void pullAndMatchEventsLocked(const int64_t timestampNs); bool multipleBucketsSkipped(const int64_t numBucketsForward); void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, int64_t originalPullTimeNs, int64_t eventElapsedTimeNs); Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +43 −0 Original line number Diff line number Diff line Loading @@ -1081,6 +1081,49 @@ TEST(ValueMetricProducerTest, TestAnomalyDetection) { std::ceil(1.0 * event6.GetElapsedTimestampNs() / NS_PER_SEC + refPeriodSec)); } TEST(ValueMetricProducerTest, TestAnomalyDetectionMultipleBucketsSkipped) { sp<AlarmMonitor> alarmMonitor; Alert alert; alert.set_id(101); alert.set_metric_id(metricId); alert.set_trigger_if_sum_gt(100); alert.set_num_buckets(1); const int32_t refPeriodSec = 3; alert.set_refractory_period_secs(refPeriodSec); ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _)) .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data) { EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 1); // Condition change to true time. data->clear(); data->push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 0)); return true; })) .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data) { EXPECT_EQ(eventTimeNs, bucket3StartTimeNs + 100); // Condition changed to false time. data->clear(); data->push_back(CreateRepeatedValueLogEvent(tagId, bucket3StartTimeNs + 100, 120)); return true; })); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric, ConditionState::kFalse); sp<AnomalyTracker> anomalyTracker = valueProducer->addAnomalyTracker(alert, alarmMonitor); valueProducer->onConditionChanged(true, bucketStartTimeNs + 1); // multiple buckets should be skipped here. valueProducer->onConditionChanged(false, bucket3StartTimeNs + 100); // No alert is fired when multiple buckets are skipped. EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY), 0U); } // Test value metric no condition, the pull on bucket boundary come in time and too late TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); Loading Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +9 −5 Original line number Diff line number Diff line Loading @@ -733,6 +733,11 @@ bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) return false; } bool ValueMetricProducer::multipleBucketsSkipped(const int64_t numBucketsForward) { // Skip buckets if this is a pulled metric or a pushed metric that is diffed. return numBucketsForward > 1 && (mIsPulled || mUseDiff); } void ValueMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, const LogEvent& event, Loading Loading @@ -910,8 +915,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( interval.sampleSize += 1; } // Only trigger the tracker if all intervals are correct if (useAnomalyDetection) { // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due // to MULTIPLE_BUCKETS_SKIPPED. if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) { // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = intervals[0].value.long_value; auto prev = mCurrentFullBucket.find(eventKey); Loading Loading @@ -961,9 +967,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, int64_t bucketEndTime = fullBucketEndTimeNs; int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); // Skip buckets if this is a pulled metric or a pushed metric that is diffed. if (numBucketsForward > 1 && (mIsPulled || mUseDiff)) { if (multipleBucketsSkipped(numBucketsForward)) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId); // Something went wrong. Maybe the device was sleeping for a long time. It is better Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +2 −0 Original line number Diff line number Diff line Loading @@ -219,6 +219,8 @@ private: void pullAndMatchEventsLocked(const int64_t timestampNs); bool multipleBucketsSkipped(const int64_t numBucketsForward); void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, int64_t originalPullTimeNs, int64_t eventElapsedTimeNs); Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +43 −0 Original line number Diff line number Diff line Loading @@ -1081,6 +1081,49 @@ TEST(ValueMetricProducerTest, TestAnomalyDetection) { std::ceil(1.0 * event6.GetElapsedTimestampNs() / NS_PER_SEC + refPeriodSec)); } TEST(ValueMetricProducerTest, TestAnomalyDetectionMultipleBucketsSkipped) { sp<AlarmMonitor> alarmMonitor; Alert alert; alert.set_id(101); alert.set_metric_id(metricId); alert.set_trigger_if_sum_gt(100); alert.set_num_buckets(1); const int32_t refPeriodSec = 3; alert.set_refractory_period_secs(refPeriodSec); ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _)) .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data) { EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 1); // Condition change to true time. data->clear(); data->push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 0)); return true; })) .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data) { EXPECT_EQ(eventTimeNs, bucket3StartTimeNs + 100); // Condition changed to false time. data->clear(); data->push_back(CreateRepeatedValueLogEvent(tagId, bucket3StartTimeNs + 100, 120)); return true; })); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric, ConditionState::kFalse); sp<AnomalyTracker> anomalyTracker = valueProducer->addAnomalyTracker(alert, alarmMonitor); valueProducer->onConditionChanged(true, bucketStartTimeNs + 1); // multiple buckets should be skipped here. valueProducer->onConditionChanged(false, bucket3StartTimeNs + 100); // No alert is fired when multiple buckets are skipped. EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY), 0U); } // Test value metric no condition, the pull on bucket boundary come in time and too late TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); Loading