Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +165 −134 Original line number Diff line number Diff line Loading @@ -64,8 +64,10 @@ const int FIELD_ID_BUCKET_INFO = 3; const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; // for ValueBucketInfo const int FIELD_ID_VALUE_LONG = 7; const int FIELD_ID_VALUE_DOUBLE = 8; const int FIELD_ID_VALUE_INDEX = 1; const int FIELD_ID_VALUE_LONG = 2; const int FIELD_ID_VALUE_DOUBLE = 3; const int FIELD_ID_VALUES = 9; const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; Loading @@ -78,7 +80,6 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric const sp<StatsPullerManager>& pullerManager) : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), mPullerManager(pullerManager), mValueField(metric.value_field()), mPullTagId(pullTagId), mIsPulled(pullTagId != -1), mMinBucketSizeNs(metric.min_bucket_size_nanos()), Loading @@ -103,6 +104,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric } mBucketSizeNs = bucketSizeMills * 1000000; translateFieldMatcher(metric.value_field(), &mFieldMatchers); if (metric.has_dimensions_in_what()) { translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat); mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what()); Loading @@ -122,9 +126,6 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric } } if (mValueField.child_size() > 0) { mField = mValueField.child(0).field(); } mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0); mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || HasPositionALL(metric.dimensions_in_condition()); Loading Loading @@ -259,18 +260,27 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } if (bucket.value.getType() == LONG) { for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) { int index = bucket.valueIndex[i]; const Value& value = bucket.values[i]; uint64_t valueToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES); protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, index); if (value.getType() == LONG) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)bucket.value.long_value); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value); } else if (bucket.value.getType() == DOUBLE) { (long long)value.long_value); VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, index, (long long)value.long_value); } else if (value.getType() == DOUBLE) { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.value.double_value); VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, bucket.value.double_value); value.double_value); VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, index, value.double_value); } else { VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType()); VLOG("Wrong value type for ValueMetric output: %d", value.getType()); } protoOutput->end(valueToken); } protoOutput->end(bucketInfoToken); } Loading Loading @@ -303,7 +313,9 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, // when condition change from true to false, clear diff base if (mUseDiff && mCondition && !condition) { for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; for (auto& interval : slice.second) { interval.hasBase = false; } } } Loading Loading @@ -363,10 +375,12 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { (unsigned long)mCurrentSlicedBucket.size()); if (verbose) { for (const auto& it : mCurrentSlicedBucket) { for (const auto& interval : it.second) { fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n", it.first.getDimensionKeyInWhat().toString().c_str(), it.first.getDimensionKeyInCondition().toString().c_str(), it.second.value.toString().c_str()); interval.value.toString().c_str()); } } } } Loading @@ -391,25 +405,29 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { return false; } const Value getDoubleOrLong(const Value& value) { Value v; switch (value.type) { bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) { for (const FieldValue& value : event.getValues()) { if (value.mField.matches(matcher)) { switch (value.mValue.type) { case INT: v.setLong(value.int_value); ret.setLong(value.mValue.int_value); break; case LONG: v.setLong(value.long_value); ret.setLong(value.mValue.long_value); break; case FLOAT: v.setDouble(value.float_value); ret.setDouble(value.mValue.float_value); break; case DOUBLE: v.setDouble(value.double_value); ret.setDouble(value.mValue.double_value); break; default: break; } return v; return true; } } return false; } void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, Loading @@ -436,14 +454,21 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn if (hitGuardRailLocked(eventKey)) { return; } Interval& interval = mCurrentSlicedBucket[eventKey]; vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey]; if (multiIntervals.size() < mFieldMatchers.size()) { VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); multiIntervals.resize(mFieldMatchers.size()); } if (mField > event.size()) { VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(), (int)event.size()); for (int i = 0; i < (int)mFieldMatchers.size(); i++) { const Matcher& matcher = mFieldMatchers[i]; Interval& interval = multiIntervals[i]; interval.valueIndex = i; Value value; if (!getDoubleOrLong(event, matcher, value)) { VLOG("Failed to get value %d from event %s", i, event.ToString().c_str()); return; } Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue); if (mUseDiff) { // no base. just update base and return. Loading Loading @@ -509,9 +534,10 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn interval.hasValue = true; } interval.sampleSize += 1; } // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = interval.value.long_value; long wholeBucketVal = multiIntervals[0].value.long_value; auto prev = mCurrentFullBucket.find(eventKey); if (prev != mCurrentFullBucket.end()) { wholeBucketVal += prev->second; Loading Loading @@ -540,7 +566,9 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); // take base again in future good bucket. for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; for (auto& interval : slice.second) { interval.hasBase = false; } } } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, Loading @@ -552,37 +580,38 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); ValueBucket info; info.mBucketStartNs = mCurrentBucketStartTimeNs; if (eventTimeNs < fullBucketEndTimeNs) { info.mBucketEndNs = eventTimeNs; } else { info.mBucketEndNs = fullBucketEndTimeNs; } int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { if (slice.second.hasValue) { ValueBucket bucket; bucket.mBucketStartNs = mCurrentBucketStartTimeNs; bucket.mBucketEndNs = bucketEndTime; for (const auto& interval : slice.second) { if (interval.hasValue) { // skip the output if the diff is zero if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) { if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) { continue; } bucket.valueIndex.push_back(interval.valueIndex); if (mAggregationType != ValueMetric::AVG) { info.value = slice.second.value; bucket.values.push_back(interval.value); } else { double sum = slice.second.value.type == LONG ? (double)slice.second.value.long_value : slice.second.value.double_value; info.value.setDouble(sum / slice.second.sampleSize); double sum = interval.value.type == LONG ? (double)interval.value.long_value : interval.value.double_value; bucket.values.push_back(Value((double)sum / interval.sampleSize)); } } } // it will auto create new vector of ValuebucketInfo if the key is not found. if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); bucketList.push_back(bucket); } } } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime); } if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. Loading @@ -590,7 +619,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } for (const auto& slice : mCurrentFullBucket) { for (auto& tracker : mAnomalyTrackers) { Loading @@ -606,7 +635,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { // TODO: fix this when anomaly can accept double values tracker->addPastBucket(slice.first, slice.second.value.long_value, tracker->addPastBucket(slice.first, slice.second[0].value.long_value, mCurrentBucketNum); } } Loading @@ -616,14 +645,16 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } } // Reset counters for (auto& slice : mCurrentSlicedBucket) { slice.second.hasValue = false; slice.second.sampleSize = 0; for (auto& interval : slice.second) { interval.hasValue = false; interval.sampleSize = 0; } } } Loading cmds/statsd/src/metrics/ValueMetricProducer.h +8 −6 Original line number Diff line number Diff line Loading @@ -34,7 +34,8 @@ namespace statsd { struct ValueBucket { int64_t mBucketStartNs; int64_t mBucketEndNs; Value value; std::vector<int> valueIndex; std::vector<Value> values; }; class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver { Loading Loading @@ -97,7 +98,8 @@ private: sp<StatsPullerManager> mPullerManager; const FieldMatcher mValueField; // Value fields for matching. std::vector<Matcher> mFieldMatchers; // tagId for pulled data. -1 if this is not pulled const int mPullTagId; Loading @@ -105,10 +107,10 @@ private: // if this is pulled metric const bool mIsPulled; int mField; // internal state of a bucket. // internal state of an ongoing aggregation bucket. typedef struct { // Index in multi value aggregation. int valueIndex; // Holds current base value of the dimension. Take diff and update if necessary. Value base; // Whether there is a base to diff to. Loading @@ -122,7 +124,7 @@ private: bool hasValue; } Interval; std::unordered_map<MetricDimensionKey, Interval> mCurrentSlicedBucket; std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket; std::unordered_map<MetricDimensionKey, int64_t> mCurrentFullBucket; Loading cmds/statsd/src/stats_log.proto +13 −3 Original line number Diff line number Diff line Loading @@ -108,12 +108,22 @@ message ValueBucketInfo { optional int64 value = 3 [deprecated = true]; oneof values { int64 value_long = 7; oneof single_value { int64 value_long = 7 [deprecated = true]; double value_double = 8; double value_double = 8 [deprecated = true]; } message Value { optional int32 index = 1; oneof value { int64 value_long = 2; double value_double = 3; } } repeated Value values = 9; optional int64 bucket_num = 4; optional int64 start_bucket_elapsed_millis = 5; Loading cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp +8 −8 Original line number Diff line number Diff line Loading @@ -142,23 +142,23 @@ TEST(ValueMetricE2eTest, TestPulledEvents) { EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(0).has_value_long()); EXPECT_EQ(1, data.bucket_info(0).values_size()); EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(1).has_value_long()); EXPECT_EQ(1, data.bucket_info(1).values_size()); EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(2).has_value_long()); EXPECT_EQ(1, data.bucket_info(2).values_size()); EXPECT_EQ(baseTimeNs + 6 * bucketSizeNs, data.bucket_info(3).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(3).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(3).has_value_long()); EXPECT_EQ(1, data.bucket_info(3).values_size()); EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(4).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(4).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(4).has_value_long()); EXPECT_EQ(1, data.bucket_info(4).values_size()); } TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) { Loading Loading @@ -249,15 +249,15 @@ TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) { EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(0).has_value_long()); EXPECT_EQ(1, data.bucket_info(0).values_size()); EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(1).has_value_long()); EXPECT_EQ(1, data.bucket_info(1).values_size()); EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 10 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(2).has_value_long()); EXPECT_EQ(1, data.bucket_info(2).values_size()); } #else Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +54 −54 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +165 −134 Original line number Diff line number Diff line Loading @@ -64,8 +64,10 @@ const int FIELD_ID_BUCKET_INFO = 3; const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; // for ValueBucketInfo const int FIELD_ID_VALUE_LONG = 7; const int FIELD_ID_VALUE_DOUBLE = 8; const int FIELD_ID_VALUE_INDEX = 1; const int FIELD_ID_VALUE_LONG = 2; const int FIELD_ID_VALUE_DOUBLE = 3; const int FIELD_ID_VALUES = 9; const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; Loading @@ -78,7 +80,6 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric const sp<StatsPullerManager>& pullerManager) : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), mPullerManager(pullerManager), mValueField(metric.value_field()), mPullTagId(pullTagId), mIsPulled(pullTagId != -1), mMinBucketSizeNs(metric.min_bucket_size_nanos()), Loading @@ -103,6 +104,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric } mBucketSizeNs = bucketSizeMills * 1000000; translateFieldMatcher(metric.value_field(), &mFieldMatchers); if (metric.has_dimensions_in_what()) { translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat); mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what()); Loading @@ -122,9 +126,6 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric } } if (mValueField.child_size() > 0) { mField = mValueField.child(0).field(); } mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0); mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || HasPositionALL(metric.dimensions_in_condition()); Loading Loading @@ -259,18 +260,27 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } if (bucket.value.getType() == LONG) { for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) { int index = bucket.valueIndex[i]; const Value& value = bucket.values[i]; uint64_t valueToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES); protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, index); if (value.getType() == LONG) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)bucket.value.long_value); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value); } else if (bucket.value.getType() == DOUBLE) { (long long)value.long_value); VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, index, (long long)value.long_value); } else if (value.getType() == DOUBLE) { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.value.double_value); VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, bucket.value.double_value); value.double_value); VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, index, value.double_value); } else { VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType()); VLOG("Wrong value type for ValueMetric output: %d", value.getType()); } protoOutput->end(valueToken); } protoOutput->end(bucketInfoToken); } Loading Loading @@ -303,7 +313,9 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, // when condition change from true to false, clear diff base if (mUseDiff && mCondition && !condition) { for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; for (auto& interval : slice.second) { interval.hasBase = false; } } } Loading Loading @@ -363,10 +375,12 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { (unsigned long)mCurrentSlicedBucket.size()); if (verbose) { for (const auto& it : mCurrentSlicedBucket) { for (const auto& interval : it.second) { fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n", it.first.getDimensionKeyInWhat().toString().c_str(), it.first.getDimensionKeyInCondition().toString().c_str(), it.second.value.toString().c_str()); interval.value.toString().c_str()); } } } } Loading @@ -391,25 +405,29 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { return false; } const Value getDoubleOrLong(const Value& value) { Value v; switch (value.type) { bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) { for (const FieldValue& value : event.getValues()) { if (value.mField.matches(matcher)) { switch (value.mValue.type) { case INT: v.setLong(value.int_value); ret.setLong(value.mValue.int_value); break; case LONG: v.setLong(value.long_value); ret.setLong(value.mValue.long_value); break; case FLOAT: v.setDouble(value.float_value); ret.setDouble(value.mValue.float_value); break; case DOUBLE: v.setDouble(value.double_value); ret.setDouble(value.mValue.double_value); break; default: break; } return v; return true; } } return false; } void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, Loading @@ -436,14 +454,21 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn if (hitGuardRailLocked(eventKey)) { return; } Interval& interval = mCurrentSlicedBucket[eventKey]; vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey]; if (multiIntervals.size() < mFieldMatchers.size()) { VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); multiIntervals.resize(mFieldMatchers.size()); } if (mField > event.size()) { VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(), (int)event.size()); for (int i = 0; i < (int)mFieldMatchers.size(); i++) { const Matcher& matcher = mFieldMatchers[i]; Interval& interval = multiIntervals[i]; interval.valueIndex = i; Value value; if (!getDoubleOrLong(event, matcher, value)) { VLOG("Failed to get value %d from event %s", i, event.ToString().c_str()); return; } Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue); if (mUseDiff) { // no base. just update base and return. Loading Loading @@ -509,9 +534,10 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn interval.hasValue = true; } interval.sampleSize += 1; } // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = interval.value.long_value; long wholeBucketVal = multiIntervals[0].value.long_value; auto prev = mCurrentFullBucket.find(eventKey); if (prev != mCurrentFullBucket.end()) { wholeBucketVal += prev->second; Loading Loading @@ -540,7 +566,9 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); // take base again in future good bucket. for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; for (auto& interval : slice.second) { interval.hasBase = false; } } } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, Loading @@ -552,37 +580,38 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); ValueBucket info; info.mBucketStartNs = mCurrentBucketStartTimeNs; if (eventTimeNs < fullBucketEndTimeNs) { info.mBucketEndNs = eventTimeNs; } else { info.mBucketEndNs = fullBucketEndTimeNs; } int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { if (slice.second.hasValue) { ValueBucket bucket; bucket.mBucketStartNs = mCurrentBucketStartTimeNs; bucket.mBucketEndNs = bucketEndTime; for (const auto& interval : slice.second) { if (interval.hasValue) { // skip the output if the diff is zero if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) { if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) { continue; } bucket.valueIndex.push_back(interval.valueIndex); if (mAggregationType != ValueMetric::AVG) { info.value = slice.second.value; bucket.values.push_back(interval.value); } else { double sum = slice.second.value.type == LONG ? (double)slice.second.value.long_value : slice.second.value.double_value; info.value.setDouble(sum / slice.second.sampleSize); double sum = interval.value.type == LONG ? (double)interval.value.long_value : interval.value.double_value; bucket.values.push_back(Value((double)sum / interval.sampleSize)); } } } // it will auto create new vector of ValuebucketInfo if the key is not found. if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); bucketList.push_back(bucket); } } } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime); } if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. Loading @@ -590,7 +619,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } for (const auto& slice : mCurrentFullBucket) { for (auto& tracker : mAnomalyTrackers) { Loading @@ -606,7 +635,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { // TODO: fix this when anomaly can accept double values tracker->addPastBucket(slice.first, slice.second.value.long_value, tracker->addPastBucket(slice.first, slice.second[0].value.long_value, mCurrentBucketNum); } } Loading @@ -616,14 +645,16 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } } // Reset counters for (auto& slice : mCurrentSlicedBucket) { slice.second.hasValue = false; slice.second.sampleSize = 0; for (auto& interval : slice.second) { interval.hasValue = false; interval.sampleSize = 0; } } } Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +8 −6 Original line number Diff line number Diff line Loading @@ -34,7 +34,8 @@ namespace statsd { struct ValueBucket { int64_t mBucketStartNs; int64_t mBucketEndNs; Value value; std::vector<int> valueIndex; std::vector<Value> values; }; class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver { Loading Loading @@ -97,7 +98,8 @@ private: sp<StatsPullerManager> mPullerManager; const FieldMatcher mValueField; // Value fields for matching. std::vector<Matcher> mFieldMatchers; // tagId for pulled data. -1 if this is not pulled const int mPullTagId; Loading @@ -105,10 +107,10 @@ private: // if this is pulled metric const bool mIsPulled; int mField; // internal state of a bucket. // internal state of an ongoing aggregation bucket. typedef struct { // Index in multi value aggregation. int valueIndex; // Holds current base value of the dimension. Take diff and update if necessary. Value base; // Whether there is a base to diff to. Loading @@ -122,7 +124,7 @@ private: bool hasValue; } Interval; std::unordered_map<MetricDimensionKey, Interval> mCurrentSlicedBucket; std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket; std::unordered_map<MetricDimensionKey, int64_t> mCurrentFullBucket; Loading
cmds/statsd/src/stats_log.proto +13 −3 Original line number Diff line number Diff line Loading @@ -108,12 +108,22 @@ message ValueBucketInfo { optional int64 value = 3 [deprecated = true]; oneof values { int64 value_long = 7; oneof single_value { int64 value_long = 7 [deprecated = true]; double value_double = 8; double value_double = 8 [deprecated = true]; } message Value { optional int32 index = 1; oneof value { int64 value_long = 2; double value_double = 3; } } repeated Value values = 9; optional int64 bucket_num = 4; optional int64 start_bucket_elapsed_millis = 5; Loading
cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp +8 −8 Original line number Diff line number Diff line Loading @@ -142,23 +142,23 @@ TEST(ValueMetricE2eTest, TestPulledEvents) { EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(0).has_value_long()); EXPECT_EQ(1, data.bucket_info(0).values_size()); EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(1).has_value_long()); EXPECT_EQ(1, data.bucket_info(1).values_size()); EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(2).has_value_long()); EXPECT_EQ(1, data.bucket_info(2).values_size()); EXPECT_EQ(baseTimeNs + 6 * bucketSizeNs, data.bucket_info(3).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(3).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(3).has_value_long()); EXPECT_EQ(1, data.bucket_info(3).values_size()); EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(4).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(4).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(4).has_value_long()); EXPECT_EQ(1, data.bucket_info(4).values_size()); } TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) { Loading Loading @@ -249,15 +249,15 @@ TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) { EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(0).has_value_long()); EXPECT_EQ(1, data.bucket_info(0).values_size()); EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(1).has_value_long()); EXPECT_EQ(1, data.bucket_info(1).values_size()); EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos()); EXPECT_EQ(baseTimeNs + 10 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos()); EXPECT_TRUE(data.bucket_info(2).has_value_long()); EXPECT_EQ(1, data.bucket_info(2).values_size()); } #else Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +54 −54 File changed.Preview size limit exceeded, changes collapsed. Show changes