Loading cmds/statsd/src/condition/ConditionTimer.h +36 −12 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class ConditionTimer { public: explicit ConditionTimer(bool initCondition, int64_t bucketStartNs) : mCondition(initCondition) { if (initCondition) { mLastConditionTrueTimestampNs = bucketStartNs; mLastConditionChangeTimestampNs = bucketStartNs; } }; Loading @@ -44,21 +44,46 @@ public: // When a new bucket is created, this value will be reset to 0. int64_t mTimerNs = 0; // Last elapsed real timestamp when condition turned to true // When a new bucket is created and the condition is true, then the timestamp is set // to be the bucket start timestamp. int64_t mLastConditionTrueTimestampNs = 0; // Last elapsed real timestamp when condition changed. int64_t mLastConditionChangeTimestampNs = 0; bool mCondition = false; int64_t newBucketStart(int64_t nextBucketStartNs) { if (mCondition) { mTimerNs += (nextBucketStartNs - mLastConditionTrueTimestampNs); mLastConditionTrueTimestampNs = nextBucketStartNs; // Normally, the next bucket happens after the last condition // change. In this case, add the time between the condition becoming // true to the next bucket start time. // Otherwise, the next bucket start time is before the last // condition change time, this means that the condition was false at // the bucket boundary before the condition became true, so the // timer should not get updated and the last condition change time // remains as is. if (nextBucketStartNs >= mLastConditionChangeTimestampNs) { mTimerNs += (nextBucketStartNs - mLastConditionChangeTimestampNs); mLastConditionChangeTimestampNs = nextBucketStartNs; } } else if (mLastConditionChangeTimestampNs > nextBucketStartNs) { // The next bucket start time is before the last condition change // time, this means that the condition was true at the bucket // boundary before the condition became false, so adjust the timer // to match how long the condition was true to the bucket boundary. // This means remove the amount the condition stayed true in the // next bucket from the current bucket. mTimerNs -= (mLastConditionChangeTimestampNs - nextBucketStartNs); } int64_t temp = mTimerNs; mTimerNs = 0; if (!mCondition && (mLastConditionChangeTimestampNs > nextBucketStartNs)) { // The next bucket start time is before the last condition change // time, this means that the condition was true at the bucket // boundary and remained true in the next bucket up to the condition // change to false, so adjust the timer to match how long the // condition stayed true in the next bucket (now the current bucket). mTimerNs = mLastConditionChangeTimestampNs - nextBucketStartNs; } return temp; } Loading @@ -67,11 +92,10 @@ public: return; } mCondition = newCondition; if (newCondition) { mLastConditionTrueTimestampNs = timestampNs; } else { mTimerNs += (timestampNs - mLastConditionTrueTimestampNs); if (newCondition == false) { mTimerNs += (timestampNs - mLastConditionChangeTimestampNs); } mLastConditionChangeTimestampNs = timestampNs; } FRIEND_TEST(ConditionTimerTest, TestTimer_Inital_False); Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +96 −19 Original line number Diff line number Diff line Loading @@ -301,7 +301,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME, (long long)(NanoToMillis(dropEvent.dropTimeNs))); ; protoOutput->end(dropEventToken); } protoOutput->end(wrapperToken); Loading Loading @@ -346,8 +345,11 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } // only write the condition timer value if the metric has a condition. if (mConditionTrackerIndex >= 0) { // We only write the condition timer value if the metric has a // condition and/or is sliced by state. // If the metric is sliced by state, the condition timer value is // also sliced by state to reflect time spent in that state. if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS, (long long)bucket.mConditionTrueNs); } Loading Loading @@ -454,6 +456,8 @@ void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) // Let condition timer know of new active state. mConditionTimer.onConditionChanged(mIsActive, eventTimeNs); updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs); } void ValueMetricProducer::onConditionChangedLocked(const bool condition, Loading @@ -476,6 +480,8 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET); mCondition = ConditionState::kUnknown; mConditionTimer.onConditionChanged(mCondition, eventTimeNs); updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs); return; } Loading Loading @@ -517,6 +523,29 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); mConditionTimer.onConditionChanged(mCondition, eventTimeNs); updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs); } void ValueMetricProducer::updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs) { if (mSlicedStateAtoms.empty()) { return; } // Utilize the current state key of each DimensionsInWhat key to determine // which condition timers to update. // // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`. bool inPulledData; for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mCurrentBaseInfo) { // If the new condition is true, turn ON the condition timer only if // the DimensionInWhat key was present in the pulled data. inPulledData = dimensionInWhatInfo.hasCurrentState; mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey, dimensionInWhatInfo.currentState)] .conditionTimer.onConditionChanged(newCondition && inPulledData, eventTimeNs); } } void ValueMetricProducer::prepareFirstBucketLocked() { Loading Loading @@ -618,8 +647,8 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log // 2. A superset of the current mStateChangePrimaryKey // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys) // then we need to reset the base. for (auto& slice : mCurrentSlicedBucket) { const auto& whatKey = slice.first.getDimensionKeyInWhat(); for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) { const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat(); bool presentInPulledData = mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end(); if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) { Loading @@ -627,6 +656,12 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log for (auto& baseInfo : it->second.baseInfos) { baseInfo.hasBase = false; } // Set to false when DimensionInWhat key is not present in a pull. // Used in onMatchedLogEventInternalLocked() to ensure the condition // timer is turned on the next pull when data is present. it->second.hasCurrentState = false; // Turn OFF condition timer for keys not present in pulled data. currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs); } } mMatchedMetricDimensionKeys.clear(); Loading Loading @@ -789,21 +824,26 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( return; } DimensionsInWhatInfo& dimensionsInWhatInfo = mCurrentBaseInfo[whatKey]; const auto& returnVal = mCurrentBaseInfo.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey())); DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second; const HashableDimensionKey oldStateKey = dimensionsInWhatInfo.currentState; vector<BaseInfo>& baseInfos = dimensionsInWhatInfo.baseInfos; if (baseInfos.size() < mFieldMatchers.size()) { VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); baseInfos.resize(mFieldMatchers.size()); } // Ensure we turn on the condition timer in the case where dimensions // were missing on a previous pull due to a state change. bool stateChange = oldStateKey != stateKey; if (!dimensionsInWhatInfo.hasCurrentState) { dimensionsInWhatInfo.currentState = getUnknownStateKey(); stateChange = true; dimensionsInWhatInfo.hasCurrentState = true; } // We need to get the intervals stored with the previous state key so we can // close these value intervals. const auto oldStateKey = dimensionsInWhatInfo.currentState; vector<Interval>& intervals = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)].intervals; if (intervals.size() < mFieldMatchers.size()) { Loading Loading @@ -916,6 +956,17 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( interval.sampleSize += 1; } // State change. if (!mSlicedStateAtoms.empty() && stateChange) { // Turn OFF the condition timer for the previous state key. mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)] .conditionTimer.onConditionChanged(false, eventTimeNs); // Turn ON the condition timer for the new state key. mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)] .conditionTimer.onConditionChanged(true, eventTimeNs); } // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due // to MULTIPLE_BUCKETS_SKIPPED. if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) { Loading Loading @@ -990,12 +1041,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, if (!mCurrentBucketIsSkipped) { bool bucketHasData = false; // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { PastValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second.intervals); for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) { PastValueBucket bucket = buildPartialBucket(bucketEndTime, currentValueBucket.intervals); if (!mSlicedStateAtoms.empty()) { bucket.mConditionTrueNs = currentValueBucket.conditionTimer.newBucketStart(bucketEndTime); } else { bucket.mConditionTrueNs = conditionTrueDuration; } // it will auto create new vector of ValuebucketInfo if the key is not found. if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; auto& bucketList = mPastBuckets[metricDimensionKey]; bucketList.push_back(bucket); bucketHasData = true; } Loading Loading @@ -1023,11 +1080,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA)); mSkippedBuckets.emplace_back(bucketInGap); } appendToFullBucket(eventTimeNs > fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. mConditionTimer.newBucketStart(nextBucketStartTimeNs); // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing // by state. Otherwise, the "global" condition timer will be used. if (!mSlicedStateAtoms.empty()) { for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) { currentValueBucket.conditionTimer.newBucketStart(nextBucketStartTimeNs); } } mCurrentBucketNum += numBucketsForward; } Loading Loading @@ -1069,6 +1133,17 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) interval.seenNewData = false; } if (obsolete && !mSlicedStateAtoms.empty()) { // When slicing by state, only delete the MetricDimensionKey when the // state key in the MetricDimensionKey is not the current state key. const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat(); const auto& currentBaseInfoItr = mCurrentBaseInfo.find(dimensionInWhatKey); if ((currentBaseInfoItr != mCurrentBaseInfo.end()) && (it->first.getStateValuesKey() == currentBaseInfoItr->second.currentState)) { obsolete = false; } } if (obsolete) { it = mCurrentSlicedBucket.erase(it); } else { Loading Loading @@ -1104,7 +1179,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { if (hitFullBucketGuardRailLocked(slice.first)) { if (hitFullBucketGuardRailLocked(slice.first) || slice.second.intervals.empty()) { continue; } // TODO: fix this when anomaly can accept double values Loading @@ -1125,7 +1200,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { // Skip aggregating the partial buckets since there's no previous partial bucket. for (const auto& slice : mCurrentSlicedBucket) { for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { if (tracker != nullptr && !slice.second.intervals.empty()) { // TODO: fix this when anomaly can accept double values auto& interval = slice.second.intervals[0]; if (interval.hasValue) { Loading @@ -1139,6 +1214,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { } else { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { if (!slice.second.intervals.empty()) { // TODO: fix this when anomaly can accept double values auto& interval = slice.second.intervals[0]; if (interval.hasValue) { Loading @@ -1147,6 +1223,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { } } } } size_t ValueMetricProducer::byteSizeLocked() const { size_t totalSize = 0; Loading cmds/statsd/src/metrics/ValueMetricProducer.h +19 −1 Original line number Diff line number Diff line Loading @@ -193,8 +193,14 @@ private: // Internal state of an ongoing aggregation bucket. typedef struct CurrentValueBucket { // If the `MetricDimensionKey` state key is the current state key, then // the condition timer will be updated later (e.g. condition/state/active // state change) with the correct condition and time. CurrentValueBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) {} // Value information for each value field of the metric. std::vector<Interval> intervals; // Tracks how long the condition is true. ConditionTimer conditionTimer; } CurrentValueBucket; // Holds base information for diffing values from one value field. Loading @@ -206,7 +212,10 @@ private: } BaseInfo; // State key and base information for a specific DimensionsInWhat key. typedef struct { typedef struct DimensionsInWhatInfo { DimensionsInWhatInfo(const HashableDimensionKey& stateKey) : baseInfos(), currentState(stateKey), hasCurrentState(false) { } std::vector<BaseInfo> baseInfos; // Last seen state value(s). HashableDimensionKey currentState; Loading Loading @@ -252,6 +261,10 @@ private: // Reset diff base and mHasGlobalBase void resetBase(); // Updates the condition timers in the current sliced bucket when there is a // condition change or an active state change. void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs); static const size_t kBucketSize = sizeof(PastValueBucket{}); const size_t mDimensionSoftLimit; Loading Loading @@ -337,6 +350,11 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMultipleDimensions); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMissingDataInStateChange); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithDataMissingInConditionChange); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMissingDataThenFlushBucket); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithNoPullOnBucketBoundary); FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenOneConditionFailed); FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenInitialPullFailed); Loading cmds/statsd/tests/condition/ConditionTimer_test.cpp +3 −3 Original line number Diff line number Diff line Loading @@ -35,11 +35,11 @@ TEST(ConditionTimerTest, TestTimer_Inital_False) { EXPECT_EQ(0, timer.mTimerNs); timer.onConditionChanged(true, ct_start_time + 5); EXPECT_EQ(ct_start_time + 5, timer.mLastConditionTrueTimestampNs); EXPECT_EQ(ct_start_time + 5, timer.mLastConditionChangeTimestampNs); EXPECT_EQ(true, timer.mCondition); EXPECT_EQ(95, timer.newBucketStart(ct_start_time + 100)); EXPECT_EQ(ct_start_time + 100, timer.mLastConditionTrueTimestampNs); EXPECT_EQ(ct_start_time + 100, timer.mLastConditionChangeTimestampNs); EXPECT_EQ(true, timer.mCondition); } Loading @@ -51,7 +51,7 @@ TEST(ConditionTimerTest, TestTimer_Inital_True) { EXPECT_EQ(ct_start_time - time_base, timer.newBucketStart(ct_start_time)); EXPECT_EQ(true, timer.mCondition); EXPECT_EQ(0, timer.mTimerNs); EXPECT_EQ(ct_start_time, timer.mLastConditionTrueTimestampNs); EXPECT_EQ(ct_start_time, timer.mLastConditionChangeTimestampNs); timer.onConditionChanged(false, ct_start_time + 5); EXPECT_EQ(5, timer.mTimerNs); Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +2015 −136 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
cmds/statsd/src/condition/ConditionTimer.h +36 −12 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class ConditionTimer { public: explicit ConditionTimer(bool initCondition, int64_t bucketStartNs) : mCondition(initCondition) { if (initCondition) { mLastConditionTrueTimestampNs = bucketStartNs; mLastConditionChangeTimestampNs = bucketStartNs; } }; Loading @@ -44,21 +44,46 @@ public: // When a new bucket is created, this value will be reset to 0. int64_t mTimerNs = 0; // Last elapsed real timestamp when condition turned to true // When a new bucket is created and the condition is true, then the timestamp is set // to be the bucket start timestamp. int64_t mLastConditionTrueTimestampNs = 0; // Last elapsed real timestamp when condition changed. int64_t mLastConditionChangeTimestampNs = 0; bool mCondition = false; int64_t newBucketStart(int64_t nextBucketStartNs) { if (mCondition) { mTimerNs += (nextBucketStartNs - mLastConditionTrueTimestampNs); mLastConditionTrueTimestampNs = nextBucketStartNs; // Normally, the next bucket happens after the last condition // change. In this case, add the time between the condition becoming // true to the next bucket start time. // Otherwise, the next bucket start time is before the last // condition change time, this means that the condition was false at // the bucket boundary before the condition became true, so the // timer should not get updated and the last condition change time // remains as is. if (nextBucketStartNs >= mLastConditionChangeTimestampNs) { mTimerNs += (nextBucketStartNs - mLastConditionChangeTimestampNs); mLastConditionChangeTimestampNs = nextBucketStartNs; } } else if (mLastConditionChangeTimestampNs > nextBucketStartNs) { // The next bucket start time is before the last condition change // time, this means that the condition was true at the bucket // boundary before the condition became false, so adjust the timer // to match how long the condition was true to the bucket boundary. // This means remove the amount the condition stayed true in the // next bucket from the current bucket. mTimerNs -= (mLastConditionChangeTimestampNs - nextBucketStartNs); } int64_t temp = mTimerNs; mTimerNs = 0; if (!mCondition && (mLastConditionChangeTimestampNs > nextBucketStartNs)) { // The next bucket start time is before the last condition change // time, this means that the condition was true at the bucket // boundary and remained true in the next bucket up to the condition // change to false, so adjust the timer to match how long the // condition stayed true in the next bucket (now the current bucket). mTimerNs = mLastConditionChangeTimestampNs - nextBucketStartNs; } return temp; } Loading @@ -67,11 +92,10 @@ public: return; } mCondition = newCondition; if (newCondition) { mLastConditionTrueTimestampNs = timestampNs; } else { mTimerNs += (timestampNs - mLastConditionTrueTimestampNs); if (newCondition == false) { mTimerNs += (timestampNs - mLastConditionChangeTimestampNs); } mLastConditionChangeTimestampNs = timestampNs; } FRIEND_TEST(ConditionTimerTest, TestTimer_Inital_False); Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +96 −19 Original line number Diff line number Diff line Loading @@ -301,7 +301,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME, (long long)(NanoToMillis(dropEvent.dropTimeNs))); ; protoOutput->end(dropEventToken); } protoOutput->end(wrapperToken); Loading Loading @@ -346,8 +345,11 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } // only write the condition timer value if the metric has a condition. if (mConditionTrackerIndex >= 0) { // We only write the condition timer value if the metric has a // condition and/or is sliced by state. // If the metric is sliced by state, the condition timer value is // also sliced by state to reflect time spent in that state. if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS, (long long)bucket.mConditionTrueNs); } Loading Loading @@ -454,6 +456,8 @@ void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) // Let condition timer know of new active state. mConditionTimer.onConditionChanged(mIsActive, eventTimeNs); updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs); } void ValueMetricProducer::onConditionChangedLocked(const bool condition, Loading @@ -476,6 +480,8 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET); mCondition = ConditionState::kUnknown; mConditionTimer.onConditionChanged(mCondition, eventTimeNs); updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs); return; } Loading Loading @@ -517,6 +523,29 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); mConditionTimer.onConditionChanged(mCondition, eventTimeNs); updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs); } void ValueMetricProducer::updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs) { if (mSlicedStateAtoms.empty()) { return; } // Utilize the current state key of each DimensionsInWhat key to determine // which condition timers to update. // // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`. bool inPulledData; for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mCurrentBaseInfo) { // If the new condition is true, turn ON the condition timer only if // the DimensionInWhat key was present in the pulled data. inPulledData = dimensionInWhatInfo.hasCurrentState; mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey, dimensionInWhatInfo.currentState)] .conditionTimer.onConditionChanged(newCondition && inPulledData, eventTimeNs); } } void ValueMetricProducer::prepareFirstBucketLocked() { Loading Loading @@ -618,8 +647,8 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log // 2. A superset of the current mStateChangePrimaryKey // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys) // then we need to reset the base. for (auto& slice : mCurrentSlicedBucket) { const auto& whatKey = slice.first.getDimensionKeyInWhat(); for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) { const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat(); bool presentInPulledData = mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end(); if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) { Loading @@ -627,6 +656,12 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log for (auto& baseInfo : it->second.baseInfos) { baseInfo.hasBase = false; } // Set to false when DimensionInWhat key is not present in a pull. // Used in onMatchedLogEventInternalLocked() to ensure the condition // timer is turned on the next pull when data is present. it->second.hasCurrentState = false; // Turn OFF condition timer for keys not present in pulled data. currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs); } } mMatchedMetricDimensionKeys.clear(); Loading Loading @@ -789,21 +824,26 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( return; } DimensionsInWhatInfo& dimensionsInWhatInfo = mCurrentBaseInfo[whatKey]; const auto& returnVal = mCurrentBaseInfo.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey())); DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second; const HashableDimensionKey oldStateKey = dimensionsInWhatInfo.currentState; vector<BaseInfo>& baseInfos = dimensionsInWhatInfo.baseInfos; if (baseInfos.size() < mFieldMatchers.size()) { VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); baseInfos.resize(mFieldMatchers.size()); } // Ensure we turn on the condition timer in the case where dimensions // were missing on a previous pull due to a state change. bool stateChange = oldStateKey != stateKey; if (!dimensionsInWhatInfo.hasCurrentState) { dimensionsInWhatInfo.currentState = getUnknownStateKey(); stateChange = true; dimensionsInWhatInfo.hasCurrentState = true; } // We need to get the intervals stored with the previous state key so we can // close these value intervals. const auto oldStateKey = dimensionsInWhatInfo.currentState; vector<Interval>& intervals = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)].intervals; if (intervals.size() < mFieldMatchers.size()) { Loading Loading @@ -916,6 +956,17 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( interval.sampleSize += 1; } // State change. if (!mSlicedStateAtoms.empty() && stateChange) { // Turn OFF the condition timer for the previous state key. mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)] .conditionTimer.onConditionChanged(false, eventTimeNs); // Turn ON the condition timer for the new state key. mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)] .conditionTimer.onConditionChanged(true, eventTimeNs); } // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due // to MULTIPLE_BUCKETS_SKIPPED. if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) { Loading Loading @@ -990,12 +1041,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, if (!mCurrentBucketIsSkipped) { bool bucketHasData = false; // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { PastValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second.intervals); for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) { PastValueBucket bucket = buildPartialBucket(bucketEndTime, currentValueBucket.intervals); if (!mSlicedStateAtoms.empty()) { bucket.mConditionTrueNs = currentValueBucket.conditionTimer.newBucketStart(bucketEndTime); } else { bucket.mConditionTrueNs = conditionTrueDuration; } // it will auto create new vector of ValuebucketInfo if the key is not found. if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; auto& bucketList = mPastBuckets[metricDimensionKey]; bucketList.push_back(bucket); bucketHasData = true; } Loading Loading @@ -1023,11 +1080,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA)); mSkippedBuckets.emplace_back(bucketInGap); } appendToFullBucket(eventTimeNs > fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. mConditionTimer.newBucketStart(nextBucketStartTimeNs); // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing // by state. Otherwise, the "global" condition timer will be used. if (!mSlicedStateAtoms.empty()) { for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) { currentValueBucket.conditionTimer.newBucketStart(nextBucketStartTimeNs); } } mCurrentBucketNum += numBucketsForward; } Loading Loading @@ -1069,6 +1133,17 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) interval.seenNewData = false; } if (obsolete && !mSlicedStateAtoms.empty()) { // When slicing by state, only delete the MetricDimensionKey when the // state key in the MetricDimensionKey is not the current state key. const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat(); const auto& currentBaseInfoItr = mCurrentBaseInfo.find(dimensionInWhatKey); if ((currentBaseInfoItr != mCurrentBaseInfo.end()) && (it->first.getStateValuesKey() == currentBaseInfoItr->second.currentState)) { obsolete = false; } } if (obsolete) { it = mCurrentSlicedBucket.erase(it); } else { Loading Loading @@ -1104,7 +1179,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { for (const auto& slice : mCurrentSlicedBucket) { if (hitFullBucketGuardRailLocked(slice.first)) { if (hitFullBucketGuardRailLocked(slice.first) || slice.second.intervals.empty()) { continue; } // TODO: fix this when anomaly can accept double values Loading @@ -1125,7 +1200,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { // Skip aggregating the partial buckets since there's no previous partial bucket. for (const auto& slice : mCurrentSlicedBucket) { for (auto& tracker : mAnomalyTrackers) { if (tracker != nullptr) { if (tracker != nullptr && !slice.second.intervals.empty()) { // TODO: fix this when anomaly can accept double values auto& interval = slice.second.intervals[0]; if (interval.hasValue) { Loading @@ -1139,6 +1214,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { } else { // Accumulate partial bucket. for (const auto& slice : mCurrentSlicedBucket) { if (!slice.second.intervals.empty()) { // TODO: fix this when anomaly can accept double values auto& interval = slice.second.intervals[0]; if (interval.hasValue) { Loading @@ -1147,6 +1223,7 @@ void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { } } } } size_t ValueMetricProducer::byteSizeLocked() const { size_t totalSize = 0; Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +19 −1 Original line number Diff line number Diff line Loading @@ -193,8 +193,14 @@ private: // Internal state of an ongoing aggregation bucket. typedef struct CurrentValueBucket { // If the `MetricDimensionKey` state key is the current state key, then // the condition timer will be updated later (e.g. condition/state/active // state change) with the correct condition and time. CurrentValueBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) {} // Value information for each value field of the metric. std::vector<Interval> intervals; // Tracks how long the condition is true. ConditionTimer conditionTimer; } CurrentValueBucket; // Holds base information for diffing values from one value field. Loading @@ -206,7 +212,10 @@ private: } BaseInfo; // State key and base information for a specific DimensionsInWhat key. typedef struct { typedef struct DimensionsInWhatInfo { DimensionsInWhatInfo(const HashableDimensionKey& stateKey) : baseInfos(), currentState(stateKey), hasCurrentState(false) { } std::vector<BaseInfo> baseInfos; // Last seen state value(s). HashableDimensionKey currentState; Loading Loading @@ -252,6 +261,10 @@ private: // Reset diff base and mHasGlobalBase void resetBase(); // Updates the condition timers in the current sliced bucket when there is a // condition change or an active state change. void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs); static const size_t kBucketSize = sizeof(PastValueBucket{}); const size_t mDimensionSoftLimit; Loading Loading @@ -337,6 +350,11 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMultipleDimensions); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMissingDataInStateChange); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithDataMissingInConditionChange); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithMissingDataThenFlushBucket); FRIEND_TEST(ValueMetricProducerTest, TestSlicedStateWithNoPullOnBucketBoundary); FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenOneConditionFailed); FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenInitialPullFailed); Loading
cmds/statsd/tests/condition/ConditionTimer_test.cpp +3 −3 Original line number Diff line number Diff line Loading @@ -35,11 +35,11 @@ TEST(ConditionTimerTest, TestTimer_Inital_False) { EXPECT_EQ(0, timer.mTimerNs); timer.onConditionChanged(true, ct_start_time + 5); EXPECT_EQ(ct_start_time + 5, timer.mLastConditionTrueTimestampNs); EXPECT_EQ(ct_start_time + 5, timer.mLastConditionChangeTimestampNs); EXPECT_EQ(true, timer.mCondition); EXPECT_EQ(95, timer.newBucketStart(ct_start_time + 100)); EXPECT_EQ(ct_start_time + 100, timer.mLastConditionTrueTimestampNs); EXPECT_EQ(ct_start_time + 100, timer.mLastConditionChangeTimestampNs); EXPECT_EQ(true, timer.mCondition); } Loading @@ -51,7 +51,7 @@ TEST(ConditionTimerTest, TestTimer_Inital_True) { EXPECT_EQ(ct_start_time - time_base, timer.newBucketStart(ct_start_time)); EXPECT_EQ(true, timer.mCondition); EXPECT_EQ(0, timer.mTimerNs); EXPECT_EQ(ct_start_time, timer.mLastConditionTrueTimestampNs); EXPECT_EQ(ct_start_time, timer.mLastConditionChangeTimestampNs); timer.onConditionChanged(false, ct_start_time + 5); EXPECT_EQ(5, timer.mTimerNs); Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +2015 −136 File changed.Preview size limit exceeded, changes collapsed. Show changes