Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +31 −11 Original line number Diff line number Diff line Loading @@ -524,6 +524,14 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn multiIntervals.resize(mFieldMatchers.size()); } // We only use anomaly detection under certain cases. // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics // containing multiple values. We tried to retain all previous behaviour, but we are unsure the // previous behaviour was correct. At the time of the fix, anomaly detection had no owner. // Whoever next works on it should look into the cases where it is triggered in this function. // Discussion here: http://ag/6124370. bool useAnomalyDetection = true; for (int i = 0; i < (int)mFieldMatchers.size(); i++) { const Matcher& matcher = mFieldMatchers[i]; Interval& interval = multiIntervals[i]; Loading @@ -546,7 +554,11 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn // no base. just update base and return. interval.base = value; interval.hasBase = true; return; // If we're missing a base, do not use anomaly detection on incomplete data useAnomalyDetection = false; // Continue (instead of return) here in order to set interval.base and // interval.hasBase for other intervals continue; } } Value diff; Loading @@ -560,7 +572,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn VLOG("Unexpected decreasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; // If we've got bad data, do not use anomaly detection useAnomalyDetection = false; continue; } break; case ValueMetric::DECREASING: Loading @@ -572,7 +586,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn VLOG("Unexpected increasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; // If we've got bad data, do not use anomaly detection useAnomalyDetection = false; continue; } break; case ValueMetric::ANY: Loading Loading @@ -608,6 +624,8 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn interval.sampleSize += 1; } // Only trigger the tracker if all intervals are correct if (useAnomalyDetection) { // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = multiIntervals[0].value.long_value; auto prev = mCurrentFullBucket.find(eventKey); Loading @@ -615,7 +633,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn wholeBucketVal += prev->second; } for (auto& tracker : mAnomalyTrackers) { tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, wholeBucketVal); tracker->detectAndDeclareAnomaly( eventTimeNs, mCurrentBucketNum, eventKey, wholeBucketVal); } } } Loading cmds/statsd/src/metrics/ValueMetricProducer.h +1 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +107 −0 Original line number Diff line number Diff line Loading @@ -1508,6 +1508,113 @@ TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) { EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); } TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.mutable_value_field()->add_child()->set_field(3); metric.set_aggregation_type(ValueMetric::MIN); metric.set_use_diff(true); UidMap uidMap; SimpleAtomMatcher atomMatcher; atomMatcher.set_atom_id(tagId); sp<EventMatcherWizard> eventMatcherWizard = new EventMatcherWizard({new SimpleLogMatchingTracker( atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex, eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs, pullerManager); shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event1->write(1); event1->write(10); event1->write(20); event1->init(); shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 15); event2->write(1); event2->write(15); event2->write(22); event2->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; ValueMetricProducer::Interval curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasBase); EXPECT_EQ(10, curInterval0.base.long_value); EXPECT_EQ(false, curInterval0.hasValue); EXPECT_EQ(true, curInterval1.hasBase); EXPECT_EQ(20, curInterval1.base.long_value); EXPECT_EQ(false, curInterval1.hasValue); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasValue); EXPECT_EQ(5, curInterval0.value.long_value); EXPECT_EQ(true, curInterval1.hasValue); EXPECT_EQ(2, curInterval1.value.long_value); // no change in first value field shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10); event3->write(1); event3->write(15); event3->write(25); event3->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event3); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasBase); EXPECT_EQ(15, curInterval0.base.long_value); EXPECT_EQ(true, curInterval0.hasValue); EXPECT_EQ(true, curInterval1.hasBase); EXPECT_EQ(25, curInterval1.base.long_value); EXPECT_EQ(true, curInterval1.hasValue); shared_ptr<LogEvent> event4 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 15); event4->write(1); event4->write(15); event4->write(29); event4->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event4); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasBase); EXPECT_EQ(15, curInterval0.base.long_value); EXPECT_EQ(true, curInterval0.hasValue); EXPECT_EQ(true, curInterval1.hasBase); EXPECT_EQ(29, curInterval1.base.long_value); EXPECT_EQ(true, curInterval1.hasValue); valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second[0].values.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second[1].values.size()); EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second[0].values[0].long_value); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].valueIndex[0]); EXPECT_EQ(2, valueProducer.mPastBuckets.begin()->second[0].values[1].long_value); EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[0].valueIndex[1]); EXPECT_EQ(3, valueProducer.mPastBuckets.begin()->second[1].values[0].long_value); EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[1].valueIndex[0]); } /* * Tests zero default base. */ Loading Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +31 −11 Original line number Diff line number Diff line Loading @@ -524,6 +524,14 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn multiIntervals.resize(mFieldMatchers.size()); } // We only use anomaly detection under certain cases. // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics // containing multiple values. We tried to retain all previous behaviour, but we are unsure the // previous behaviour was correct. At the time of the fix, anomaly detection had no owner. // Whoever next works on it should look into the cases where it is triggered in this function. // Discussion here: http://ag/6124370. bool useAnomalyDetection = true; for (int i = 0; i < (int)mFieldMatchers.size(); i++) { const Matcher& matcher = mFieldMatchers[i]; Interval& interval = multiIntervals[i]; Loading @@ -546,7 +554,11 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn // no base. just update base and return. interval.base = value; interval.hasBase = true; return; // If we're missing a base, do not use anomaly detection on incomplete data useAnomalyDetection = false; // Continue (instead of return) here in order to set interval.base and // interval.hasBase for other intervals continue; } } Value diff; Loading @@ -560,7 +572,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn VLOG("Unexpected decreasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; // If we've got bad data, do not use anomaly detection useAnomalyDetection = false; continue; } break; case ValueMetric::DECREASING: Loading @@ -572,7 +586,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn VLOG("Unexpected increasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; // If we've got bad data, do not use anomaly detection useAnomalyDetection = false; continue; } break; case ValueMetric::ANY: Loading Loading @@ -608,6 +624,8 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn interval.sampleSize += 1; } // Only trigger the tracker if all intervals are correct if (useAnomalyDetection) { // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = multiIntervals[0].value.long_value; auto prev = mCurrentFullBucket.find(eventKey); Loading @@ -615,7 +633,9 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn wholeBucketVal += prev->second; } for (auto& tracker : mAnomalyTrackers) { tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, wholeBucketVal); tracker->detectAndDeclareAnomaly( eventTimeNs, mCurrentBucketNum, eventKey, wholeBucketVal); } } } Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +1 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +107 −0 Original line number Diff line number Diff line Loading @@ -1508,6 +1508,113 @@ TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) { EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); } TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.mutable_value_field()->add_child()->set_field(3); metric.set_aggregation_type(ValueMetric::MIN); metric.set_use_diff(true); UidMap uidMap; SimpleAtomMatcher atomMatcher; atomMatcher.set_atom_id(tagId); sp<EventMatcherWizard> eventMatcherWizard = new EventMatcherWizard({new SimpleLogMatchingTracker( atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex, eventMatcherWizard, -1, bucketStartTimeNs, bucketStartTimeNs, pullerManager); shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event1->write(1); event1->write(10); event1->write(20); event1->init(); shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 15); event2->write(1); event2->write(15); event2->write(22); event2->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; ValueMetricProducer::Interval curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasBase); EXPECT_EQ(10, curInterval0.base.long_value); EXPECT_EQ(false, curInterval0.hasValue); EXPECT_EQ(true, curInterval1.hasBase); EXPECT_EQ(20, curInterval1.base.long_value); EXPECT_EQ(false, curInterval1.hasValue); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasValue); EXPECT_EQ(5, curInterval0.value.long_value); EXPECT_EQ(true, curInterval1.hasValue); EXPECT_EQ(2, curInterval1.value.long_value); // no change in first value field shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10); event3->write(1); event3->write(15); event3->write(25); event3->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event3); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasBase); EXPECT_EQ(15, curInterval0.base.long_value); EXPECT_EQ(true, curInterval0.hasValue); EXPECT_EQ(true, curInterval1.hasBase); EXPECT_EQ(25, curInterval1.base.long_value); EXPECT_EQ(true, curInterval1.hasValue); shared_ptr<LogEvent> event4 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 15); event4->write(1); event4->write(15); event4->write(29); event4->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event4); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval0 = valueProducer.mCurrentSlicedBucket.begin()->second[0]; curInterval1 = valueProducer.mCurrentSlicedBucket.begin()->second[1]; EXPECT_EQ(true, curInterval0.hasBase); EXPECT_EQ(15, curInterval0.base.long_value); EXPECT_EQ(true, curInterval0.hasValue); EXPECT_EQ(true, curInterval1.hasBase); EXPECT_EQ(29, curInterval1.base.long_value); EXPECT_EQ(true, curInterval1.hasValue); valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second[0].values.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second[1].values.size()); EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second[0].values[0].long_value); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].valueIndex[0]); EXPECT_EQ(2, valueProducer.mPastBuckets.begin()->second[0].values[1].long_value); EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[0].valueIndex[1]); EXPECT_EQ(3, valueProducer.mPastBuckets.begin()->second[1].values[0].long_value); EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[1].valueIndex[0]); } /* * Tests zero default base. */ Loading