
Commit 32717c3b authored by Chenjie Yu

multi-value aggregation in ValueMetric

Allow aggregation on multiple fields, instead of one at a time.
All these fields should share the same aggregation type, use_diff,
direction, etc.
The config reuses value_field but allows multiple fields to be
specified.
The order in which they are specified determines the "index" of a
value in the output.

Bug: 119217634
Test: unit test
Change-Id: I38b1465d13723a897b30ee0b4f868498f60ad4db
parent 252c7c43
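
To make the new config shape concrete, here is a hedged sketch (not part of this commit) of building such a metric through the protobuf-generated C++ API for the statsd config messages ValueMetric and FieldMatcher. The metric id, atom id, and field numbers are hypothetical placeholders.

    // Sketch: a ValueMetric that aggregates two fields of the same atom.
    // All ids below are hypothetical placeholders.
    ValueMetric metric;
    metric.set_id(123456L);                         // hypothetical metric id
    metric.set_aggregation_type(ValueMetric::SUM);  // shared by every value field
    FieldMatcher* valueField = metric.mutable_value_field();
    valueField->set_field(10016);                   // hypothetical atom id
    valueField->add_child()->set_field(2);          // first field  -> output index 0
    valueField->add_child()->set_field(3);          // second field -> output index 1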
+165 −134
@@ -64,8 +64,10 @@ const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
// for ValueBucketInfo
-const int FIELD_ID_VALUE_LONG = 7;
-const int FIELD_ID_VALUE_DOUBLE = 8;
+const int FIELD_ID_VALUE_INDEX = 1;
+const int FIELD_ID_VALUE_LONG = 2;
+const int FIELD_ID_VALUE_DOUBLE = 3;
+const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
@@ -78,7 +80,6 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
                                         const sp<StatsPullerManager>& pullerManager)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
      mPullerManager(pullerManager),
-      mValueField(metric.value_field()),
      mPullTagId(pullTagId),
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
@@ -103,6 +104,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
    }

    mBucketSizeNs = bucketSizeMills * 1000000;
+
+    translateFieldMatcher(metric.value_field(), &mFieldMatchers);
+
    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
@@ -122,9 +126,6 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
        }
    }

-    if (mValueField.child_size() > 0) {
-        mField = mValueField.child(0).field();
-    }
    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
    mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
                          HasPositionALL(metric.dimensions_in_condition());
@@ -259,18 +260,27 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
-            if (bucket.value.getType() == LONG) {
+            for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
+                int index = bucket.valueIndex[i];
+                const Value& value = bucket.values[i];
+                uint64_t valueToken = protoOutput->start(
+                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
+                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
+                                   index);
+                if (value.getType() == LONG) {
                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
-                                   (long long)bucket.value.long_value);
-                VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
-                     (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value);
-            } else if (bucket.value.getType() == DOUBLE) {
+                                       (long long)value.long_value);
+                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
+                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
+                } else if (value.getType() == DOUBLE) {
                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
-                                   bucket.value.double_value);
-                VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs,
-                     (long long)bucket.mBucketEndNs, bucket.value.double_value);
+                                       value.double_value);
+                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
+                         (long long)bucket.mBucketEndNs, index, value.double_value);
                } else {
                VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType());
                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
                }
+                protoOutput->end(valueToken);
+            }
            protoOutput->end(bucketInfoToken);
        }
@@ -303,7 +313,9 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
    // when condition change from true to false, clear diff base
    if (mUseDiff && mCondition && !condition) {
        for (auto& slice : mCurrentSlicedBucket) {
-            slice.second.hasBase = false;
+            for (auto& interval : slice.second) {
+                interval.hasBase = false;
+            }
        }
    }

@@ -363,10 +375,12 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
            (unsigned long)mCurrentSlicedBucket.size());
    if (verbose) {
        for (const auto& it : mCurrentSlicedBucket) {
+          for (const auto& interval : it.second) {
            fprintf(out, "\t(what)%s\t(condition)%s  (value)%s\n",
                    it.first.getDimensionKeyInWhat().toString().c_str(),
                    it.first.getDimensionKeyInCondition().toString().c_str(),
-                    it.second.value.toString().c_str());
+                    interval.value.toString().c_str());
+          }
        }
    }
}
@@ -391,25 +405,29 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    return false;
}

-const Value getDoubleOrLong(const Value& value) {
-    Value v;
-    switch (value.type) {
+bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
+    for (const FieldValue& value : event.getValues()) {
+        if (value.mField.matches(matcher)) {
+            switch (value.mValue.type) {
                case INT:
-            v.setLong(value.int_value);
+                    ret.setLong(value.mValue.int_value);
                    break;
                case LONG:
-            v.setLong(value.long_value);
+                    ret.setLong(value.mValue.long_value);
                    break;
                case FLOAT:
-            v.setDouble(value.float_value);
+                    ret.setDouble(value.mValue.float_value);
                    break;
                case DOUBLE:
-            v.setDouble(value.double_value);
+                    ret.setDouble(value.mValue.double_value);
                    break;
                default:
                    break;
            }
-    return v;
+            return true;
+        }
+    }
+    return false;
}

void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
@@ -436,14 +454,21 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
    if (hitGuardRailLocked(eventKey)) {
        return;
    }
-    Interval& interval = mCurrentSlicedBucket[eventKey];
+    vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
+    if (multiIntervals.size() < mFieldMatchers.size()) {
+        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
+        multiIntervals.resize(mFieldMatchers.size());
+    }

-    if (mField > event.size()) {
-        VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(),
-             (int)event.size());
+    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
+        const Matcher& matcher = mFieldMatchers[i];
+        Interval& interval = multiIntervals[i];
+        interval.valueIndex = i;
+        Value value;
+        if (!getDoubleOrLong(event, matcher, value)) {
+            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            return;
        }
-    Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue);

        if (mUseDiff) {
            // no base. just update base and return.
@@ -509,9 +534,10 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
            interval.hasValue = true;
        }
        interval.sampleSize += 1;
+    }

    // TODO: propgate proper values down stream when anomaly support doubles
-    long wholeBucketVal = interval.value.long_value;
+    long wholeBucketVal = multiIntervals[0].value.long_value;
    auto prev = mCurrentFullBucket.find(eventKey);
    if (prev != mCurrentFullBucket.end()) {
        wholeBucketVal += prev->second;
@@ -540,7 +566,9 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        // take base again in future good bucket.
        for (auto& slice : mCurrentSlicedBucket) {
-            slice.second.hasBase = false;
+            for (auto& interval : slice.second) {
+                interval.hasBase = false;
+            }
        }
    }
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
@@ -552,37 +580,38 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
         (int)mCurrentSlicedBucket.size());
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();

-    ValueBucket info;
-    info.mBucketStartNs = mCurrentBucketStartTimeNs;
-    if (eventTimeNs < fullBucketEndTimeNs) {
-        info.mBucketEndNs = eventTimeNs;
-    } else {
-        info.mBucketEndNs = fullBucketEndTimeNs;
-    }
+    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;

-    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+    if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
        // The current bucket is large enough to keep.
        for (const auto& slice : mCurrentSlicedBucket) {
-            if (slice.second.hasValue) {
+            ValueBucket bucket;
+            bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
+            bucket.mBucketEndNs = bucketEndTime;
+            for (const auto& interval : slice.second) {
+                if (interval.hasValue) {
                    // skip the output if the diff is zero
-                if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) {
+                    if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
                        continue;
                    }
+                    bucket.valueIndex.push_back(interval.valueIndex);
                    if (mAggregationType != ValueMetric::AVG) {
-                    info.value = slice.second.value;
+                        bucket.values.push_back(interval.value);
                    } else {
-                    double sum = slice.second.value.type == LONG
-                                         ? (double)slice.second.value.long_value
-                                         : slice.second.value.double_value;
-                    info.value.setDouble(sum / slice.second.sampleSize);
+                        double sum = interval.value.type == LONG ? (double)interval.value.long_value
+                                                                 : interval.value.double_value;
+                        bucket.values.push_back(Value((double)sum / interval.sampleSize));
                    }
                }
+            }
            // it will auto create new vector of ValuebucketInfo if the key is not found.
+            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[slice.first];
-                bucketList.push_back(info);
+                bucketList.push_back(bucket);
+            }
        }
    } else {
-        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
+        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
    }

    if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
@@ -590,7 +619,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                // TODO: fix this when anomaly can accept double values
-                mCurrentFullBucket[slice.first] += slice.second.value.long_value;
+                mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
@@ -606,7 +635,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        // TODO: fix this when anomaly can accept double values
-                        tracker->addPastBucket(slice.first, slice.second.value.long_value,
+                        tracker->addPastBucket(slice.first, slice.second[0].value.long_value,
                                               mCurrentBucketNum);
                    }
                }
@@ -616,14 +645,16 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
        // Accumulate partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            // TODO: fix this when anomaly can accept double values
-            mCurrentFullBucket[slice.first] += slice.second.value.long_value;
+            mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
        }
    }

    // Reset counters
    for (auto& slice : mCurrentSlicedBucket) {
-        slice.second.hasValue = false;
-        slice.second.sampleSize = 0;
+        for (auto& interval : slice.second) {
+            interval.hasValue = false;
+            interval.sampleSize = 0;
+        }
    }
}
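
To see the per-slice bookkeeping in isolation, here is a standalone, hedged C++ sketch (not statsd code) of one dimension slice accumulating two value fields into a vector of Intervals and emitting one value per configured field on flush. It simplifies to long values and SUM aggregation, with an AVG shown at print time; the sample data is hypothetical.

    // Standalone sketch; simplified Interval bookkeeping for two fields.
    #include <cstdio>
    #include <vector>

    struct Interval {
        int valueIndex = 0;   // position of the field in the config
        long value = 0;       // running aggregate (SUM here)
        int sampleSize = 0;   // number of samples folded in
        bool hasValue = false;
    };

    int main() {
        std::vector<Interval> intervals(2);                // two configured fields
        long samples[3][2] = {{10, 1}, {20, 3}, {30, 5}};  // hypothetical events
        for (auto& sample : samples) {
            for (int i = 0; i < 2; i++) {
                intervals[i].valueIndex = i;
                intervals[i].value += sample[i];           // SUM aggregation
                intervals[i].sampleSize += 1;
                intervals[i].hasValue = true;
            }
        }
        // On flush: one output value per configured field, in config order.
        for (const auto& interval : intervals) {
            double avg = (double)interval.value / interval.sampleSize;
            printf("index %d: sum=%ld avg=%.2f\n", interval.valueIndex, interval.value, avg);
        }
        return 0;
    }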

+8 −6
@@ -34,7 +34,8 @@ namespace statsd {
struct ValueBucket {
    int64_t mBucketStartNs;
    int64_t mBucketEndNs;
-    Value value;
+    std::vector<int> valueIndex;
+    std::vector<Value> values;
};

class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver {
@@ -97,7 +98,8 @@ private:

    sp<StatsPullerManager> mPullerManager;

-    const FieldMatcher mValueField;
+    // Value fields for matching.
+    std::vector<Matcher> mFieldMatchers;

    // tagId for pulled data. -1 if this is not pulled
    const int mPullTagId;
@@ -105,10 +107,10 @@ private:
    // if this is pulled metric
    const bool mIsPulled;

-    int mField;
-
-    // internal state of a bucket.
+    // internal state of an ongoing aggregation bucket.
    typedef struct {
+        // Index in multi value aggregation.
+        int valueIndex;
        // Holds current base value of the dimension. Take diff and update if necessary.
        Value base;
        // Whether there is a base to diff to.
@@ -122,7 +124,7 @@ private:
        bool hasValue;
    } Interval;

-    std::unordered_map<MetricDimensionKey, Interval> mCurrentSlicedBucket;
+    std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket;

    std::unordered_map<MetricDimensionKey, int64_t> mCurrentFullBucket;

+13 −3
@@ -108,12 +108,22 @@ message ValueBucketInfo {

  optional int64 value = 3 [deprecated = true];

-  oneof values {
-      int64 value_long = 7;
+  oneof single_value {
+      int64 value_long = 7 [deprecated = true];

-      double value_double = 8;
+      double value_double = 8 [deprecated = true];
  }

+  message Value {
+      optional int32 index = 1;
+      oneof value {
+          int64 value_long = 2;
+          double value_double = 3;
+      }
+  }
+
+  repeated Value values = 9;
+
  optional int64 bucket_num = 4;

  optional int64 start_bucket_elapsed_millis = 5;
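
On the consumer side, a hedged reading sketch (not part of this commit), assuming the protobuf-generated C++ API for ValueBucketInfo: report readers iterate the new repeated values field, and each entry's index points back to the position of the corresponding field in the configured value_field.

    // Sketch: read the repeated ValueBucketInfo.Value entries.
    #include <cstdio>

    void printBucket(const ValueBucketInfo& bucketInfo) {
        for (const ValueBucketInfo::Value& v : bucketInfo.values()) {
            if (v.has_value_long()) {
                printf("index %d -> long %lld\n", v.index(), (long long)v.value_long());
            } else if (v.has_value_double()) {
                printf("index %d -> double %.2f\n", v.index(), v.value_double());
            }
        }
    }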
+8 −8
@@ -142,23 +142,23 @@ TEST(ValueMetricE2eTest, TestPulledEvents) {

    EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(0).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(0).values_size());

    EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(1).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(1).values_size());

    EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(2).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(2).values_size());

    EXPECT_EQ(baseTimeNs + 6 * bucketSizeNs, data.bucket_info(3).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(3).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(3).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(3).values_size());

    EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(4).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(4).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(4).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(4).values_size());
}

TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) {
@@ -249,15 +249,15 @@ TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) {

    EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(0).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(0).values_size());

    EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(1).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(1).values_size());

    EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos());
    EXPECT_EQ(baseTimeNs + 10 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(2).has_value_long());
+    EXPECT_EQ(1, data.bucket_info(2).values_size());
}

#else
+54 −54

File changed; preview size limit exceeded, changes collapsed.