Loading cmds/statsd/src/FieldValue.cpp +147 −0 Original line number Diff line number Diff line Loading @@ -141,6 +141,9 @@ Value::Value(const Value& from) { case FLOAT: float_value = from.float_value; break; case DOUBLE: double_value = from.double_value; break; case STRING: str_value = from.str_value; break; Loading @@ -157,6 +160,8 @@ std::string Value::toString() const { return std::to_string(long_value) + "[L]"; case FLOAT: return std::to_string(float_value) + "[F]"; case DOUBLE: return std::to_string(double_value) + "[D]"; case STRING: return str_value + "[S]"; default: Loading @@ -174,6 +179,8 @@ bool Value::operator==(const Value& that) const { return long_value == that.long_value; case FLOAT: return float_value == that.float_value; case DOUBLE: return double_value == that.double_value; case STRING: return str_value == that.str_value; default: Loading @@ -190,6 +197,8 @@ bool Value::operator!=(const Value& that) const { return long_value != that.long_value; case FLOAT: return float_value != that.float_value; case DOUBLE: return double_value != that.double_value; case STRING: return str_value != that.str_value; default: Loading @@ -207,6 +216,8 @@ bool Value::operator<(const Value& that) const { return long_value < that.long_value; case FLOAT: return float_value < that.float_value; case DOUBLE: return double_value < that.double_value; case STRING: return str_value < that.str_value; default: Loading @@ -214,6 +225,142 @@ bool Value::operator<(const Value& that) const { } } bool Value::operator>(const Value& that) const { if (type != that.getType()) return type > that.getType(); switch (type) { case INT: return int_value > that.int_value; case LONG: return long_value > that.long_value; case FLOAT: return float_value > that.float_value; case DOUBLE: return double_value > that.double_value; case STRING: return str_value > that.str_value; default: return false; } } bool Value::operator>=(const Value& that) const { if (type != that.getType()) return type >= that.getType(); switch (type) { case INT: return int_value >= that.int_value; case LONG: return long_value >= that.long_value; case FLOAT: return float_value >= that.float_value; case DOUBLE: return double_value >= that.double_value; case STRING: return str_value >= that.str_value; default: return false; } } Value Value::operator-(const Value& that) const { Value v; if (type != that.type) { ALOGE("Can't operate on different value types, %d, %d", type, that.type); return v; } if (type == STRING) { ALOGE("Can't operate on string value type"); return v; } switch (type) { case INT: v.setInt(int_value - that.int_value); break; case LONG: v.setLong(long_value - that.long_value); break; case FLOAT: v.setFloat(float_value - that.float_value); break; case DOUBLE: v.setDouble(double_value - that.double_value); break; default: break; } return v; } Value& Value::operator=(const Value& that) { type = that.type; switch (type) { case INT: int_value = that.int_value; break; case LONG: long_value = that.long_value; break; case FLOAT: float_value = that.float_value; break; case DOUBLE: double_value = that.double_value; break; case STRING: str_value = that.str_value; break; default: break; } return *this; } Value& Value::operator+=(const Value& that) { if (type != that.type) { ALOGE("Can't operate on different value types, %d, %d", type, that.type); return *this; } if (type == STRING) { ALOGE("Can't operate on string value type"); return *this; } switch (type) { case INT: int_value += that.int_value; break; case LONG: long_value += that.long_value; break; case FLOAT: float_value += that.float_value; break; case DOUBLE: double_value += that.double_value; break; default: break; } return *this; } double Value::getDouble() const { switch (type) { case INT: return int_value; case LONG: return long_value; case FLOAT: return float_value; case DOUBLE: return double_value; default: return 0; } } bool equalDimensions(const std::vector<Matcher>& dimension_a, const std::vector<Matcher>& dimension_b) { bool eq = dimension_a.size() == dimension_b.size(); Loading cmds/statsd/src/FieldValue.h +24 −1 Original line number Diff line number Diff line Loading @@ -32,7 +32,7 @@ const int32_t kLastBitMask = 0x80; const int32_t kClearLastBitDeco = 0x7f; const int32_t kClearAllPositionMatcherMask = 0xffff00ff; enum Type { UNKNOWN, INT, LONG, FLOAT, STRING }; enum Type { UNKNOWN, INT, LONG, FLOAT, DOUBLE, STRING }; int32_t getEncodedField(int32_t pos[], int32_t depth, bool includeDepth); Loading Loading @@ -283,6 +283,11 @@ struct Value { type = FLOAT; } Value(double v) { double_value = v; type = DOUBLE; } Value(const std::string& v) { str_value = v; type = STRING; Loading @@ -298,10 +303,21 @@ struct Value { type = LONG; } void setFloat(float v) { float_value = v; type = FLOAT; } void setDouble(double v) { double_value = v; type = DOUBLE; } union { int32_t int_value; int64_t long_value; float float_value; double double_value; }; std::string str_value; Loading @@ -313,12 +329,19 @@ struct Value { return type; } double getDouble() const; Value(const Value& from); bool operator==(const Value& that) const; bool operator!=(const Value& that) const; bool operator<(const Value& that) const; bool operator>(const Value& that) const; bool operator>=(const Value& that) const; Value operator-(const Value& that) const; Value& operator+=(const Value& that); Value& operator=(const Value& that); }; /** Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +101 −41 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ * limitations under the License. */ #define DEBUG false // STOPSHIP if true #define DEBUG true // STOPSHIP if true #include "Log.h" #include "ValueMetricProducer.h" Loading @@ -27,7 +27,7 @@ using android::util::FIELD_COUNT_REPEATED; using android::util::FIELD_TYPE_BOOL; using android::util::FIELD_TYPE_FLOAT; using android::util::FIELD_TYPE_DOUBLE; using android::util::FIELD_TYPE_INT32; using android::util::FIELD_TYPE_INT64; using android::util::FIELD_TYPE_MESSAGE; Loading Loading @@ -64,7 +64,8 @@ const int FIELD_ID_BUCKET_INFO = 3; const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; // for ValueBucketInfo const int FIELD_ID_VALUE = 3; const int FIELD_ID_VALUE_LONG = 3; const int FIELD_ID_VALUE_DOUBLE = 7; const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; Loading @@ -79,6 +80,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mPullerManager(pullerManager), mValueField(metric.value_field()), mPullTagId(pullTagId), mIsPulled(pullTagId != -1), mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() Loading @@ -88,7 +90,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second : StatsdStats::kDimensionKeySizeHardLimit), mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()) { mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), mAggregationType(metric.aggregation_type()), mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); Loading Loading @@ -123,9 +127,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || HasPositionALL(metric.dimensions_in_condition()); // Kicks off the puller immediately. flushIfNeededLocked(startTimestampNs); if (mPullTagId != -1) { // Kicks off the puller immediately. if (mIsPulled) { mPullerManager->RegisterReceiver(mPullTagId, this, mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs); } Loading @@ -136,7 +140,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric ValueMetricProducer::~ValueMetricProducer() { VLOG("~ValueMetricProducer() called"); if (mPullTagId != -1) { if (mIsPulled) { mPullerManager->UnRegisterReceiver(mPullTagId, this); } } Loading Loading @@ -245,11 +249,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue); if (mValueType == LONG) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)bucket.mValueLong); } else { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble); } protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mValue); VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble); } protoOutput->end(wrapperToken); } Loading @@ -271,7 +279,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); if (mPullTagId != -1) { if (mIsPulled) { vector<shared_ptr<LogEvent>> allData; if (mPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) { if (allData.size() == 0) { Loading Loading @@ -321,10 +329,10 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { (unsigned long)mCurrentSlicedBucket.size()); if (verbose) { for (const auto& it : mCurrentSlicedBucket) { fprintf(out, "\t(what)%s\t(condition)%s (value)%lld\n", fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n", it.first.getDimensionKeyInWhat().toString().c_str(), it.first.getDimensionKeyInCondition().toString().c_str(), (unsigned long long)it.second.sum); it.second.value.toString().c_str()); } } } Loading @@ -349,6 +357,27 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { return false; } const Value getDoubleOrLong(const Value& value) { Value v; switch (value.type) { case INT: v.setLong(value.int_value); break; case LONG: v.setLong(value.long_value); break; case FLOAT: v.setDouble(value.float_value); break; case DOUBLE: v.setDouble(value.double_value); break; default: break; } return v; } void ValueMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, Loading @@ -367,19 +396,25 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( } Interval& interval = mCurrentSlicedBucket[eventKey]; int error = 0; const int64_t value = event.GetLong(mField, &error); if (error < 0) { if (mField > event.size()) { VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(), (int)event.size()); return; } Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue); if (mPullTagId != -1) { // for pulled events Value diff; bool hasDiff = false; if (mIsPulled) { // Always require condition for pulled events. In the case of no condition, only pull // on bucket boundaries, in which we fake condition changes. if (mCondition == true) { if (!interval.startUpdated) { interval.start = value; interval.startUpdated = true; } else { // skip it if there is already value recorded for the start // Skip it if there is already value recorded for the start. Happens when puller // takes too long to finish. In this case we take the previous value. VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); } } else { Loading @@ -387,31 +422,55 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( // If not, take absolute value or drop it, based on config. if (interval.startUpdated) { if (value >= interval.start) { interval.sum += (value - interval.start); interval.hasValue = true; diff = (value - interval.start); hasDiff = true; } else { if (mUseAbsoluteValueOnReset) { interval.sum += value; interval.hasValue = true; diff = value; hasDiff = true; } else { VLOG("Dropping data for atom %d, prev: %lld, now: %lld", mPullTagId, (long long)interval.start, (long long)value); VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId, interval.start.toString().c_str(), value.toString().c_str()); } } interval.startUpdated = false; } else { VLOG("No start for matching end %lld", (long long)value); interval.tainted += 1; VLOG("No start for matching end %s", value.toString().c_str()); } } } else { // for pushed events, only accumulate when condition is true if (mCondition == true || mConditionTrackerIndex < 0) { interval.sum += value; } else { // for pushed events, only aggregate when sliced condition is true if (condition == true || mConditionTrackerIndex < 0) { diff = value; hasDiff = true; } } if (hasDiff) { if (interval.hasValue) { switch (mAggregationType) { case ValueMetric::SUM: // for AVG, we add up and take average when flushing the bucket case ValueMetric::AVG: interval.value += diff; break; case ValueMetric::MIN: interval.value = diff < interval.value ? diff : interval.value; break; case ValueMetric::MAX: interval.value = diff > interval.value ? diff : interval.value; break; default: break; } } else { interval.value = diff; interval.hasValue = true; } interval.sampleSize += 1; } long wholeBucketVal = interval.sum; // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = interval.value.long_value; auto prev = mCurrentFullBucket.find(eventKey); if (prev != mCurrentFullBucket.end()) { wholeBucketVal += prev->second; Loading Loading @@ -458,18 +517,15 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { // The current bucket is large enough to keep. int tainted = 0; for (const auto& slice : mCurrentSlicedBucket) { tainted += slice.second.tainted; tainted += slice.second.startUpdated; if (slice.second.hasValue) { info.mValue = slice.second.sum; info.mValueLong = slice.second.value.long_value; info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize; // it will auto create new vector of ValuebucketInfo if the key is not found. auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); } } VLOG("%d tainted pairs in the bucket", tainted); } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } Loading @@ -478,7 +534,8 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { mCurrentFullBucket[slice.first] += slice.second.sum; // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; } for (const auto& slice : mCurrentFullBucket) { for (auto& tracker : mAnomalyTrackers) { Loading @@ -493,7 +550,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { for (const auto& slice : mCurrentSlicedBucket) { for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { tracker->addPastBucket(slice.first, slice.second.sum, mCurrentBucketNum); // TODO: fix this when anomaly can accept double values tracker->addPastBucket(slice.first, slice.second.value.long_value, mCurrentBucketNum); } } } Loading @@ -501,7 +560,8 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } else { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { mCurrentFullBucket[slice.first] += slice.second.sum; // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; } } Loading cmds/statsd/src/metrics/ValueMetricProducer.h +22 −8 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ #include "../condition/ConditionTracker.h" #include "../external/PullDataReceiver.h" #include "../external/StatsPullerManager.h" #include "../stats_log_util.h" #include "MetricProducer.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" Loading @@ -33,7 +34,8 @@ namespace statsd { struct ValueBucket { int64_t mBucketStartNs; int64_t mBucketEndNs; int64_t mValue; int64_t mValueLong; double mValueDouble; }; class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver { Loading @@ -45,6 +47,7 @@ public: virtual ~ValueMetricProducer(); // Process data pulled on bucket boundary. void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override; // ValueMetric needs special logic if it's a pulled atom. Loading Loading @@ -120,20 +123,22 @@ private: // tagId for pulled data. -1 if this is not pulled const int mPullTagId; // if this is pulled metric const bool mIsPulled; int mField; // internal state of a bucket. typedef struct { // Pulled data always come in pair of <start, end>. This holds the value // for start. The diff (end - start) is added to sum. int64_t start; // for start. The diff (end - start) is taken as the real value. Value start; // Whether the start data point is updated bool startUpdated; // If end data point comes before the start, record this pair as tainted // and the value is not added to the running sum. int tainted; // Running sum of known pairs in this bucket int64_t sum; // Current value, depending on the aggregation type. Value value; // Number of samples collected. int sampleSize; // If this dimension has any non-tainted value. If not, don't report the // dimension. bool hasValue; Loading Loading @@ -162,6 +167,10 @@ private: const bool mUseAbsoluteValueOnReset; const ValueMetric::AggregationType mAggregationType; const Type mValueType; FRIEND_TEST(ValueMetricProducerTest, TestNonDimensionalEvents); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset); Loading @@ -176,6 +185,11 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced); }; } // namespace statsd Loading cmds/statsd/src/stats_log.proto +5 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,11 @@ message ValueBucketInfo { optional int64 end_bucket_elapsed_nanos = 2; optional int64 value = 3; oneof values { int64 value_long = 3; double value_double = 7; } optional int64 bucket_num = 4; Loading Loading
cmds/statsd/src/FieldValue.cpp +147 −0 Original line number Diff line number Diff line Loading @@ -141,6 +141,9 @@ Value::Value(const Value& from) { case FLOAT: float_value = from.float_value; break; case DOUBLE: double_value = from.double_value; break; case STRING: str_value = from.str_value; break; Loading @@ -157,6 +160,8 @@ std::string Value::toString() const { return std::to_string(long_value) + "[L]"; case FLOAT: return std::to_string(float_value) + "[F]"; case DOUBLE: return std::to_string(double_value) + "[D]"; case STRING: return str_value + "[S]"; default: Loading @@ -174,6 +179,8 @@ bool Value::operator==(const Value& that) const { return long_value == that.long_value; case FLOAT: return float_value == that.float_value; case DOUBLE: return double_value == that.double_value; case STRING: return str_value == that.str_value; default: Loading @@ -190,6 +197,8 @@ bool Value::operator!=(const Value& that) const { return long_value != that.long_value; case FLOAT: return float_value != that.float_value; case DOUBLE: return double_value != that.double_value; case STRING: return str_value != that.str_value; default: Loading @@ -207,6 +216,8 @@ bool Value::operator<(const Value& that) const { return long_value < that.long_value; case FLOAT: return float_value < that.float_value; case DOUBLE: return double_value < that.double_value; case STRING: return str_value < that.str_value; default: Loading @@ -214,6 +225,142 @@ bool Value::operator<(const Value& that) const { } } bool Value::operator>(const Value& that) const { if (type != that.getType()) return type > that.getType(); switch (type) { case INT: return int_value > that.int_value; case LONG: return long_value > that.long_value; case FLOAT: return float_value > that.float_value; case DOUBLE: return double_value > that.double_value; case STRING: return str_value > that.str_value; default: return false; } } bool Value::operator>=(const Value& that) const { if (type != that.getType()) return type >= that.getType(); switch (type) { case INT: return int_value >= that.int_value; case LONG: return long_value >= that.long_value; case FLOAT: return float_value >= that.float_value; case DOUBLE: return double_value >= that.double_value; case STRING: return str_value >= that.str_value; default: return false; } } Value Value::operator-(const Value& that) const { Value v; if (type != that.type) { ALOGE("Can't operate on different value types, %d, %d", type, that.type); return v; } if (type == STRING) { ALOGE("Can't operate on string value type"); return v; } switch (type) { case INT: v.setInt(int_value - that.int_value); break; case LONG: v.setLong(long_value - that.long_value); break; case FLOAT: v.setFloat(float_value - that.float_value); break; case DOUBLE: v.setDouble(double_value - that.double_value); break; default: break; } return v; } Value& Value::operator=(const Value& that) { type = that.type; switch (type) { case INT: int_value = that.int_value; break; case LONG: long_value = that.long_value; break; case FLOAT: float_value = that.float_value; break; case DOUBLE: double_value = that.double_value; break; case STRING: str_value = that.str_value; break; default: break; } return *this; } Value& Value::operator+=(const Value& that) { if (type != that.type) { ALOGE("Can't operate on different value types, %d, %d", type, that.type); return *this; } if (type == STRING) { ALOGE("Can't operate on string value type"); return *this; } switch (type) { case INT: int_value += that.int_value; break; case LONG: long_value += that.long_value; break; case FLOAT: float_value += that.float_value; break; case DOUBLE: double_value += that.double_value; break; default: break; } return *this; } double Value::getDouble() const { switch (type) { case INT: return int_value; case LONG: return long_value; case FLOAT: return float_value; case DOUBLE: return double_value; default: return 0; } } bool equalDimensions(const std::vector<Matcher>& dimension_a, const std::vector<Matcher>& dimension_b) { bool eq = dimension_a.size() == dimension_b.size(); Loading
cmds/statsd/src/FieldValue.h +24 −1 Original line number Diff line number Diff line Loading @@ -32,7 +32,7 @@ const int32_t kLastBitMask = 0x80; const int32_t kClearLastBitDeco = 0x7f; const int32_t kClearAllPositionMatcherMask = 0xffff00ff; enum Type { UNKNOWN, INT, LONG, FLOAT, STRING }; enum Type { UNKNOWN, INT, LONG, FLOAT, DOUBLE, STRING }; int32_t getEncodedField(int32_t pos[], int32_t depth, bool includeDepth); Loading Loading @@ -283,6 +283,11 @@ struct Value { type = FLOAT; } Value(double v) { double_value = v; type = DOUBLE; } Value(const std::string& v) { str_value = v; type = STRING; Loading @@ -298,10 +303,21 @@ struct Value { type = LONG; } void setFloat(float v) { float_value = v; type = FLOAT; } void setDouble(double v) { double_value = v; type = DOUBLE; } union { int32_t int_value; int64_t long_value; float float_value; double double_value; }; std::string str_value; Loading @@ -313,12 +329,19 @@ struct Value { return type; } double getDouble() const; Value(const Value& from); bool operator==(const Value& that) const; bool operator!=(const Value& that) const; bool operator<(const Value& that) const; bool operator>(const Value& that) const; bool operator>=(const Value& that) const; Value operator-(const Value& that) const; Value& operator+=(const Value& that); Value& operator=(const Value& that); }; /** Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +101 −41 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ * limitations under the License. */ #define DEBUG false // STOPSHIP if true #define DEBUG true // STOPSHIP if true #include "Log.h" #include "ValueMetricProducer.h" Loading @@ -27,7 +27,7 @@ using android::util::FIELD_COUNT_REPEATED; using android::util::FIELD_TYPE_BOOL; using android::util::FIELD_TYPE_FLOAT; using android::util::FIELD_TYPE_DOUBLE; using android::util::FIELD_TYPE_INT32; using android::util::FIELD_TYPE_INT64; using android::util::FIELD_TYPE_MESSAGE; Loading Loading @@ -64,7 +64,8 @@ const int FIELD_ID_BUCKET_INFO = 3; const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; // for ValueBucketInfo const int FIELD_ID_VALUE = 3; const int FIELD_ID_VALUE_LONG = 3; const int FIELD_ID_VALUE_DOUBLE = 7; const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; Loading @@ -79,6 +80,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mPullerManager(pullerManager), mValueField(metric.value_field()), mPullTagId(pullTagId), mIsPulled(pullTagId != -1), mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() Loading @@ -88,7 +90,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second : StatsdStats::kDimensionKeySizeHardLimit), mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()) { mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), mAggregationType(metric.aggregation_type()), mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); Loading Loading @@ -123,9 +127,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || HasPositionALL(metric.dimensions_in_condition()); // Kicks off the puller immediately. flushIfNeededLocked(startTimestampNs); if (mPullTagId != -1) { // Kicks off the puller immediately. if (mIsPulled) { mPullerManager->RegisterReceiver(mPullTagId, this, mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs); } Loading @@ -136,7 +140,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric ValueMetricProducer::~ValueMetricProducer() { VLOG("~ValueMetricProducer() called"); if (mPullTagId != -1) { if (mIsPulled) { mPullerManager->UnRegisterReceiver(mPullTagId, this); } } Loading Loading @@ -245,11 +249,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue); if (mValueType == LONG) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)bucket.mValueLong); } else { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble); } protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mValue); VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble); } protoOutput->end(wrapperToken); } Loading @@ -271,7 +279,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); if (mPullTagId != -1) { if (mIsPulled) { vector<shared_ptr<LogEvent>> allData; if (mPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) { if (allData.size() == 0) { Loading Loading @@ -321,10 +329,10 @@ void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { (unsigned long)mCurrentSlicedBucket.size()); if (verbose) { for (const auto& it : mCurrentSlicedBucket) { fprintf(out, "\t(what)%s\t(condition)%s (value)%lld\n", fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n", it.first.getDimensionKeyInWhat().toString().c_str(), it.first.getDimensionKeyInCondition().toString().c_str(), (unsigned long long)it.second.sum); it.second.value.toString().c_str()); } } } Loading @@ -349,6 +357,27 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { return false; } const Value getDoubleOrLong(const Value& value) { Value v; switch (value.type) { case INT: v.setLong(value.int_value); break; case LONG: v.setLong(value.long_value); break; case FLOAT: v.setDouble(value.float_value); break; case DOUBLE: v.setDouble(value.double_value); break; default: break; } return v; } void ValueMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, Loading @@ -367,19 +396,25 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( } Interval& interval = mCurrentSlicedBucket[eventKey]; int error = 0; const int64_t value = event.GetLong(mField, &error); if (error < 0) { if (mField > event.size()) { VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(), (int)event.size()); return; } Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue); if (mPullTagId != -1) { // for pulled events Value diff; bool hasDiff = false; if (mIsPulled) { // Always require condition for pulled events. In the case of no condition, only pull // on bucket boundaries, in which we fake condition changes. if (mCondition == true) { if (!interval.startUpdated) { interval.start = value; interval.startUpdated = true; } else { // skip it if there is already value recorded for the start // Skip it if there is already value recorded for the start. Happens when puller // takes too long to finish. In this case we take the previous value. VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); } } else { Loading @@ -387,31 +422,55 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( // If not, take absolute value or drop it, based on config. if (interval.startUpdated) { if (value >= interval.start) { interval.sum += (value - interval.start); interval.hasValue = true; diff = (value - interval.start); hasDiff = true; } else { if (mUseAbsoluteValueOnReset) { interval.sum += value; interval.hasValue = true; diff = value; hasDiff = true; } else { VLOG("Dropping data for atom %d, prev: %lld, now: %lld", mPullTagId, (long long)interval.start, (long long)value); VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId, interval.start.toString().c_str(), value.toString().c_str()); } } interval.startUpdated = false; } else { VLOG("No start for matching end %lld", (long long)value); interval.tainted += 1; VLOG("No start for matching end %s", value.toString().c_str()); } } } else { // for pushed events, only accumulate when condition is true if (mCondition == true || mConditionTrackerIndex < 0) { interval.sum += value; } else { // for pushed events, only aggregate when sliced condition is true if (condition == true || mConditionTrackerIndex < 0) { diff = value; hasDiff = true; } } if (hasDiff) { if (interval.hasValue) { switch (mAggregationType) { case ValueMetric::SUM: // for AVG, we add up and take average when flushing the bucket case ValueMetric::AVG: interval.value += diff; break; case ValueMetric::MIN: interval.value = diff < interval.value ? diff : interval.value; break; case ValueMetric::MAX: interval.value = diff > interval.value ? diff : interval.value; break; default: break; } } else { interval.value = diff; interval.hasValue = true; } interval.sampleSize += 1; } long wholeBucketVal = interval.sum; // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = interval.value.long_value; auto prev = mCurrentFullBucket.find(eventKey); if (prev != mCurrentFullBucket.end()) { wholeBucketVal += prev->second; Loading Loading @@ -458,18 +517,15 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { // The current bucket is large enough to keep. int tainted = 0; for (const auto& slice : mCurrentSlicedBucket) { tainted += slice.second.tainted; tainted += slice.second.startUpdated; if (slice.second.hasValue) { info.mValue = slice.second.sum; info.mValueLong = slice.second.value.long_value; info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize; // it will auto create new vector of ValuebucketInfo if the key is not found. auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); } } VLOG("%d tainted pairs in the bucket", tainted); } else { mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } Loading @@ -478,7 +534,8 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { mCurrentFullBucket[slice.first] += slice.second.sum; // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; } for (const auto& slice : mCurrentFullBucket) { for (auto& tracker : mAnomalyTrackers) { Loading @@ -493,7 +550,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { for (const auto& slice : mCurrentSlicedBucket) { for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { tracker->addPastBucket(slice.first, slice.second.sum, mCurrentBucketNum); // TODO: fix this when anomaly can accept double values tracker->addPastBucket(slice.first, slice.second.value.long_value, mCurrentBucketNum); } } } Loading @@ -501,7 +560,8 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } else { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { mCurrentFullBucket[slice.first] += slice.second.sum; // TODO: fix this when anomaly can accept double values mCurrentFullBucket[slice.first] += slice.second.value.long_value; } } Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +22 −8 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ #include "../condition/ConditionTracker.h" #include "../external/PullDataReceiver.h" #include "../external/StatsPullerManager.h" #include "../stats_log_util.h" #include "MetricProducer.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" Loading @@ -33,7 +34,8 @@ namespace statsd { struct ValueBucket { int64_t mBucketStartNs; int64_t mBucketEndNs; int64_t mValue; int64_t mValueLong; double mValueDouble; }; class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver { Loading @@ -45,6 +47,7 @@ public: virtual ~ValueMetricProducer(); // Process data pulled on bucket boundary. void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override; // ValueMetric needs special logic if it's a pulled atom. Loading Loading @@ -120,20 +123,22 @@ private: // tagId for pulled data. -1 if this is not pulled const int mPullTagId; // if this is pulled metric const bool mIsPulled; int mField; // internal state of a bucket. typedef struct { // Pulled data always come in pair of <start, end>. This holds the value // for start. The diff (end - start) is added to sum. int64_t start; // for start. The diff (end - start) is taken as the real value. Value start; // Whether the start data point is updated bool startUpdated; // If end data point comes before the start, record this pair as tainted // and the value is not added to the running sum. int tainted; // Running sum of known pairs in this bucket int64_t sum; // Current value, depending on the aggregation type. Value value; // Number of samples collected. int sampleSize; // If this dimension has any non-tainted value. If not, don't report the // dimension. bool hasValue; Loading Loading @@ -162,6 +167,10 @@ private: const bool mUseAbsoluteValueOnReset; const ValueMetric::AggregationType mAggregationType; const Type mValueType; FRIEND_TEST(ValueMetricProducerTest, TestNonDimensionalEvents); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset); Loading @@ -176,6 +185,11 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced); }; } // namespace statsd Loading
cmds/statsd/src/stats_log.proto +5 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,11 @@ message ValueBucketInfo { optional int64 end_bucket_elapsed_nanos = 2; optional int64 value = 3; oneof values { int64 value_long = 3; double value_double = 7; } optional int64 bucket_num = 4; Loading