Loading cmds/statsd/src/external/StatsPuller.cpp +7 −1 Original line number Diff line number Diff line Loading @@ -59,15 +59,21 @@ bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr< mLastPullTimeNs = elapsedTimeNs; int64_t pullStartTimeNs = getElapsedRealtimeNs(); bool ret = PullInternal(&mCachedData); if (!ret) { mCachedData.clear(); return false; } StatsdStats::getInstance().notePullTime(mTagId, getElapsedRealtimeNs() - pullStartTimeNs); for (const shared_ptr<LogEvent>& data : mCachedData) { data->setElapsedTimestampNs(elapsedTimeNs); data->setLogdWallClockTimestampNs(wallClockTimeNs); } if (ret && mCachedData.size() > 0) { if (mCachedData.size() > 0) { mapAndMergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId); (*data) = mCachedData; } StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs); return ret; } Loading cmds/statsd/src/external/StatsPuller.h +1 −0 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ public: // Pulls the data. The returned data will have elapsedTimeNs set as timeNs // and will have wallClockTimeNs set as current wall clock time. // Return true if the pull is successful. bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data); // Clear cache immediately Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +41 −29 Original line number Diff line number Diff line Loading @@ -72,17 +72,15 @@ const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; const Value ZERO_LONG((int64_t)0); const Value ZERO_DOUBLE((int64_t)0); // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric, const int conditionIndex, const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager) ValueMetricProducer::ValueMetricProducer( const ConfigKey& key, const ValueMetric& metric, const int conditionIndex, const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager) : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard), mWhatMatcherIndex(whatMatcherIndex), mEventMatcherWizard(matcherWizard), Loading @@ -102,7 +100,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, mAggregationType(metric.aggregation_type()), mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), mValueDirection(metric.value_direction()), mSkipZeroDiffOutput(metric.skip_zero_diff_output()) { mSkipZeroDiffOutput(metric.skip_zero_diff_output()), mUseZeroDefaultBase(metric.use_zero_default_base()), mHasGlobalBase(false) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); Loading Loading @@ -302,6 +302,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } } void ValueMetricProducer::resetBase() { for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { interval.hasBase = false; } } mHasGlobalBase = false; } void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs) { Loading @@ -317,13 +326,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, pullAndMatchEventsLocked(eventTimeNs); } // when condition change from true to false, clear diff base // when condition change from true to false, clear diff base but don't // reset other counters as we may accumulate more value in the bucket. if (mUseDiff && mCondition && !condition) { for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { interval.hasBase = false; } } resetBase(); } mCondition = condition; Loading @@ -332,15 +338,17 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { vector<std::shared_ptr<LogEvent>> allData; if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) { if (allData.size() == 0) { return; } for (const auto& data : allData) { if (mEventMatcherWizard->matchLogEvent( *data, mWhatMatcherIndex) == MatchingState::kMatched) { onMatchedLogEventLocked(mWhatMatcherIndex, *data); } } mHasGlobalBase = true; } else { // for pulled data, every pull is needed. So we reset the base if any // pull fails. resetBase(); } } Loading Loading @@ -376,6 +384,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven onMatchedLogEventLocked(mWhatMatcherIndex, *data); } } mHasGlobalBase = true; } else { VLOG("No need to commit data on condition false."); } Loading Loading @@ -486,12 +495,19 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn } if (mUseDiff) { // no base. just update base and return. if (!interval.hasBase) { if (mHasGlobalBase && mUseZeroDefaultBase) { // The bucket has global base. This key does not. // Optionally use zero as base. interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE); interval.hasBase = true; } else { // no base. just update base and return. interval.base = value; interval.hasBase = true; return; } } Value diff; switch (mValueDirection) { case ValueMetric::INCREASING: Loading Loading @@ -580,11 +596,7 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); // take base again in future good bucket. for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { interval.hasBase = false; } } resetBase(); } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); Loading cmds/statsd/src/metrics/ValueMetricProducer.h +17 −0 Original line number Diff line number Diff line Loading @@ -148,6 +148,9 @@ private: void pullAndMatchEventsLocked(const int64_t timestampNs); // Reset diff base and mHasGlobalBase void resetBase(); static const size_t kBucketSize = sizeof(ValueBucket{}); const size_t mDimensionSoftLimit; Loading @@ -164,6 +167,18 @@ private: const bool mSkipZeroDiffOutput; // If true, use a zero value as base to compute the diff. // This is used for new keys which are present in the new data but was not // present in the base data. // The default base will only be used if we have a global base. const bool mUseZeroDefaultBase; // For pulled metrics, this is always set to true whenever a pull succeeds. // It is set to false when a pull fails, or upon condition change to false. // This is used to decide if we have the right base data to compute the // diff against. bool mHasGlobalBase; FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); Loading @@ -185,6 +200,8 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); }; } // namespace statsd Loading cmds/statsd/src/statsd_config.proto +2 −0 Original line number Diff line number Diff line Loading @@ -274,6 +274,8 @@ message ValueMetric { optional bool use_diff = 12; optional bool use_zero_default_base = 15 [default = false]; enum ValueDirection { UNKNOWN = 0; INCREASING = 1; Loading Loading
cmds/statsd/src/external/StatsPuller.cpp +7 −1 Original line number Diff line number Diff line Loading @@ -59,15 +59,21 @@ bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr< mLastPullTimeNs = elapsedTimeNs; int64_t pullStartTimeNs = getElapsedRealtimeNs(); bool ret = PullInternal(&mCachedData); if (!ret) { mCachedData.clear(); return false; } StatsdStats::getInstance().notePullTime(mTagId, getElapsedRealtimeNs() - pullStartTimeNs); for (const shared_ptr<LogEvent>& data : mCachedData) { data->setElapsedTimestampNs(elapsedTimeNs); data->setLogdWallClockTimestampNs(wallClockTimeNs); } if (ret && mCachedData.size() > 0) { if (mCachedData.size() > 0) { mapAndMergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId); (*data) = mCachedData; } StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs); return ret; } Loading
cmds/statsd/src/external/StatsPuller.h +1 −0 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ public: // Pulls the data. The returned data will have elapsedTimeNs set as timeNs // and will have wallClockTimeNs set as current wall clock time. // Return true if the pull is successful. bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data); // Clear cache immediately Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +41 −29 Original line number Diff line number Diff line Loading @@ -72,17 +72,15 @@ const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; const Value ZERO_LONG((int64_t)0); const Value ZERO_DOUBLE((int64_t)0); // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric, const int conditionIndex, const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager) ValueMetricProducer::ValueMetricProducer( const ConfigKey& key, const ValueMetric& metric, const int conditionIndex, const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager) : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard), mWhatMatcherIndex(whatMatcherIndex), mEventMatcherWizard(matcherWizard), Loading @@ -102,7 +100,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, mAggregationType(metric.aggregation_type()), mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), mValueDirection(metric.value_direction()), mSkipZeroDiffOutput(metric.skip_zero_diff_output()) { mSkipZeroDiffOutput(metric.skip_zero_diff_output()), mUseZeroDefaultBase(metric.use_zero_default_base()), mHasGlobalBase(false) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); Loading Loading @@ -302,6 +302,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } } void ValueMetricProducer::resetBase() { for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { interval.hasBase = false; } } mHasGlobalBase = false; } void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs) { Loading @@ -317,13 +326,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, pullAndMatchEventsLocked(eventTimeNs); } // when condition change from true to false, clear diff base // when condition change from true to false, clear diff base but don't // reset other counters as we may accumulate more value in the bucket. if (mUseDiff && mCondition && !condition) { for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { interval.hasBase = false; } } resetBase(); } mCondition = condition; Loading @@ -332,15 +338,17 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { vector<std::shared_ptr<LogEvent>> allData; if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) { if (allData.size() == 0) { return; } for (const auto& data : allData) { if (mEventMatcherWizard->matchLogEvent( *data, mWhatMatcherIndex) == MatchingState::kMatched) { onMatchedLogEventLocked(mWhatMatcherIndex, *data); } } mHasGlobalBase = true; } else { // for pulled data, every pull is needed. So we reset the base if any // pull fails. resetBase(); } } Loading Loading @@ -376,6 +384,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven onMatchedLogEventLocked(mWhatMatcherIndex, *data); } } mHasGlobalBase = true; } else { VLOG("No need to commit data on condition false."); } Loading Loading @@ -486,12 +495,19 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn } if (mUseDiff) { // no base. just update base and return. if (!interval.hasBase) { if (mHasGlobalBase && mUseZeroDefaultBase) { // The bucket has global base. This key does not. // Optionally use zero as base. interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE); interval.hasBase = true; } else { // no base. just update base and return. interval.base = value; interval.hasBase = true; return; } } Value diff; switch (mValueDirection) { case ValueMetric::INCREASING: Loading Loading @@ -580,11 +596,7 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); // take base again in future good bucket. for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { interval.hasBase = false; } } resetBase(); } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +17 −0 Original line number Diff line number Diff line Loading @@ -148,6 +148,9 @@ private: void pullAndMatchEventsLocked(const int64_t timestampNs); // Reset diff base and mHasGlobalBase void resetBase(); static const size_t kBucketSize = sizeof(ValueBucket{}); const size_t mDimensionSoftLimit; Loading @@ -164,6 +167,18 @@ private: const bool mSkipZeroDiffOutput; // If true, use a zero value as base to compute the diff. // This is used for new keys which are present in the new data but was not // present in the base data. // The default base will only be used if we have a global base. const bool mUseZeroDefaultBase; // For pulled metrics, this is always set to true whenever a pull succeeds. // It is set to false when a pull fails, or upon condition change to false. // This is used to decide if we have the right base data to compute the // diff against. bool mHasGlobalBase; FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); Loading @@ -185,6 +200,8 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); }; } // namespace statsd Loading
cmds/statsd/src/statsd_config.proto +2 −0 Original line number Diff line number Diff line Loading @@ -274,6 +274,8 @@ message ValueMetric { optional bool use_diff = 12; optional bool use_zero_default_base = 15 [default = false]; enum ValueDirection { UNKNOWN = 0; INCREASING = 1; Loading