Loading cmds/statsd/src/FieldValue.cpp +20 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include "Log.h" #include "FieldValue.h" #include "HashableDimensionKey.h" #include "math.h" namespace android { namespace os { Loading Loading @@ -174,6 +175,25 @@ std::string Value::toString() const { } } bool Value::isZero() const { switch (type) { case INT: return int_value == 0; case LONG: return long_value == 0; case FLOAT: return fabs(float_value) <= std::numeric_limits<float>::epsilon(); case DOUBLE: return fabs(double_value) <= std::numeric_limits<double>::epsilon(); case STRING: return str_value.size() == 0; case STORAGE: return storage_value.size() == 0; default: return false; } } bool Value::operator==(const Value& that) const { if (type != that.getType()) return false; Loading cmds/statsd/src/FieldValue.h +2 −0 Original line number Diff line number Diff line Loading @@ -331,6 +331,8 @@ struct Value { std::string toString() const; bool isZero() const; Type getType() const { return type; } Loading cmds/statsd/src/guardrail/StatsdStats.cpp +11 −4 Original line number Diff line number Diff line Loading @@ -362,11 +362,17 @@ void StatsdStats::notePullDelay(int pullAtomId, int64_t pullDelayNs) { lock_guard<std::mutex> lock(mLock); auto& pullStats = mPulledAtomStats[pullAtomId]; pullStats.maxPullDelayNs = std::max(pullStats.maxPullDelayNs, pullDelayNs); pullStats.avgPullDelayNs = (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) / pullStats.avgPullDelayNs = (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) / (pullStats.numPullDelay + 1); pullStats.numPullDelay += 1; } void StatsdStats::notePullDataError(int pullAtomId) { lock_guard<std::mutex> lock(mLock); mPulledAtomStats[pullAtomId].dataError++; } void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) { lock_guard<std::mutex> lock(mLock); Loading Loading @@ -422,6 +428,7 @@ void StatsdStats::resetInternalLocked() { pullStats.second.avgPullDelayNs = 0; pullStats.second.maxPullDelayNs = 0; pullStats.second.numPullDelay = 0; pullStats.second.dataError = 0; } } Loading Loading @@ -530,11 +537,11 @@ void StatsdStats::dumpStats(int out) const { dprintf(out, "Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld, (average " "pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay nanos)%lld, " "(max pull delay nanos)%lld\n", "(max pull delay nanos)%lld, (data error)%ld\n", (int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache, (long)pair.second.minPullIntervalSec, (long long)pair.second.avgPullTimeNs, (long long)pair.second.maxPullTimeNs, (long long)pair.second.avgPullDelayNs, (long long)pair.second.maxPullDelayNs); (long long)pair.second.maxPullDelayNs, pair.second.dataError); } if (mAnomalyAlarmRegisteredStats > 0) { Loading cmds/statsd/src/guardrail/StatsdStats.h +6 −0 Original line number Diff line number Diff line Loading @@ -278,6 +278,11 @@ public: */ void notePullFromCache(int pullAtomId); /* * Notify data error for pulled atom. */ void notePullDataError(int pullAtomId); /* * Records time for actual pulling, not including those served from cache and not including * statsd processing delays. Loading Loading @@ -329,6 +334,7 @@ public: int64_t avgPullDelayNs = 0; int64_t maxPullDelayNs = 0; long numPullDelay = 0; long dataError = 0; } PulledAtomStats; private: Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +151 −108 Original line number Diff line number Diff line Loading @@ -92,7 +92,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric : StatsdStats::kDimensionKeySizeHardLimit), mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), mAggregationType(metric.aggregation_type()), mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) { mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), mValueDirection(metric.value_direction()), mSkipZeroDiffOutput(metric.skip_zero_diff_output()) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); Loading Loading @@ -128,21 +130,22 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric HasPositionALL(metric.dimensions_in_condition()); flushIfNeededLocked(startTimeNs); // Kicks off the puller immediately. if (mIsPulled) { mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), mBucketSizeNs); } // TODO: Only do this for partial buckets like first bucket. All other buckets should use // Only do this for partial buckets like first bucket. All other buckets should use // flushIfNeeded to adjust start and end to bucket boundaries. // Adjust start for partial bucket mCurrentBucketStartTimeNs = startTimeNs; if (mIsPulled) { // Kicks off the puller immediately if condition is true and diff based. if (mIsPulled && mCondition && mUseDiff) { pullLocked(startTimeNs); } VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); } ValueMetricProducer::~ValueMetricProducer() { Loading Loading @@ -188,14 +191,14 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // Fills the dimension path if not slicing by ALL. if (!mSliceByPositionALL) { if (!mDimensionsInWhat.empty()) { uint64_t dimenPathToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); uint64_t dimenPathToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); writeDimensionPathToProto(mDimensionsInWhat, protoOutput); protoOutput->end(dimenPathToken); } if (!mDimensionsInCondition.empty()) { uint64_t dimenPathToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); uint64_t dimenPathToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); writeDimensionPathToProto(mDimensionsInCondition, protoOutput); protoOutput->end(dimenPathToken); } Loading @@ -221,15 +224,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // First fill dimension. if (mSliceByPositionALL) { uint64_t dimensionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); uint64_t dimensionToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); protoOutput->end(dimensionToken); if (dimensionKey.hasDimensionKeyInCondition()) { uint64_t dimensionInConditionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, protoOutput); uint64_t dimensionInConditionToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, protoOutput); protoOutput->end(dimensionInConditionToken); } } else { Loading @@ -237,8 +240,8 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); if (dimensionKey.hasDimensionKeyInCondition()) { writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, protoOutput); FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, protoOutput); } } Loading @@ -256,15 +259,20 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } if (mValueType == LONG) { if (bucket.value.getType() == LONG) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)bucket.mValueLong); (long long)bucket.value.long_value); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value); } else if (bucket.value.getType() == DOUBLE) { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.value.double_value); VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, bucket.value.double_value); } else { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble); VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType()); } protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble); } protoOutput->end(wrapperToken); } Loading @@ -279,8 +287,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { mCondition = condition; if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, (long long)mCurrentBucketStartTimeNs); Loading @@ -289,9 +295,19 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); if (mIsPulled) { // Pull on condition changes. if (mIsPulled && (mCondition != condition)) { pullLocked(eventTimeNs); } // when condition change from true to false, clear diff base if (mUseDiff && mCondition && !condition) { for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; } } mCondition = condition; } void ValueMetricProducer::pullLocked(const int64_t timestampNs) { Loading @@ -306,30 +322,33 @@ void ValueMetricProducer::pullLocked(const int64_t timestampNs) { } } int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; } void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { std::lock_guard<std::mutex> lock(mMutex); if (mCondition == true || mConditionTrackerIndex < 0) { if (mCondition) { if (allData.size() == 0) { return; } // For scheduled pulled data, the effective event time is snap to the nearest // bucket boundary to make bucket finalize. // bucket end. In the case of waking up from a deep sleep state, we will // attribute to the previous bucket end. If the sleep was long but not very long, we // will be in the immediate next bucket. Previous bucket may get a larger number as // we pull at a later time than real bucket end. // If the sleep was very long, we skip more than one bucket before sleep. In this case, // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); int64_t eventTime = mTimeBaseNs + ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; // close the end of the bucket mCondition = false; for (const auto& data : allData) { data->setElapsedTimestampNs(eventTime - 1); onMatchedLogEventLocked(0, *data); int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; if (bucketEndTime < mCurrentBucketStartTimeNs) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, (long long)mCurrentBucketStartTimeNs); return; } // start a new bucket mCondition = true; for (const auto& data : allData) { data->setElapsedTimestampNs(eventTime); data->setElapsedTimestampNs(bucketEndTime); onMatchedLogEventLocked(0, *data); } } Loading Loading @@ -363,8 +382,8 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. if (newTupleCount > mDimensionHardLimit) { ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId, newKey.toString().c_str()); ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId, newKey.toString().c_str()); return true; } } Loading Loading @@ -393,10 +412,10 @@ const Value getDoubleOrLong(const Value& value) { return v; } void ValueMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, const LogEvent& event) { void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, const LogEvent& event) { int64_t eventTimeNs = event.GetElapsedTimestampNs(); if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, Loading @@ -406,6 +425,14 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( flushIfNeededLocked(eventTimeNs); // For pulled data, we already check condition when we decide to pull or // in onDataPulled. So take all of them. // For pushed data, just check condition. if (!(mIsPulled || condition)) { VLOG("ValueMetric skip event because condition is false"); return; } if (hitGuardRailLocked(eventKey)) { return; } Loading @@ -418,71 +445,70 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( } Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue); Value diff; bool hasDiff = false; if (mIsPulled) { // Always require condition for pulled events. In the case of no condition, only pull // on bucket boundaries, in which we fake condition changes. if (mCondition == true) { if (!interval.startUpdated) { interval.start = value; interval.startUpdated = true; } else { // Skip it if there is already value recorded for the start. Happens when puller // takes too long to finish. In this case we take the previous value. VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); if (mUseDiff) { // no base. just update base and return. if (!interval.hasBase) { interval.base = value; interval.hasBase = true; return; } } else { // Generally we expect value to be monotonically increasing. // If not, take absolute value or drop it, based on config. if (interval.startUpdated) { if (value >= interval.start) { diff = (value - interval.start); hasDiff = true; } else { if (mUseAbsoluteValueOnReset) { Value diff; switch (mValueDirection) { case ValueMetric::INCREASING: if (value >= interval.base) { diff = value - interval.base; } else if (mUseAbsoluteValueOnReset) { diff = value; hasDiff = true; } else { VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId, interval.start.toString().c_str(), value.toString().c_str()); } VLOG("Unexpected decreasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; } interval.startUpdated = false; break; case ValueMetric::DECREASING: if (interval.base >= value) { diff = interval.base - value; } else if (mUseAbsoluteValueOnReset) { diff = value; } else { VLOG("No start for matching end %s", value.toString().c_str()); } VLOG("Unexpected increasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; } } else { // for pushed events, only aggregate when sliced condition is true if (condition == true || mConditionTrackerIndex < 0) { diff = value; hasDiff = true; break; case ValueMetric::ANY: diff = value - interval.base; break; default: break; } interval.base = value; value = diff; } if (hasDiff) { if (interval.hasValue) { switch (mAggregationType) { case ValueMetric::SUM: // for AVG, we add up and take average when flushing the bucket case ValueMetric::AVG: interval.value += diff; interval.value += value; break; case ValueMetric::MIN: interval.value = diff < interval.value ? diff : interval.value; interval.value = std::min(value, interval.value); break; case ValueMetric::MAX: interval.value = diff > interval.value ? diff : interval.value; interval.value = std::max(value, interval.value); break; default: break; } } else { interval.value = diff; interval.value = value; interval.hasValue = true; } interval.sampleSize += 1; } // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = interval.value.long_value; Loading Loading @@ -512,6 +538,10 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); // take base again in future good bucket. for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; } } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); Loading @@ -534,8 +564,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { if (slice.second.hasValue) { info.mValueLong = slice.second.value.long_value; info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize; // skip the output if the diff is zero if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) { continue; } if (mAggregationType != ValueMetric::AVG) { info.value = slice.second.value; } else { double sum = slice.second.value.type == LONG ? (double)slice.second.value.long_value : slice.second.value.double_value; info.value.setDouble(sum / slice.second.sampleSize); } // it will auto create new vector of ValuebucketInfo if the key is not found. auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); Loading Loading @@ -581,7 +621,10 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } // Reset counters mCurrentSlicedBucket.clear(); for (auto& slice : mCurrentSlicedBucket) { slice.second.hasValue = false; slice.second.sampleSize = 0; } } size_t ValueMetricProducer::byteSizeLocked() const { Loading Loading
cmds/statsd/src/FieldValue.cpp +20 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include "Log.h" #include "FieldValue.h" #include "HashableDimensionKey.h" #include "math.h" namespace android { namespace os { Loading Loading @@ -174,6 +175,25 @@ std::string Value::toString() const { } } bool Value::isZero() const { switch (type) { case INT: return int_value == 0; case LONG: return long_value == 0; case FLOAT: return fabs(float_value) <= std::numeric_limits<float>::epsilon(); case DOUBLE: return fabs(double_value) <= std::numeric_limits<double>::epsilon(); case STRING: return str_value.size() == 0; case STORAGE: return storage_value.size() == 0; default: return false; } } bool Value::operator==(const Value& that) const { if (type != that.getType()) return false; Loading
cmds/statsd/src/FieldValue.h +2 −0 Original line number Diff line number Diff line Loading @@ -331,6 +331,8 @@ struct Value { std::string toString() const; bool isZero() const; Type getType() const { return type; } Loading
cmds/statsd/src/guardrail/StatsdStats.cpp +11 −4 Original line number Diff line number Diff line Loading @@ -362,11 +362,17 @@ void StatsdStats::notePullDelay(int pullAtomId, int64_t pullDelayNs) { lock_guard<std::mutex> lock(mLock); auto& pullStats = mPulledAtomStats[pullAtomId]; pullStats.maxPullDelayNs = std::max(pullStats.maxPullDelayNs, pullDelayNs); pullStats.avgPullDelayNs = (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) / pullStats.avgPullDelayNs = (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) / (pullStats.numPullDelay + 1); pullStats.numPullDelay += 1; } void StatsdStats::notePullDataError(int pullAtomId) { lock_guard<std::mutex> lock(mLock); mPulledAtomStats[pullAtomId].dataError++; } void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) { lock_guard<std::mutex> lock(mLock); Loading Loading @@ -422,6 +428,7 @@ void StatsdStats::resetInternalLocked() { pullStats.second.avgPullDelayNs = 0; pullStats.second.maxPullDelayNs = 0; pullStats.second.numPullDelay = 0; pullStats.second.dataError = 0; } } Loading Loading @@ -530,11 +537,11 @@ void StatsdStats::dumpStats(int out) const { dprintf(out, "Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld, (average " "pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay nanos)%lld, " "(max pull delay nanos)%lld\n", "(max pull delay nanos)%lld, (data error)%ld\n", (int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache, (long)pair.second.minPullIntervalSec, (long long)pair.second.avgPullTimeNs, (long long)pair.second.maxPullTimeNs, (long long)pair.second.avgPullDelayNs, (long long)pair.second.maxPullDelayNs); (long long)pair.second.maxPullDelayNs, pair.second.dataError); } if (mAnomalyAlarmRegisteredStats > 0) { Loading
cmds/statsd/src/guardrail/StatsdStats.h +6 −0 Original line number Diff line number Diff line Loading @@ -278,6 +278,11 @@ public: */ void notePullFromCache(int pullAtomId); /* * Notify data error for pulled atom. */ void notePullDataError(int pullAtomId); /* * Records time for actual pulling, not including those served from cache and not including * statsd processing delays. Loading Loading @@ -329,6 +334,7 @@ public: int64_t avgPullDelayNs = 0; int64_t maxPullDelayNs = 0; long numPullDelay = 0; long dataError = 0; } PulledAtomStats; private: Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +151 −108 Original line number Diff line number Diff line Loading @@ -92,7 +92,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric : StatsdStats::kDimensionKeySizeHardLimit), mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), mAggregationType(metric.aggregation_type()), mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) { mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), mValueDirection(metric.value_direction()), mSkipZeroDiffOutput(metric.skip_zero_diff_output()) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); Loading Loading @@ -128,21 +130,22 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric HasPositionALL(metric.dimensions_in_condition()); flushIfNeededLocked(startTimeNs); // Kicks off the puller immediately. if (mIsPulled) { mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), mBucketSizeNs); } // TODO: Only do this for partial buckets like first bucket. All other buckets should use // Only do this for partial buckets like first bucket. All other buckets should use // flushIfNeeded to adjust start and end to bucket boundaries. // Adjust start for partial bucket mCurrentBucketStartTimeNs = startTimeNs; if (mIsPulled) { // Kicks off the puller immediately if condition is true and diff based. if (mIsPulled && mCondition && mUseDiff) { pullLocked(startTimeNs); } VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); } ValueMetricProducer::~ValueMetricProducer() { Loading Loading @@ -188,14 +191,14 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // Fills the dimension path if not slicing by ALL. if (!mSliceByPositionALL) { if (!mDimensionsInWhat.empty()) { uint64_t dimenPathToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); uint64_t dimenPathToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); writeDimensionPathToProto(mDimensionsInWhat, protoOutput); protoOutput->end(dimenPathToken); } if (!mDimensionsInCondition.empty()) { uint64_t dimenPathToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); uint64_t dimenPathToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); writeDimensionPathToProto(mDimensionsInCondition, protoOutput); protoOutput->end(dimenPathToken); } Loading @@ -221,15 +224,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // First fill dimension. if (mSliceByPositionALL) { uint64_t dimensionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); uint64_t dimensionToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); protoOutput->end(dimensionToken); if (dimensionKey.hasDimensionKeyInCondition()) { uint64_t dimensionInConditionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, protoOutput); uint64_t dimensionInConditionToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, protoOutput); protoOutput->end(dimensionInConditionToken); } } else { Loading @@ -237,8 +240,8 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); if (dimensionKey.hasDimensionKeyInCondition()) { writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, protoOutput); FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, protoOutput); } } Loading @@ -256,15 +259,20 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } if (mValueType == LONG) { if (bucket.value.getType() == LONG) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)bucket.mValueLong); (long long)bucket.value.long_value); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value); } else if (bucket.value.getType() == DOUBLE) { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.value.double_value); VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, bucket.value.double_value); } else { protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble); VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType()); } protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble); } protoOutput->end(wrapperToken); } Loading @@ -279,8 +287,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { mCondition = condition; if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, (long long)mCurrentBucketStartTimeNs); Loading @@ -289,9 +295,19 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); if (mIsPulled) { // Pull on condition changes. if (mIsPulled && (mCondition != condition)) { pullLocked(eventTimeNs); } // when condition change from true to false, clear diff base if (mUseDiff && mCondition && !condition) { for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; } } mCondition = condition; } void ValueMetricProducer::pullLocked(const int64_t timestampNs) { Loading @@ -306,30 +322,33 @@ void ValueMetricProducer::pullLocked(const int64_t timestampNs) { } } int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; } void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { std::lock_guard<std::mutex> lock(mMutex); if (mCondition == true || mConditionTrackerIndex < 0) { if (mCondition) { if (allData.size() == 0) { return; } // For scheduled pulled data, the effective event time is snap to the nearest // bucket boundary to make bucket finalize. // bucket end. In the case of waking up from a deep sleep state, we will // attribute to the previous bucket end. If the sleep was long but not very long, we // will be in the immediate next bucket. Previous bucket may get a larger number as // we pull at a later time than real bucket end. // If the sleep was very long, we skip more than one bucket before sleep. In this case, // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); int64_t eventTime = mTimeBaseNs + ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; // close the end of the bucket mCondition = false; for (const auto& data : allData) { data->setElapsedTimestampNs(eventTime - 1); onMatchedLogEventLocked(0, *data); int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; if (bucketEndTime < mCurrentBucketStartTimeNs) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, (long long)mCurrentBucketStartTimeNs); return; } // start a new bucket mCondition = true; for (const auto& data : allData) { data->setElapsedTimestampNs(eventTime); data->setElapsedTimestampNs(bucketEndTime); onMatchedLogEventLocked(0, *data); } } Loading Loading @@ -363,8 +382,8 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. if (newTupleCount > mDimensionHardLimit) { ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId, newKey.toString().c_str()); ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId, newKey.toString().c_str()); return true; } } Loading Loading @@ -393,10 +412,10 @@ const Value getDoubleOrLong(const Value& value) { return v; } void ValueMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, const LogEvent& event) { void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, const MetricDimensionKey& eventKey, const ConditionKey& conditionKey, bool condition, const LogEvent& event) { int64_t eventTimeNs = event.GetElapsedTimestampNs(); if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, Loading @@ -406,6 +425,14 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( flushIfNeededLocked(eventTimeNs); // For pulled data, we already check condition when we decide to pull or // in onDataPulled. So take all of them. // For pushed data, just check condition. if (!(mIsPulled || condition)) { VLOG("ValueMetric skip event because condition is false"); return; } if (hitGuardRailLocked(eventKey)) { return; } Loading @@ -418,71 +445,70 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( } Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue); Value diff; bool hasDiff = false; if (mIsPulled) { // Always require condition for pulled events. In the case of no condition, only pull // on bucket boundaries, in which we fake condition changes. if (mCondition == true) { if (!interval.startUpdated) { interval.start = value; interval.startUpdated = true; } else { // Skip it if there is already value recorded for the start. Happens when puller // takes too long to finish. In this case we take the previous value. VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); if (mUseDiff) { // no base. just update base and return. if (!interval.hasBase) { interval.base = value; interval.hasBase = true; return; } } else { // Generally we expect value to be monotonically increasing. // If not, take absolute value or drop it, based on config. if (interval.startUpdated) { if (value >= interval.start) { diff = (value - interval.start); hasDiff = true; } else { if (mUseAbsoluteValueOnReset) { Value diff; switch (mValueDirection) { case ValueMetric::INCREASING: if (value >= interval.base) { diff = value - interval.base; } else if (mUseAbsoluteValueOnReset) { diff = value; hasDiff = true; } else { VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId, interval.start.toString().c_str(), value.toString().c_str()); } VLOG("Unexpected decreasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; } interval.startUpdated = false; break; case ValueMetric::DECREASING: if (interval.base >= value) { diff = interval.base - value; } else if (mUseAbsoluteValueOnReset) { diff = value; } else { VLOG("No start for matching end %s", value.toString().c_str()); } VLOG("Unexpected increasing value"); StatsdStats::getInstance().notePullDataError(mPullTagId); interval.base = value; return; } } else { // for pushed events, only aggregate when sliced condition is true if (condition == true || mConditionTrackerIndex < 0) { diff = value; hasDiff = true; break; case ValueMetric::ANY: diff = value - interval.base; break; default: break; } interval.base = value; value = diff; } if (hasDiff) { if (interval.hasValue) { switch (mAggregationType) { case ValueMetric::SUM: // for AVG, we add up and take average when flushing the bucket case ValueMetric::AVG: interval.value += diff; interval.value += value; break; case ValueMetric::MIN: interval.value = diff < interval.value ? diff : interval.value; interval.value = std::min(value, interval.value); break; case ValueMetric::MAX: interval.value = diff > interval.value ? diff : interval.value; interval.value = std::max(value, interval.value); break; default: break; } } else { interval.value = diff; interval.value = value; interval.hasValue = true; } interval.sampleSize += 1; } // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = interval.value.long_value; Loading Loading @@ -512,6 +538,10 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); // take base again in future good bucket. for (auto& slice : mCurrentSlicedBucket) { slice.second.hasBase = false; } } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); Loading @@ -534,8 +564,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { if (slice.second.hasValue) { info.mValueLong = slice.second.value.long_value; info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize; // skip the output if the diff is zero if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) { continue; } if (mAggregationType != ValueMetric::AVG) { info.value = slice.second.value; } else { double sum = slice.second.value.type == LONG ? (double)slice.second.value.long_value : slice.second.value.double_value; info.value.setDouble(sum / slice.second.sampleSize); } // it will auto create new vector of ValuebucketInfo if the key is not found. auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); Loading Loading @@ -581,7 +621,10 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } // Reset counters mCurrentSlicedBucket.clear(); for (auto& slice : mCurrentSlicedBucket) { slice.second.hasValue = false; slice.second.sampleSize = 0; } } size_t ValueMetricProducer::byteSizeLocked() const { Loading