Loading cmds/statsd/src/metrics/DurationMetricProducer.cpp +6 −13 Original line number Diff line number Diff line Loading @@ -122,16 +122,16 @@ void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { const HashableDimensionKey& eventKey) const { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( mConfigKey, mMetric.name(), eventKey, mWizard, mConditionTrackerIndex, mNested, mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers, bucket); mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers); case DurationMetric_AggregationType_MAX_SPARSE: return make_unique<MaxDurationTracker>( mConfigKey, mMetric.name(), eventKey, mWizard, mConditionTrackerIndex, mNested, mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers, bucket); mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers); } } Loading Loading @@ -179,13 +179,6 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked continue; } // If there is no duration bucket info for this key, don't include it in the report. // For example, duration started, but condition is never turned to true. // TODO: Only add the key to the map when we add duration buckets info for it. if (pair.second.size() == 0) { continue; } long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); Loading Loading @@ -228,7 +221,7 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked (long long)mCurrentBucketStartTimeNs); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); // TODO: Properly clear the old buckets. mPastBuckets.clear(); return buffer; } Loading @@ -238,7 +231,7 @@ void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) { } VLOG("flushing..........."); for (auto it = mCurrentSlicedDuration.begin(); it != mCurrentSlicedDuration.end();) { if (it->second->flushIfNeeded(eventTime)) { if (it->second->flushIfNeeded(eventTime, &mPastBuckets)) { VLOG("erase bucket for key %s", it->first.c_str()); it = mCurrentSlicedDuration.erase(it); } else { Loading Loading @@ -290,7 +283,7 @@ void DurationMetricProducer::onMatchedLogEventInternalLocked( if (hitGuardRailLocked(eventKey)) { return; } mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey); } auto it = mCurrentSlicedDuration.find(eventKey); Loading cmds/statsd/src/metrics/DurationMetricProducer.h +1 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ private: // Helper function to create a duration tracker given the metric aggregation type. std::unique_ptr<DurationTracker> createDurationTracker( const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const; const HashableDimensionKey& eventKey) const; // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const HashableDimensionKey& newKey); Loading cmds/statsd/src/metrics/duration_helper/DurationTracker.h +4 −6 Original line number Diff line number Diff line Loading @@ -63,8 +63,7 @@ public: DurationTracker(const ConfigKey& key, const string& name, const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard, int conditionIndex, bool nesting, uint64_t currentBucketStartNs, uint64_t bucketSizeNs, const std::vector<sp<AnomalyTracker>>& anomalyTrackers, std::vector<DurationBucket>& bucket) const std::vector<sp<AnomalyTracker>>& anomalyTrackers) : mConfigKey(key), mName(name), mEventKey(eventKey), Loading @@ -73,7 +72,6 @@ public: mBucketSizeNs(bucketSizeNs), mNested(nesting), mCurrentBucketStartTimeNs(currentBucketStartNs), mBucket(bucket), mDuration(0), mCurrentBucketNum(0), mAnomalyTrackers(anomalyTrackers){}; Loading @@ -91,7 +89,9 @@ public: // Flush stale buckets if needed, and return true if the tracker has no on-going duration // events, so that the owner can safely remove the tracker. virtual bool flushIfNeeded(uint64_t timestampNs) = 0; virtual bool flushIfNeeded( uint64_t timestampNs, std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>>* output) = 0; // Predict the anomaly timestamp given the current status. virtual int64_t predictAnomalyTimestampNs(const AnomalyTracker& anomalyTracker, Loading Loading @@ -159,8 +159,6 @@ protected: uint64_t mCurrentBucketStartTimeNs; std::vector<DurationBucket>& mBucket; // where to write output int64_t mDuration; // current recorded duration result uint64_t mCurrentBucketNum; Loading cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.cpp +6 −6 Original line number Diff line number Diff line Loading @@ -28,10 +28,9 @@ MaxDurationTracker::MaxDurationTracker(const ConfigKey& key, const string& name, const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard, int conditionIndex, bool nesting, uint64_t currentBucketStartNs, uint64_t bucketSizeNs, const std::vector<sp<AnomalyTracker>>& anomalyTrackers, std::vector<DurationBucket>& bucket) const std::vector<sp<AnomalyTracker>>& anomalyTrackers) : DurationTracker(key, name, eventKey, wizard, conditionIndex, nesting, currentBucketStartNs, bucketSizeNs, anomalyTrackers, bucket) { bucketSizeNs, anomalyTrackers) { } bool MaxDurationTracker::hitGuardRail(const HashableDimensionKey& newKey) { Loading Loading @@ -145,7 +144,8 @@ void MaxDurationTracker::noteStopAll(const uint64_t eventTime) { } } bool MaxDurationTracker::flushIfNeeded(uint64_t eventTime) { bool MaxDurationTracker::flushIfNeeded( uint64_t eventTime, unordered_map<HashableDimensionKey, vector<DurationBucket>>* output) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return false; } Loading Loading @@ -202,7 +202,7 @@ bool MaxDurationTracker::flushIfNeeded(uint64_t eventTime) { if (mDuration != 0) { info.mDuration = mDuration; mBucket.push_back(info); (*output)[mEventKey].push_back(info); addPastBucketToAnomalyTrackers(info.mDuration, info.mBucketNum); VLOG(" final duration for last bucket: %lld", (long long)mDuration); } Loading @@ -215,7 +215,7 @@ bool MaxDurationTracker::flushIfNeeded(uint64_t eventTime) { info.mBucketEndNs = endTime + mBucketSizeNs * i; info.mBucketNum = mCurrentBucketNum + i; info.mDuration = mBucketSizeNs; mBucket.push_back(info); (*output)[mEventKey].push_back(info); addPastBucketToAnomalyTrackers(info.mDuration, info.mBucketNum); VLOG(" filling gap bucket with duration %lld", (long long)mBucketSizeNs); } Loading cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.h +4 −3 Original line number Diff line number Diff line Loading @@ -32,15 +32,16 @@ public: const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard, int conditionIndex, bool nesting, uint64_t currentBucketStartNs, uint64_t bucketSizeNs, const std::vector<sp<AnomalyTracker>>& anomalyTrackers, std::vector<DurationBucket>& bucket); const std::vector<sp<AnomalyTracker>>& anomalyTrackers); void noteStart(const HashableDimensionKey& key, bool condition, const uint64_t eventTime, const ConditionKey& conditionKey) override; void noteStop(const HashableDimensionKey& key, const uint64_t eventTime, const bool stopAll) override; void noteStopAll(const uint64_t eventTime) override; bool flushIfNeeded(uint64_t timestampNs) override; bool flushIfNeeded( uint64_t timestampNs, std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>>* output) override; void onSlicedConditionMayChange(const uint64_t timestamp) override; void onConditionChanged(bool condition, const uint64_t timestamp) override; Loading Loading
cmds/statsd/src/metrics/DurationMetricProducer.cpp +6 −13 Original line number Diff line number Diff line Loading @@ -122,16 +122,16 @@ void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { const HashableDimensionKey& eventKey) const { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( mConfigKey, mMetric.name(), eventKey, mWizard, mConditionTrackerIndex, mNested, mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers, bucket); mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers); case DurationMetric_AggregationType_MAX_SPARSE: return make_unique<MaxDurationTracker>( mConfigKey, mMetric.name(), eventKey, mWizard, mConditionTrackerIndex, mNested, mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers, bucket); mCurrentBucketStartTimeNs, mBucketSizeNs, mAnomalyTrackers); } } Loading Loading @@ -179,13 +179,6 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked continue; } // If there is no duration bucket info for this key, don't include it in the report. // For example, duration started, but condition is never turned to true. // TODO: Only add the key to the map when we add duration buckets info for it. if (pair.second.size() == 0) { continue; } long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); Loading Loading @@ -228,7 +221,7 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked (long long)mCurrentBucketStartTimeNs); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); // TODO: Properly clear the old buckets. mPastBuckets.clear(); return buffer; } Loading @@ -238,7 +231,7 @@ void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) { } VLOG("flushing..........."); for (auto it = mCurrentSlicedDuration.begin(); it != mCurrentSlicedDuration.end();) { if (it->second->flushIfNeeded(eventTime)) { if (it->second->flushIfNeeded(eventTime, &mPastBuckets)) { VLOG("erase bucket for key %s", it->first.c_str()); it = mCurrentSlicedDuration.erase(it); } else { Loading Loading @@ -290,7 +283,7 @@ void DurationMetricProducer::onMatchedLogEventInternalLocked( if (hitGuardRailLocked(eventKey)) { return; } mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey); } auto it = mCurrentSlicedDuration.find(eventKey); Loading
cmds/statsd/src/metrics/DurationMetricProducer.h +1 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ private: // Helper function to create a duration tracker given the metric aggregation type. std::unique_ptr<DurationTracker> createDurationTracker( const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const; const HashableDimensionKey& eventKey) const; // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const HashableDimensionKey& newKey); Loading
cmds/statsd/src/metrics/duration_helper/DurationTracker.h +4 −6 Original line number Diff line number Diff line Loading @@ -63,8 +63,7 @@ public: DurationTracker(const ConfigKey& key, const string& name, const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard, int conditionIndex, bool nesting, uint64_t currentBucketStartNs, uint64_t bucketSizeNs, const std::vector<sp<AnomalyTracker>>& anomalyTrackers, std::vector<DurationBucket>& bucket) const std::vector<sp<AnomalyTracker>>& anomalyTrackers) : mConfigKey(key), mName(name), mEventKey(eventKey), Loading @@ -73,7 +72,6 @@ public: mBucketSizeNs(bucketSizeNs), mNested(nesting), mCurrentBucketStartTimeNs(currentBucketStartNs), mBucket(bucket), mDuration(0), mCurrentBucketNum(0), mAnomalyTrackers(anomalyTrackers){}; Loading @@ -91,7 +89,9 @@ public: // Flush stale buckets if needed, and return true if the tracker has no on-going duration // events, so that the owner can safely remove the tracker. virtual bool flushIfNeeded(uint64_t timestampNs) = 0; virtual bool flushIfNeeded( uint64_t timestampNs, std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>>* output) = 0; // Predict the anomaly timestamp given the current status. virtual int64_t predictAnomalyTimestampNs(const AnomalyTracker& anomalyTracker, Loading Loading @@ -159,8 +159,6 @@ protected: uint64_t mCurrentBucketStartTimeNs; std::vector<DurationBucket>& mBucket; // where to write output int64_t mDuration; // current recorded duration result uint64_t mCurrentBucketNum; Loading
cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.cpp +6 −6 Original line number Diff line number Diff line Loading @@ -28,10 +28,9 @@ MaxDurationTracker::MaxDurationTracker(const ConfigKey& key, const string& name, const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard, int conditionIndex, bool nesting, uint64_t currentBucketStartNs, uint64_t bucketSizeNs, const std::vector<sp<AnomalyTracker>>& anomalyTrackers, std::vector<DurationBucket>& bucket) const std::vector<sp<AnomalyTracker>>& anomalyTrackers) : DurationTracker(key, name, eventKey, wizard, conditionIndex, nesting, currentBucketStartNs, bucketSizeNs, anomalyTrackers, bucket) { bucketSizeNs, anomalyTrackers) { } bool MaxDurationTracker::hitGuardRail(const HashableDimensionKey& newKey) { Loading Loading @@ -145,7 +144,8 @@ void MaxDurationTracker::noteStopAll(const uint64_t eventTime) { } } bool MaxDurationTracker::flushIfNeeded(uint64_t eventTime) { bool MaxDurationTracker::flushIfNeeded( uint64_t eventTime, unordered_map<HashableDimensionKey, vector<DurationBucket>>* output) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return false; } Loading Loading @@ -202,7 +202,7 @@ bool MaxDurationTracker::flushIfNeeded(uint64_t eventTime) { if (mDuration != 0) { info.mDuration = mDuration; mBucket.push_back(info); (*output)[mEventKey].push_back(info); addPastBucketToAnomalyTrackers(info.mDuration, info.mBucketNum); VLOG(" final duration for last bucket: %lld", (long long)mDuration); } Loading @@ -215,7 +215,7 @@ bool MaxDurationTracker::flushIfNeeded(uint64_t eventTime) { info.mBucketEndNs = endTime + mBucketSizeNs * i; info.mBucketNum = mCurrentBucketNum + i; info.mDuration = mBucketSizeNs; mBucket.push_back(info); (*output)[mEventKey].push_back(info); addPastBucketToAnomalyTrackers(info.mDuration, info.mBucketNum); VLOG(" filling gap bucket with duration %lld", (long long)mBucketSizeNs); } Loading
cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.h +4 −3 Original line number Diff line number Diff line Loading @@ -32,15 +32,16 @@ public: const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard, int conditionIndex, bool nesting, uint64_t currentBucketStartNs, uint64_t bucketSizeNs, const std::vector<sp<AnomalyTracker>>& anomalyTrackers, std::vector<DurationBucket>& bucket); const std::vector<sp<AnomalyTracker>>& anomalyTrackers); void noteStart(const HashableDimensionKey& key, bool condition, const uint64_t eventTime, const ConditionKey& conditionKey) override; void noteStop(const HashableDimensionKey& key, const uint64_t eventTime, const bool stopAll) override; void noteStopAll(const uint64_t eventTime) override; bool flushIfNeeded(uint64_t timestampNs) override; bool flushIfNeeded( uint64_t timestampNs, std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>>* output) override; void onSlicedConditionMayChange(const uint64_t timestamp) override; void onConditionChanged(bool condition, const uint64_t timestamp) override; Loading