Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +26 −9 Original line number Diff line number Diff line Loading @@ -787,7 +787,6 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, } int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); mCurrentBucketNum += numBucketsForward; if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId); Loading Loading @@ -816,10 +815,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime); } if (!mCurrentBucketIsInvalid) { appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); } initCurrentSlicedBucket(nextBucketStartTimeNs); mCurrentBucketNum += numBucketsForward; } ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime, Loading Loading @@ -879,7 +877,17 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) } void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; if (mCurrentBucketIsInvalid) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. mCurrentFullBucket.clear(); } // Current bucket is invalid, we do not add it to the full bucket. return; } if (isFullBucketReached) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { Loading @@ -887,7 +895,10 @@ void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBu continue; } // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; auto& interval = slice.second[0]; if (interval.hasValue) { mCurrentFullBucket[slice.first] += interval.value.long_value; } } for (const auto& slice : mCurrentFullBucket) { for (auto& tracker : mAnomalyTrackers) { Loading @@ -903,17 +914,23 @@ void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBu for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { // TODO: fix this when anomaly can accept double values tracker->addPastBucket(slice.first, slice.second[0].value.long_value, auto& interval = slice.second[0]; if (interval.hasValue) { tracker->addPastBucket(slice.first, interval.value.long_value, mCurrentBucketNum); } } } } } } else { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; auto& interval = slice.second[0]; if (interval.hasValue) { mCurrentFullBucket[slice.first] += interval.value.long_value; } } } } Loading cmds/statsd/src/metrics/ValueMetricProducer.h +1 −0 Original line number Diff line number Diff line Loading @@ -244,6 +244,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled); FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); FRIEND_TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid); FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit); FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed); FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed); Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +31 −0 Original line number Diff line number Diff line Loading @@ -2500,6 +2500,37 @@ TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid) { EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); } TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid) { ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(tagId, _)) // Initialization. .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1)); return true; })) // notifyAppUpgrade. .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); data->push_back(ValueMetricProducerTestHelper::createEvent( bucketStartTimeNs + bucketSizeNs / 2, 10)); return true; })); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric); ASSERT_EQ(0UL, valueProducer->mCurrentFullBucket.size()); valueProducer->notifyAppUpgrade(bucketStartTimeNs + bucketSizeNs / 2, "com.foo", 10000, 1); ASSERT_EQ(1UL, valueProducer->mCurrentFullBucket.size()); vector<shared_ptr<LogEvent>> allData; allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs + 1, 4)); valueProducer->onDataPulled(allData, /** fails */ false, bucket3StartTimeNs + 1); ASSERT_EQ(0UL, valueProducer->mCurrentFullBucket.size()); } TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange) { ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); Loading Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +26 −9 Original line number Diff line number Diff line Loading @@ -787,7 +787,6 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, } int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); mCurrentBucketNum += numBucketsForward; if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId); Loading Loading @@ -816,10 +815,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime); } if (!mCurrentBucketIsInvalid) { appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); } initCurrentSlicedBucket(nextBucketStartTimeNs); mCurrentBucketNum += numBucketsForward; } ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime, Loading Loading @@ -879,7 +877,17 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) } void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; if (mCurrentBucketIsInvalid) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. mCurrentFullBucket.clear(); } // Current bucket is invalid, we do not add it to the full bucket. return; } if (isFullBucketReached) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { Loading @@ -887,7 +895,10 @@ void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBu continue; } // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; auto& interval = slice.second[0]; if (interval.hasValue) { mCurrentFullBucket[slice.first] += interval.value.long_value; } } for (const auto& slice : mCurrentFullBucket) { for (auto& tracker : mAnomalyTrackers) { Loading @@ -903,17 +914,23 @@ void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBu for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { // TODO: fix this when anomaly can accept double values tracker->addPastBucket(slice.first, slice.second[0].value.long_value, auto& interval = slice.second[0]; if (interval.hasValue) { tracker->addPastBucket(slice.first, interval.value.long_value, mCurrentBucketNum); } } } } } } else { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; auto& interval = slice.second[0]; if (interval.hasValue) { mCurrentFullBucket[slice.first] += interval.value.long_value; } } } } Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +1 −0 Original line number Diff line number Diff line Loading @@ -244,6 +244,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled); FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); FRIEND_TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid); FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit); FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed); FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed); Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +31 −0 Original line number Diff line number Diff line Loading @@ -2500,6 +2500,37 @@ TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid) { EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); } TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid) { ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(tagId, _)) // Initialization. .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1)); return true; })) // notifyAppUpgrade. .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); data->push_back(ValueMetricProducerTestHelper::createEvent( bucketStartTimeNs + bucketSizeNs / 2, 10)); return true; })); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric); ASSERT_EQ(0UL, valueProducer->mCurrentFullBucket.size()); valueProducer->notifyAppUpgrade(bucketStartTimeNs + bucketSizeNs / 2, "com.foo", 10000, 1); ASSERT_EQ(1UL, valueProducer->mCurrentFullBucket.size()); vector<shared_ptr<LogEvent>> allData; allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs + 1, 4)); valueProducer->onDataPulled(allData, /** fails */ false, bucket3StartTimeNs + 1); ASSERT_EQ(0UL, valueProducer->mCurrentFullBucket.size()); } TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange) { ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); Loading