Loading cmds/statsd/src/metrics/MetricProducer.h +3 −1 Original line number Diff line number Diff line Loading @@ -82,7 +82,9 @@ enum BucketDropReason { DIMENSION_GUARDRAIL_REACHED = 6, MULTIPLE_BUCKETS_SKIPPED = 7, // Not an invalid bucket case, but the bucket is dropped. BUCKET_TOO_SMALL = 8 BUCKET_TOO_SMALL = 8, // Not an invalid bucket case, but the bucket is skipped. NO_DATA = 9 }; struct Activation { Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +38 −16 Original line number Diff line number Diff line Loading @@ -108,7 +108,7 @@ ValueMetricProducer::ValueMetricProducer( mSkipZeroDiffOutput(metric.skip_zero_diff_output()), mUseZeroDefaultBase(metric.use_zero_default_base()), mHasGlobalBase(false), mCurrentBucketIsInvalid(false), mCurrentBucketIsSkipped(false), mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC : StatsdStats::kPullMaxDelayNs), mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()), Loading Loading @@ -383,15 +383,12 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, const BucketDropReason reason) { if (!mCurrentBucketIsInvalid) { if (!mCurrentBucketIsSkipped) { // Only report to StatsdStats once per invalid bucket. StatsdStats::getInstance().noteInvalidatedBucket(mMetricId); } if (!maxDropEventsReached()) { mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason)); } mCurrentBucketIsInvalid = true; skipCurrentBucket(dropTimeNs, reason); } void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs, Loading @@ -400,6 +397,14 @@ void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs, resetBase(); } void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason) { if (!maxDropEventsReached()) { mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason)); } mCurrentBucketIsSkipped = true; } void ValueMetricProducer::resetBase() { for (auto& slice : mCurrentBaseInfo) { for (auto& baseInfo : slice.second) { Loading Loading @@ -961,12 +966,10 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime); bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; if (!isBucketLargeEnough) { if (!maxDropEventsReached()) { mCurrentSkippedBucket.dropEvents.emplace_back( buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL)); } skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL); } if (isBucketLargeEnough && !mCurrentBucketIsInvalid) { bool bucketHasData = false; if (!mCurrentBucketIsSkipped) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); Loading @@ -975,14 +978,33 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(bucket); bucketHasData = true; } } } else { } if (!bucketHasData && !mCurrentBucketIsSkipped) { skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); } if (mCurrentBucketIsSkipped) { mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; // Fill in the gap if we skipped multiple buckets. mCurrentSkippedBucket.bucketEndTimeNs = numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime; mSkippedBuckets.emplace_back(mCurrentSkippedBucket); } // This means that the current bucket was not flushed before a forced bucket split. if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) { SkippedBucket bucketInGap; bucketInGap.bucketStartTimeNs = bucketEndTime; bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs; bucketInGap.dropEvents.emplace_back( buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA)); mSkippedBuckets.emplace_back(bucketInGap); } appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. Loading Loading @@ -1036,13 +1058,13 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) // TODO: remove mCurrentBaseInfo entries when obsolete } mCurrentBucketIsInvalid = false; mCurrentBucketIsSkipped = false; mCurrentSkippedBucket.reset(); // If we do not have a global base when the condition is true, // we will have incomplete bucket for the next bucket. if (mUseDiff && !mHasGlobalBase && mCondition) { mCurrentBucketIsInvalid = false; mCurrentBucketIsSkipped = false; } mCurrentBucketStartTimeNs = nextBucketStartTimeNs; VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, Loading @@ -1051,7 +1073,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; if (mCurrentBucketIsInvalid) { if (mCurrentBucketIsSkipped) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. mCurrentFullBucket.clear(); Loading cmds/statsd/src/metrics/ValueMetricProducer.h +7 −5 Original line number Diff line number Diff line Loading @@ -144,6 +144,10 @@ private: void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, const BucketDropReason reason); // Skips the current bucket without notifying StatsdStats of the skipped bucket. // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that // causes the bucket to be invalidated will not notify StatsdStats. void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); const int mWhatMatcherIndex; Loading Loading @@ -250,11 +254,9 @@ private: // diff against. bool mHasGlobalBase; // Invalid bucket. There was a problem in collecting data in the current bucket so we cannot // trust any of the data in this bucket. // // For instance, one pull failed. bool mCurrentBucketIsInvalid; // This is to track whether or not the bucket is skipped for any of the reasons listed in // BucketDropReason, many of which make the bucket potentially invalid. bool mCurrentBucketIsSkipped; const int64_t mMaxPullDelayNs; Loading cmds/statsd/src/stats_log.proto +2 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,8 @@ message StatsLogReport { MULTIPLE_BUCKETS_SKIPPED = 7; // Not an invalid bucket case, but the bucket is dropped. BUCKET_TOO_SMALL = 8; // Not an invalid bucket case, but the bucket is skipped. NO_DATA = 9; }; message DropEvent { Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +120 −10 Original line number Diff line number Diff line Loading @@ -1115,13 +1115,21 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { EXPECT_EQ(false, curInterval.hasValue); assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {12}, {bucketSizeNs}, {bucket2StartTimeNs}, {bucket3StartTimeNs}); // The 1st bucket is dropped because of no data // The 3rd bucket is dropped due to multiple buckets being skipped. ASSERT_EQ(1, valueProducer->mSkippedBuckets.size()); EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); EXPECT_EQ(bucket4StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); ASSERT_EQ(2, valueProducer->mSkippedBuckets.size()); EXPECT_EQ(bucketStartTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); ASSERT_EQ(1, valueProducer->mSkippedBuckets[0].dropEvents.size()); EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[0].dropEvents[0].dropTimeNs); EXPECT_EQ(NO_DATA, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].dropEvents[0].dropTimeNs); EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[1].bucketStartTimeNs); EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[1].bucketEndTimeNs); ASSERT_EQ(1, valueProducer->mSkippedBuckets[1].dropEvents.size()); EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[1].dropEvents[0].reason); EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[1].dropEvents[0].dropTimeNs); } /* Loading Loading @@ -2214,7 +2222,7 @@ TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenGuardRailHit) { valueProducer->mCondition = ConditionState::kFalse; valueProducer->onConditionChanged(true, bucketStartTimeNs + 2); EXPECT_EQ(true, valueProducer->mCurrentBucketIsInvalid); EXPECT_EQ(true, valueProducer->mCurrentBucketIsSkipped); ASSERT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size()); ASSERT_EQ(0UL, valueProducer->mSkippedBuckets.size()); Loading Loading @@ -2629,13 +2637,17 @@ TEST_P(ValueMetricProducerTest_PartialBucket, TestFullBucketResetWhenLastBucketI vector<shared_ptr<LogEvent>> allData; allData.push_back(CreateRepeatedValueLogEvent(tagId, bucket3StartTimeNs + 1, 4)); // Pull fails and arrives late. valueProducer->onDataPulled(allData, /** fails */ false, bucket3StartTimeNs + 1); assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9}, {partialBucketSplitTimeNs - bucketStartTimeNs}, {bucketStartTimeNs}, {partialBucketSplitTimeNs}); ASSERT_EQ(1, valueProducer->mSkippedBuckets.size()); ASSERT_EQ(2, valueProducer->mSkippedBuckets[0].dropEvents.size()); EXPECT_EQ(PULL_FAILED, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[0].dropEvents[1].reason); EXPECT_EQ(partialBucketSplitTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); ASSERT_EQ(0UL, valueProducer->mCurrentFullBucket.size()); } Loading Loading @@ -3464,26 +3476,41 @@ TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenMultipleBucketsSki // Condition change event that skips forward by three buckets. valueProducer->onConditionChanged(false, bucket4StartTimeNs + 10); int64_t dumpTimeNs = bucket4StartTimeNs + 1000; // Check dump report. ProtoOutputStream output; std::set<string> strSet; valueProducer->onDumpReport(bucket4StartTimeNs + 1000, true /* include recent buckets */, true, valueProducer->onDumpReport(dumpTimeNs, true /* include current buckets */, true, NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(1, report.value_metrics().skipped_size()); ASSERT_EQ(2, report.value_metrics().skipped_size()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), EXPECT_EQ(NanoToMillis(bucket4StartTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::MULTIPLE_BUCKETS_SKIPPED, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(bucket4StartTimeNs + 10), dropEvent.drop_time_millis()); // This bucket is skipped because a dumpReport with include current buckets is called. // This creates a new bucket from bucket4StartTimeNs to dumpTimeNs in which we have no data // since the condition is false for the entire bucket interval. EXPECT_EQ(NanoToMillis(bucket4StartTimeNs), report.value_metrics().skipped(1).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(dumpTimeNs), report.value_metrics().skipped(1).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); dropEvent = report.value_metrics().skipped(1).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(dumpTimeNs), dropEvent.drop_time_millis()); } /* Loading Loading @@ -3543,6 +3570,89 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenBucketTooSmall) { EXPECT_EQ(NanoToMillis(dumpReportTimeNs), dropEvent.drop_time_millis()); } /* * Test that NO_DATA dump reason is logged when a flushed bucket contains no data. */ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenDataUnavailable) { ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); // Check dump report. ProtoOutputStream output; std::set<string> strSet; int64_t dumpReportTimeNs = bucketStartTimeNs + 10000000000; // 10 seconds valueProducer->onDumpReport(dumpReportTimeNs, true /* include current bucket */, true, NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(1, report.value_metrics().skipped_size()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(dumpReportTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(dumpReportTimeNs), dropEvent.drop_time_millis()); } /* * Test that a skipped bucket is logged when a forced bucket split occurs when the previous bucket * was not flushed in time. */ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBeforeBucketFlush) { ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); // App update event. int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000; valueProducer->notifyAppUpgrade(appUpdateTimeNs); // Check dump report. ProtoOutputStream output; std::set<string> strSet; int64_t dumpReportTimeNs = bucket2StartTimeNs + 10000000000; // 10 seconds valueProducer->onDumpReport(dumpReportTimeNs, false /* include current buckets */, true, NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(2, report.value_metrics().skipped_size()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(1).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), report.value_metrics().skipped(1).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); dropEvent = report.value_metrics().skipped(1).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); } /* * Test multiple bucket drop events in the same bucket. */ Loading Loading
cmds/statsd/src/metrics/MetricProducer.h +3 −1 Original line number Diff line number Diff line Loading @@ -82,7 +82,9 @@ enum BucketDropReason { DIMENSION_GUARDRAIL_REACHED = 6, MULTIPLE_BUCKETS_SKIPPED = 7, // Not an invalid bucket case, but the bucket is dropped. BUCKET_TOO_SMALL = 8 BUCKET_TOO_SMALL = 8, // Not an invalid bucket case, but the bucket is skipped. NO_DATA = 9 }; struct Activation { Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +38 −16 Original line number Diff line number Diff line Loading @@ -108,7 +108,7 @@ ValueMetricProducer::ValueMetricProducer( mSkipZeroDiffOutput(metric.skip_zero_diff_output()), mUseZeroDefaultBase(metric.use_zero_default_base()), mHasGlobalBase(false), mCurrentBucketIsInvalid(false), mCurrentBucketIsSkipped(false), mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC : StatsdStats::kPullMaxDelayNs), mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()), Loading Loading @@ -383,15 +383,12 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, const BucketDropReason reason) { if (!mCurrentBucketIsInvalid) { if (!mCurrentBucketIsSkipped) { // Only report to StatsdStats once per invalid bucket. StatsdStats::getInstance().noteInvalidatedBucket(mMetricId); } if (!maxDropEventsReached()) { mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason)); } mCurrentBucketIsInvalid = true; skipCurrentBucket(dropTimeNs, reason); } void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs, Loading @@ -400,6 +397,14 @@ void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs, resetBase(); } void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason) { if (!maxDropEventsReached()) { mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason)); } mCurrentBucketIsSkipped = true; } void ValueMetricProducer::resetBase() { for (auto& slice : mCurrentBaseInfo) { for (auto& baseInfo : slice.second) { Loading Loading @@ -961,12 +966,10 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime); bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; if (!isBucketLargeEnough) { if (!maxDropEventsReached()) { mCurrentSkippedBucket.dropEvents.emplace_back( buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL)); } skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL); } if (isBucketLargeEnough && !mCurrentBucketIsInvalid) { bool bucketHasData = false; if (!mCurrentBucketIsSkipped) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); Loading @@ -975,14 +978,33 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(bucket); bucketHasData = true; } } } else { } if (!bucketHasData && !mCurrentBucketIsSkipped) { skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); } if (mCurrentBucketIsSkipped) { mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; // Fill in the gap if we skipped multiple buckets. mCurrentSkippedBucket.bucketEndTimeNs = numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime; mSkippedBuckets.emplace_back(mCurrentSkippedBucket); } // This means that the current bucket was not flushed before a forced bucket split. if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) { SkippedBucket bucketInGap; bucketInGap.bucketStartTimeNs = bucketEndTime; bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs; bucketInGap.dropEvents.emplace_back( buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA)); mSkippedBuckets.emplace_back(bucketInGap); } appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. Loading Loading @@ -1036,13 +1058,13 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) // TODO: remove mCurrentBaseInfo entries when obsolete } mCurrentBucketIsInvalid = false; mCurrentBucketIsSkipped = false; mCurrentSkippedBucket.reset(); // If we do not have a global base when the condition is true, // we will have incomplete bucket for the next bucket. if (mUseDiff && !mHasGlobalBase && mCondition) { mCurrentBucketIsInvalid = false; mCurrentBucketIsSkipped = false; } mCurrentBucketStartTimeNs = nextBucketStartTimeNs; VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, Loading @@ -1051,7 +1073,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; if (mCurrentBucketIsInvalid) { if (mCurrentBucketIsSkipped) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. mCurrentFullBucket.clear(); Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +7 −5 Original line number Diff line number Diff line Loading @@ -144,6 +144,10 @@ private: void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, const BucketDropReason reason); // Skips the current bucket without notifying StatsdStats of the skipped bucket. // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that // causes the bucket to be invalidated will not notify StatsdStats. void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); const int mWhatMatcherIndex; Loading Loading @@ -250,11 +254,9 @@ private: // diff against. bool mHasGlobalBase; // Invalid bucket. There was a problem in collecting data in the current bucket so we cannot // trust any of the data in this bucket. // // For instance, one pull failed. bool mCurrentBucketIsInvalid; // This is to track whether or not the bucket is skipped for any of the reasons listed in // BucketDropReason, many of which make the bucket potentially invalid. bool mCurrentBucketIsSkipped; const int64_t mMaxPullDelayNs; Loading
cmds/statsd/src/stats_log.proto +2 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,8 @@ message StatsLogReport { MULTIPLE_BUCKETS_SKIPPED = 7; // Not an invalid bucket case, but the bucket is dropped. BUCKET_TOO_SMALL = 8; // Not an invalid bucket case, but the bucket is skipped. NO_DATA = 9; }; message DropEvent { Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +120 −10 Original line number Diff line number Diff line Loading @@ -1115,13 +1115,21 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { EXPECT_EQ(false, curInterval.hasValue); assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {12}, {bucketSizeNs}, {bucket2StartTimeNs}, {bucket3StartTimeNs}); // The 1st bucket is dropped because of no data // The 3rd bucket is dropped due to multiple buckets being skipped. ASSERT_EQ(1, valueProducer->mSkippedBuckets.size()); EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); EXPECT_EQ(bucket4StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); ASSERT_EQ(2, valueProducer->mSkippedBuckets.size()); EXPECT_EQ(bucketStartTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); ASSERT_EQ(1, valueProducer->mSkippedBuckets[0].dropEvents.size()); EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[0].dropEvents[0].dropTimeNs); EXPECT_EQ(NO_DATA, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].dropEvents[0].dropTimeNs); EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[1].bucketStartTimeNs); EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[1].bucketEndTimeNs); ASSERT_EQ(1, valueProducer->mSkippedBuckets[1].dropEvents.size()); EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[1].dropEvents[0].reason); EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[1].dropEvents[0].dropTimeNs); } /* Loading Loading @@ -2214,7 +2222,7 @@ TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenGuardRailHit) { valueProducer->mCondition = ConditionState::kFalse; valueProducer->onConditionChanged(true, bucketStartTimeNs + 2); EXPECT_EQ(true, valueProducer->mCurrentBucketIsInvalid); EXPECT_EQ(true, valueProducer->mCurrentBucketIsSkipped); ASSERT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size()); ASSERT_EQ(0UL, valueProducer->mSkippedBuckets.size()); Loading Loading @@ -2629,13 +2637,17 @@ TEST_P(ValueMetricProducerTest_PartialBucket, TestFullBucketResetWhenLastBucketI vector<shared_ptr<LogEvent>> allData; allData.push_back(CreateRepeatedValueLogEvent(tagId, bucket3StartTimeNs + 1, 4)); // Pull fails and arrives late. valueProducer->onDataPulled(allData, /** fails */ false, bucket3StartTimeNs + 1); assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9}, {partialBucketSplitTimeNs - bucketStartTimeNs}, {bucketStartTimeNs}, {partialBucketSplitTimeNs}); ASSERT_EQ(1, valueProducer->mSkippedBuckets.size()); ASSERT_EQ(2, valueProducer->mSkippedBuckets[0].dropEvents.size()); EXPECT_EQ(PULL_FAILED, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[0].dropEvents[1].reason); EXPECT_EQ(partialBucketSplitTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); ASSERT_EQ(0UL, valueProducer->mCurrentFullBucket.size()); } Loading Loading @@ -3464,26 +3476,41 @@ TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenMultipleBucketsSki // Condition change event that skips forward by three buckets. valueProducer->onConditionChanged(false, bucket4StartTimeNs + 10); int64_t dumpTimeNs = bucket4StartTimeNs + 1000; // Check dump report. ProtoOutputStream output; std::set<string> strSet; valueProducer->onDumpReport(bucket4StartTimeNs + 1000, true /* include recent buckets */, true, valueProducer->onDumpReport(dumpTimeNs, true /* include current buckets */, true, NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(1, report.value_metrics().skipped_size()); ASSERT_EQ(2, report.value_metrics().skipped_size()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), EXPECT_EQ(NanoToMillis(bucket4StartTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::MULTIPLE_BUCKETS_SKIPPED, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(bucket4StartTimeNs + 10), dropEvent.drop_time_millis()); // This bucket is skipped because a dumpReport with include current buckets is called. // This creates a new bucket from bucket4StartTimeNs to dumpTimeNs in which we have no data // since the condition is false for the entire bucket interval. EXPECT_EQ(NanoToMillis(bucket4StartTimeNs), report.value_metrics().skipped(1).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(dumpTimeNs), report.value_metrics().skipped(1).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); dropEvent = report.value_metrics().skipped(1).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(dumpTimeNs), dropEvent.drop_time_millis()); } /* Loading Loading @@ -3543,6 +3570,89 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenBucketTooSmall) { EXPECT_EQ(NanoToMillis(dumpReportTimeNs), dropEvent.drop_time_millis()); } /* * Test that NO_DATA dump reason is logged when a flushed bucket contains no data. */ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenDataUnavailable) { ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); // Check dump report. ProtoOutputStream output; std::set<string> strSet; int64_t dumpReportTimeNs = bucketStartTimeNs + 10000000000; // 10 seconds valueProducer->onDumpReport(dumpReportTimeNs, true /* include current bucket */, true, NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(1, report.value_metrics().skipped_size()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(dumpReportTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(dumpReportTimeNs), dropEvent.drop_time_millis()); } /* * Test that a skipped bucket is logged when a forced bucket split occurs when the previous bucket * was not flushed in time. */ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBeforeBucketFlush) { ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); // App update event. int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000; valueProducer->notifyAppUpgrade(appUpdateTimeNs); // Check dump report. ProtoOutputStream output; std::set<string> strSet; int64_t dumpReportTimeNs = bucket2StartTimeNs + 10000000000; // 10 seconds valueProducer->onDumpReport(dumpReportTimeNs, false /* include current buckets */, true, NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(2, report.value_metrics().skipped_size()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(1).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), report.value_metrics().skipped(1).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); dropEvent = report.value_metrics().skipped(1).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); } /* * Test multiple bucket drop events in the same bucket. */ Loading