Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +37 −3 Original line number Diff line number Diff line Loading @@ -432,6 +432,26 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { return false; } bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) { // ===========GuardRail============== // 1. Report the tuple count if the tuple count > soft limit if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) { return false; } if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) { size_t newTupleCount = mCurrentFullBucket.size() + 1; // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. if (newTupleCount > mDimensionHardLimit) { ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s", (long long)mMetricId, newKey.toString().c_str()); return true; } } return false; } bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) { for (const FieldValue& value : event.getValues()) { if (value.mField.matches(matcher)) { Loading Loading @@ -496,6 +516,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn VLOG("Failed to get value %d from event %s", i, event.ToString().c_str()); return; } interval.seenNewData = true; if (mUseDiff) { if (!interval.hasBase) { Loading Loading @@ -648,6 +669,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { if (hitFullBucketGuardRailLocked(slice.first)) { continue; } // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } Loading Loading @@ -679,11 +703,21 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } } // Reset counters for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) { bool obsolete = true; for (auto& interval : it->second) { interval.hasValue = false; interval.sampleSize = 0; if (interval.seenNewData) { obsolete = false; } interval.seenNewData = false; } if (obsolete) { it = mCurrentSlicedBucket.erase(it); } else { it++; } } } Loading cmds/statsd/src/metrics/ValueMetricProducer.h +6 −1 Original line number Diff line number Diff line Loading @@ -128,7 +128,9 @@ private: int sampleSize; // If this dimension has any non-tainted value. If not, don't report the // dimension. bool hasValue; bool hasValue = false; // Whether new data is seen in the bucket. bool seenNewData = false; } Interval; std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket; Loading @@ -146,6 +148,8 @@ private: // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey); void pullAndMatchEventsLocked(const int64_t timestampNs); // Reset diff base and mHasGlobalBase Loading Loading @@ -202,6 +206,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); }; } // namespace statsd Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +125 −2 Original line number Diff line number Diff line Loading @@ -1469,7 +1469,7 @@ TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) { } /* * Tests pulled atoms with no conditions * Tests zero default base. */ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) { ValueMetric metric; Loading Loading @@ -1554,7 +1554,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) { } /* * Tests pulled atoms with no conditions * Tests using zero default base with failed pull. */ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { ValueMetric metric; Loading Loading @@ -1681,6 +1681,129 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { EXPECT_EQ(2UL, valueProducer.mPastBuckets.size()); } /* * Tests trim unused dimension key if no new data is seen in an entire bucket. */ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.mutable_dimensions_in_what()->set_field(tagId); metric.mutable_dimensions_in_what()->add_child()->set_field(1); UidMap uidMap; SimpleAtomMatcher atomMatcher; atomMatcher.set_atom_id(tagId); sp<EventMatcherWizard> eventMatcherWizard = new EventMatcherWizard({new SimpleLogMatchingTracker( atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs); event->write(1); event->write(3); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, logEventMatcherIndex, eventMatcherWizard, tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); auto iter = valueProducer.mCurrentSlicedBucket.begin(); auto& interval1 = iter->second[0]; EXPECT_EQ(1, iter->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(3, interval1.base.long_value); EXPECT_EQ(false, interval1.hasValue); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event1->write(2); event1->write(4); event1->init(); shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event2->write(1); event2->write(11); event2->init(); allData.push_back(event1); allData.push_back(event2); valueProducer.onDataPulled(allData); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); EXPECT_EQ(true, interval1.hasValue); EXPECT_EQ(8, interval1.value.long_value); EXPECT_TRUE(interval1.seenNewData); auto it = valueProducer.mCurrentSlicedBucket.begin(); for (; it != valueProducer.mCurrentSlicedBucket.end(); it++) { if (it != iter) { break; } } EXPECT_TRUE(it != iter); auto& interval2 = it->second[0]; EXPECT_EQ(2, it->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); EXPECT_EQ(true, interval2.hasBase); EXPECT_EQ(4, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); EXPECT_TRUE(interval2.seenNewData); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // next pull somehow did not happen, skip to end of bucket 3 allData.clear(); event1 = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1); event1->write(2); event1->write(5); event1->init(); allData.push_back(event1); valueProducer.onDataPulled(allData); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(false, interval1.hasBase); EXPECT_EQ(false, interval1.hasValue); EXPECT_EQ(8, interval1.value.long_value); // on probation now EXPECT_FALSE(interval1.seenNewData); EXPECT_EQ(true, interval2.hasBase); EXPECT_EQ(5, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); // back to good status EXPECT_TRUE(interval2.seenNewData); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); allData.clear(); event1 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1); event1->write(2); event1->write(13); event1->init(); allData.push_back(event1); valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); EXPECT_EQ(13, interval2.base.long_value); EXPECT_EQ(true, interval2.hasValue); EXPECT_EQ(8, interval2.value.long_value); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); } } // namespace statsd } // namespace os } // namespace android Loading Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +37 −3 Original line number Diff line number Diff line Loading @@ -432,6 +432,26 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { return false; } bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) { // ===========GuardRail============== // 1. Report the tuple count if the tuple count > soft limit if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) { return false; } if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) { size_t newTupleCount = mCurrentFullBucket.size() + 1; // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. if (newTupleCount > mDimensionHardLimit) { ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s", (long long)mMetricId, newKey.toString().c_str()); return true; } } return false; } bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) { for (const FieldValue& value : event.getValues()) { if (value.mField.matches(matcher)) { Loading Loading @@ -496,6 +516,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn VLOG("Failed to get value %d from event %s", i, event.ToString().c_str()); return; } interval.seenNewData = true; if (mUseDiff) { if (!interval.hasBase) { Loading Loading @@ -648,6 +669,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { if (hitFullBucketGuardRailLocked(slice.first)) { continue; } // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } Loading Loading @@ -679,11 +703,21 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } } // Reset counters for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) { bool obsolete = true; for (auto& interval : it->second) { interval.hasValue = false; interval.sampleSize = 0; if (interval.seenNewData) { obsolete = false; } interval.seenNewData = false; } if (obsolete) { it = mCurrentSlicedBucket.erase(it); } else { it++; } } } Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +6 −1 Original line number Diff line number Diff line Loading @@ -128,7 +128,9 @@ private: int sampleSize; // If this dimension has any non-tainted value. If not, don't report the // dimension. bool hasValue; bool hasValue = false; // Whether new data is seen in the bucket. bool seenNewData = false; } Interval; std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket; Loading @@ -146,6 +148,8 @@ private: // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey); void pullAndMatchEventsLocked(const int64_t timestampNs); // Reset diff base and mHasGlobalBase Loading Loading @@ -202,6 +206,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); }; } // namespace statsd Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +125 −2 Original line number Diff line number Diff line Loading @@ -1469,7 +1469,7 @@ TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) { } /* * Tests pulled atoms with no conditions * Tests zero default base. */ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) { ValueMetric metric; Loading Loading @@ -1554,7 +1554,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) { } /* * Tests pulled atoms with no conditions * Tests using zero default base with failed pull. */ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { ValueMetric metric; Loading Loading @@ -1681,6 +1681,129 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { EXPECT_EQ(2UL, valueProducer.mPastBuckets.size()); } /* * Tests trim unused dimension key if no new data is seen in an entire bucket. */ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.mutable_dimensions_in_what()->set_field(tagId); metric.mutable_dimensions_in_what()->add_child()->set_field(1); UidMap uidMap; SimpleAtomMatcher atomMatcher; atomMatcher.set_atom_id(tagId); sp<EventMatcherWizard> eventMatcherWizard = new EventMatcherWizard({new SimpleLogMatchingTracker( atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs); event->write(1); event->write(3); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, logEventMatcherIndex, eventMatcherWizard, tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); auto iter = valueProducer.mCurrentSlicedBucket.begin(); auto& interval1 = iter->second[0]; EXPECT_EQ(1, iter->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(3, interval1.base.long_value); EXPECT_EQ(false, interval1.hasValue); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event1->write(2); event1->write(4); event1->init(); shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event2->write(1); event2->write(11); event2->init(); allData.push_back(event1); allData.push_back(event2); valueProducer.onDataPulled(allData); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); EXPECT_EQ(true, interval1.hasValue); EXPECT_EQ(8, interval1.value.long_value); EXPECT_TRUE(interval1.seenNewData); auto it = valueProducer.mCurrentSlicedBucket.begin(); for (; it != valueProducer.mCurrentSlicedBucket.end(); it++) { if (it != iter) { break; } } EXPECT_TRUE(it != iter); auto& interval2 = it->second[0]; EXPECT_EQ(2, it->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); EXPECT_EQ(true, interval2.hasBase); EXPECT_EQ(4, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); EXPECT_TRUE(interval2.seenNewData); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // next pull somehow did not happen, skip to end of bucket 3 allData.clear(); event1 = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1); event1->write(2); event1->write(5); event1->init(); allData.push_back(event1); valueProducer.onDataPulled(allData); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(false, interval1.hasBase); EXPECT_EQ(false, interval1.hasValue); EXPECT_EQ(8, interval1.value.long_value); // on probation now EXPECT_FALSE(interval1.seenNewData); EXPECT_EQ(true, interval2.hasBase); EXPECT_EQ(5, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); // back to good status EXPECT_TRUE(interval2.seenNewData); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); allData.clear(); event1 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1); event1->write(2); event1->write(13); event1->init(); allData.push_back(event1); valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); EXPECT_EQ(13, interval2.base.long_value); EXPECT_EQ(true, interval2.hasValue); EXPECT_EQ(8, interval2.value.long_value); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); } } // namespace statsd } // namespace os } // namespace android Loading