Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +20 −16 Original line number Diff line number Diff line Loading @@ -951,6 +951,11 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId); } VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); int64_t bucketEndTime = fullBucketEndTimeNs; int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); Loading @@ -959,20 +964,20 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, // to mark the current bucket as invalid. The last pull might have been successful through. invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED); // End the bucket at the next bucket start time so the entire interval is skipped. bucketEndTime = nextBucketStartTimeNs; } else if (eventTimeNs < fullBucketEndTimeNs) { bucketEndTime = eventTimeNs; } VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; // Close the current bucket. int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime); bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; if (!isBucketLargeEnough) { skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL); } bool bucketHasData = false; if (!mCurrentBucketIsSkipped) { bool bucketHasData = false; // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); Loading @@ -984,22 +989,22 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, bucketHasData = true; } } } if (!bucketHasData && !mCurrentBucketIsSkipped) { if (!bucketHasData) { skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); } } if (mCurrentBucketIsSkipped) { mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; // Fill in the gap if we skipped multiple buckets. mCurrentSkippedBucket.bucketEndTimeNs = numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime; mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; mSkippedBuckets.emplace_back(mCurrentSkippedBucket); } // This means that the current bucket was not flushed before a forced bucket split. if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) { // This can happen if an app update or a dump report with include_current_partial_bucket is // requested before we get a chance to flush the bucket due to receiving new data, either from // the statsd socket or the StatsPullerManager. if (bucketEndTime < nextBucketStartTimeNs) { SkippedBucket bucketInGap; bucketInGap.bucketStartTimeNs = bucketEndTime; bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs; Loading @@ -1008,7 +1013,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, mSkippedBuckets.emplace_back(bucketInGap); } appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); appendToFullBucket(eventTimeNs > fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. mConditionTimer.newBucketStart(nextBucketStartTimeNs); Loading Loading @@ -1074,8 +1079,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) (long long)mCurrentBucketStartTimeNs); } void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { if (mCurrentBucketIsSkipped) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. Loading cmds/statsd/src/metrics/ValueMetricProducer.h +6 −1 Original line number Diff line number Diff line Loading @@ -142,8 +142,10 @@ private: // Mark the data as invalid. void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, const BucketDropReason reason); // Skips the current bucket without notifying StatsdStats of the skipped bucket. // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that // causes the bucket to be invalidated will not notify StatsdStats. Loading Loading @@ -209,6 +211,7 @@ private: // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); bool hasReachedGuardRailLimit() const; bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey); Loading @@ -220,8 +223,10 @@ private: ValueBucket buildPartialBucket(int64_t bucketEndTime, const std::vector<Interval>& intervals); void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs); void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs); void appendToFullBucket(const bool isFullBucketReached); // Reset diff base and mHasGlobalBase void resetBase(); Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +31 −14 Original line number Diff line number Diff line Loading @@ -3612,10 +3612,32 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _, _)) // Condition change to true. .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool) { EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10); data->clear(); data->push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 10, 10)); return true; })) // App Update. .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool) { EXPECT_EQ(eventTimeNs, bucket2StartTimeNs + 1000); data->clear(); data->push_back( CreateRepeatedValueLogEvent(tagId, bucket2StartTimeNs + 1000, 15)); return true; })); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); // Condition changed event int64_t conditionChangeTimeNs = bucketStartTimeNs + 10; valueProducer->onConditionChanged(true, conditionChangeTimeNs); // App update event. int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000; valueProducer->notifyAppUpgrade(appUpdateTimeNs); Loading @@ -3629,28 +3651,23 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(2, report.value_metrics().skipped_size()); ASSERT_EQ(1, report.value_metrics().data_size()); ASSERT_EQ(1, report.value_metrics().skipped_size()); ASSERT_EQ(1, report.value_metrics().data(0).bucket_info_size()); auto data = report.value_metrics().data(0); ASSERT_EQ(0, data.bucket_info(0).bucket_num()); EXPECT_EQ(5, data.bucket_info(0).values(0).value_long()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(1).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), report.value_metrics().skipped(1).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); dropEvent = report.value_metrics().skipped(1).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); } /* Loading Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +20 −16 Original line number Diff line number Diff line Loading @@ -951,6 +951,11 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId); } VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); int64_t bucketEndTime = fullBucketEndTimeNs; int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); Loading @@ -959,20 +964,20 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, // to mark the current bucket as invalid. The last pull might have been successful through. invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED); // End the bucket at the next bucket start time so the entire interval is skipped. bucketEndTime = nextBucketStartTimeNs; } else if (eventTimeNs < fullBucketEndTimeNs) { bucketEndTime = eventTimeNs; } VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; // Close the current bucket. int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime); bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; if (!isBucketLargeEnough) { skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL); } bool bucketHasData = false; if (!mCurrentBucketIsSkipped) { bool bucketHasData = false; // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); Loading @@ -984,22 +989,22 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, bucketHasData = true; } } } if (!bucketHasData && !mCurrentBucketIsSkipped) { if (!bucketHasData) { skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); } } if (mCurrentBucketIsSkipped) { mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; // Fill in the gap if we skipped multiple buckets. mCurrentSkippedBucket.bucketEndTimeNs = numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime; mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; mSkippedBuckets.emplace_back(mCurrentSkippedBucket); } // This means that the current bucket was not flushed before a forced bucket split. if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) { // This can happen if an app update or a dump report with include_current_partial_bucket is // requested before we get a chance to flush the bucket due to receiving new data, either from // the statsd socket or the StatsPullerManager. if (bucketEndTime < nextBucketStartTimeNs) { SkippedBucket bucketInGap; bucketInGap.bucketStartTimeNs = bucketEndTime; bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs; Loading @@ -1008,7 +1013,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, mSkippedBuckets.emplace_back(bucketInGap); } appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); appendToFullBucket(eventTimeNs > fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. mConditionTimer.newBucketStart(nextBucketStartTimeNs); Loading Loading @@ -1074,8 +1079,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) (long long)mCurrentBucketStartTimeNs); } void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { if (mCurrentBucketIsSkipped) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +6 −1 Original line number Diff line number Diff line Loading @@ -142,8 +142,10 @@ private: // Mark the data as invalid. void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, const BucketDropReason reason); // Skips the current bucket without notifying StatsdStats of the skipped bucket. // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that // causes the bucket to be invalidated will not notify StatsdStats. Loading Loading @@ -209,6 +211,7 @@ private: // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); bool hasReachedGuardRailLimit() const; bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey); Loading @@ -220,8 +223,10 @@ private: ValueBucket buildPartialBucket(int64_t bucketEndTime, const std::vector<Interval>& intervals); void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs); void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs); void appendToFullBucket(const bool isFullBucketReached); // Reset diff base and mHasGlobalBase void resetBase(); Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +31 −14 Original line number Diff line number Diff line Loading @@ -3612,10 +3612,32 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _, _)) // Condition change to true. .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool) { EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10); data->clear(); data->push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 10, 10)); return true; })) // App Update. .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool) { EXPECT_EQ(eventTimeNs, bucket2StartTimeNs + 1000); data->clear(); data->push_back( CreateRepeatedValueLogEvent(tagId, bucket2StartTimeNs + 1000, 15)); return true; })); sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); // Condition changed event int64_t conditionChangeTimeNs = bucketStartTimeNs + 10; valueProducer->onConditionChanged(true, conditionChangeTimeNs); // App update event. int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000; valueProducer->notifyAppUpgrade(appUpdateTimeNs); Loading @@ -3629,28 +3651,23 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); ASSERT_EQ(0, report.value_metrics().data_size()); ASSERT_EQ(2, report.value_metrics().skipped_size()); ASSERT_EQ(1, report.value_metrics().data_size()); ASSERT_EQ(1, report.value_metrics().skipped_size()); ASSERT_EQ(1, report.value_metrics().data(0).bucket_info_size()); auto data = report.value_metrics().data(0); ASSERT_EQ(0, data.bucket_info(0).bucket_num()); EXPECT_EQ(5, data.bucket_info(0).values(0).value_long()); EXPECT_EQ(NanoToMillis(bucketStartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), report.value_metrics().skipped(1).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), report.value_metrics().skipped(1).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); dropEvent = report.value_metrics().skipped(1).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); } /* Loading