Loading cmds/statsd/Android.mk +1 −0 Original line number Diff line number Diff line Loading @@ -167,6 +167,7 @@ LOCAL_SRC_FILES := \ tests/metrics/OringDurationTracker_test.cpp \ tests/metrics/MaxDurationTracker_test.cpp \ tests/metrics/CountMetricProducer_test.cpp \ tests/metrics/DurationMetricProducer_test.cpp \ tests/metrics/EventMetricProducer_test.cpp \ tests/metrics/ValueMetricProducer_test.cpp \ tests/guardrail/StatsdStats_test.cpp Loading cmds/statsd/src/StatsLogProcessor.h +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ public: const std::function<void(const ConfigKey&)>& sendBroadcast); virtual ~StatsLogProcessor(); virtual void OnLogEvent(const LogEvent& event); void OnLogEvent(const LogEvent& event); void OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config); void OnConfigRemoved(const ConfigKey& key); Loading cmds/statsd/src/metrics/CountMetricProducer.cpp +16 −14 Original line number Diff line number Diff line Loading @@ -83,7 +83,7 @@ CountMetricProducer::CountMetricProducer(const ConfigKey& key, const CountMetric mConditionSliced = true; } startNewProtoOutputStream(mStartTimeNs); startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); Loading @@ -93,7 +93,7 @@ CountMetricProducer::~CountMetricProducer() { VLOG("~CountMetricProducer() called"); } void CountMetricProducer::startNewProtoOutputStream(long long startTime) { void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -103,17 +103,17 @@ void CountMetricProducer::startNewProtoOutputStream(long long startTime) { void CountMetricProducer::finish() { } void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { Loading Loading @@ -165,9 +165,9 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { (long long)mCurrentBucketStartTimeNs); VLOG("metric %s dump report now...", mMetric.name().c_str()); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStream(endTime); startNewProtoOutputStreamLocked(endTime); mPastBuckets.clear(); return buffer; Loading @@ -175,12 +175,13 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { // TODO: Clear mDimensionKeyMap once the report is dumped. } void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void CountMetricProducer::onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; } bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool CountMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { return false; } Loading @@ -200,13 +201,14 @@ bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } void CountMetricProducer::onMatchedLogEventInternal( void CountMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map<string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { uint64_t eventTimeNs = event.GetTimestampNs(); flushIfNeeded(eventTimeNs); flushIfNeededLocked(eventTimeNs); if (condition == false) { return; Loading @@ -216,7 +218,7 @@ void CountMetricProducer::onMatchedLogEventInternal( if (it == mCurrentSlicedCounter->end()) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { if (hitGuardRailLocked(eventKey)) { return; } Loading @@ -239,7 +241,7 @@ void CountMetricProducer::onMatchedLogEventInternal( // When a new matched event comes in, we check if event falls into the current // bucket. If not, flush the old counter to past buckets and initialize the new bucket. void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { void CountMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; } Loading Loading @@ -272,7 +274,7 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { // Rough estimate of CountMetricProducer buffer stored. This number will be // greater than actual data size as it contains each dimension of // CountMetricData is duplicated. size_t CountMetricProducer::byteSize() const { size_t CountMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading cmds/statsd/src/metrics/CountMetricProducer.h +23 −18 Original line number Diff line number Diff line Loading @@ -48,33 +48,38 @@ public: virtual ~CountMetricProducer(); void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override; void finish() override; void flushIfNeeded(const uint64_t newEventTime) override; // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReport() override; void onSlicedConditionMayChange(const uint64_t eventTime) override; size_t byteSize() const override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{}; // TODO: Implement this later. virtual void notifyAppRemoved(const string& apk, const int uid) override{}; protected: void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) override; void startNewProtoOutputStream(long long timestamp) override; void onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) override; private: // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override; // Internal interface to handle condition change. void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; // Internal interface to handle sliced condition change. void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override; // Internal function to calculate the current used bytes. size_t byteSizeLocked() const override; // Util function to flush the old packet. void flushIfNeededLocked(const uint64_t& newEventTime); // Util function to init/reset the proto output stream. void startNewProtoOutputStreamLocked(long long timestamp); const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. Loading @@ -85,7 +90,7 @@ private: static const size_t kBucketSize = sizeof(CountBucket{}); bool hitGuardRail(const HashableDimensionKey& newKey); bool hitGuardRailLocked(const HashableDimensionKey& newKey); FRIEND_TEST(CountMetricProducerTest, TestNonDimensionalEvents); FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition); Loading cmds/statsd/src/metrics/DurationMetricProducer.cpp +18 −17 Original line number Diff line number Diff line Loading @@ -93,7 +93,7 @@ DurationMetricProducer::DurationMetricProducer(const ConfigKey& key, const Durat mConditionSliced = true; } startNewProtoOutputStream(mStartTimeNs); startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); Loading @@ -103,7 +103,7 @@ DurationMetricProducer::~DurationMetricProducer() { VLOG("~DurationMetric() called"); } void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -111,7 +111,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) { const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( Loading @@ -129,19 +129,20 @@ void DurationMetricProducer::finish() { // DropboxWriter. } void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeeded(eventTime); flushIfNeededLocked(eventTime); // Now for each of the on-going event, check if the condition has changed for them. for (auto& pair : mCurrentSlicedDuration) { pair.second->onSlicedConditionMayChange(eventTime); } } void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; flushIfNeeded(eventTime); flushIfNeededLocked(eventTime); // TODO: need to populate the condition change time from the event which triggers the condition // change, instead of using current time. for (auto& pair : mCurrentSlicedDuration) { Loading @@ -149,13 +150,13 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u } } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { Loading Loading @@ -214,13 +215,13 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); // TODO: Properly clear the old buckets. return buffer; } void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return; } Loading @@ -239,7 +240,7 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { mCurrentBucketNum += numBucketsForward; } bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { // the key is not new, we are good. if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { return false; Loading @@ -259,11 +260,11 @@ bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } void DurationMetricProducer::onMatchedLogEventInternal( void DurationMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map<string, HashableDimensionKey>& conditionKeys, bool condition, const LogEvent& event, bool scheduledPull) { flushIfNeeded(event.GetTimestampNs()); flushIfNeededLocked(event.GetTimestampNs()); if (matcherIndex == mStopAllIndex) { for (auto& pair : mCurrentSlicedDuration) { Loading @@ -275,7 +276,7 @@ void DurationMetricProducer::onMatchedLogEventInternal( HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension)); if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) { if (hitGuardRail(eventKey)) { if (hitGuardRailLocked(eventKey)) { return; } mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); Loading @@ -290,7 +291,7 @@ void DurationMetricProducer::onMatchedLogEventInternal( } } size_t DurationMetricProducer::byteSize() const { size_t DurationMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading Loading
cmds/statsd/Android.mk +1 −0 Original line number Diff line number Diff line Loading @@ -167,6 +167,7 @@ LOCAL_SRC_FILES := \ tests/metrics/OringDurationTracker_test.cpp \ tests/metrics/MaxDurationTracker_test.cpp \ tests/metrics/CountMetricProducer_test.cpp \ tests/metrics/DurationMetricProducer_test.cpp \ tests/metrics/EventMetricProducer_test.cpp \ tests/metrics/ValueMetricProducer_test.cpp \ tests/guardrail/StatsdStats_test.cpp Loading
cmds/statsd/src/StatsLogProcessor.h +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ public: const std::function<void(const ConfigKey&)>& sendBroadcast); virtual ~StatsLogProcessor(); virtual void OnLogEvent(const LogEvent& event); void OnLogEvent(const LogEvent& event); void OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config); void OnConfigRemoved(const ConfigKey& key); Loading
cmds/statsd/src/metrics/CountMetricProducer.cpp +16 −14 Original line number Diff line number Diff line Loading @@ -83,7 +83,7 @@ CountMetricProducer::CountMetricProducer(const ConfigKey& key, const CountMetric mConditionSliced = true; } startNewProtoOutputStream(mStartTimeNs); startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); Loading @@ -93,7 +93,7 @@ CountMetricProducer::~CountMetricProducer() { VLOG("~CountMetricProducer() called"); } void CountMetricProducer::startNewProtoOutputStream(long long startTime) { void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -103,17 +103,17 @@ void CountMetricProducer::startNewProtoOutputStream(long long startTime) { void CountMetricProducer::finish() { } void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { Loading Loading @@ -165,9 +165,9 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { (long long)mCurrentBucketStartTimeNs); VLOG("metric %s dump report now...", mMetric.name().c_str()); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStream(endTime); startNewProtoOutputStreamLocked(endTime); mPastBuckets.clear(); return buffer; Loading @@ -175,12 +175,13 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { // TODO: Clear mDimensionKeyMap once the report is dumped. } void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void CountMetricProducer::onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; } bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool CountMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { return false; } Loading @@ -200,13 +201,14 @@ bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } void CountMetricProducer::onMatchedLogEventInternal( void CountMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map<string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { uint64_t eventTimeNs = event.GetTimestampNs(); flushIfNeeded(eventTimeNs); flushIfNeededLocked(eventTimeNs); if (condition == false) { return; Loading @@ -216,7 +218,7 @@ void CountMetricProducer::onMatchedLogEventInternal( if (it == mCurrentSlicedCounter->end()) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { if (hitGuardRailLocked(eventKey)) { return; } Loading @@ -239,7 +241,7 @@ void CountMetricProducer::onMatchedLogEventInternal( // When a new matched event comes in, we check if event falls into the current // bucket. If not, flush the old counter to past buckets and initialize the new bucket. void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { void CountMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; } Loading Loading @@ -272,7 +274,7 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { // Rough estimate of CountMetricProducer buffer stored. This number will be // greater than actual data size as it contains each dimension of // CountMetricData is duplicated. size_t CountMetricProducer::byteSize() const { size_t CountMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading
cmds/statsd/src/metrics/CountMetricProducer.h +23 −18 Original line number Diff line number Diff line Loading @@ -48,33 +48,38 @@ public: virtual ~CountMetricProducer(); void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override; void finish() override; void flushIfNeeded(const uint64_t newEventTime) override; // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReport() override; void onSlicedConditionMayChange(const uint64_t eventTime) override; size_t byteSize() const override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{}; // TODO: Implement this later. virtual void notifyAppRemoved(const string& apk, const int uid) override{}; protected: void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) override; void startNewProtoOutputStream(long long timestamp) override; void onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) override; private: // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override; // Internal interface to handle condition change. void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; // Internal interface to handle sliced condition change. void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override; // Internal function to calculate the current used bytes. size_t byteSizeLocked() const override; // Util function to flush the old packet. void flushIfNeededLocked(const uint64_t& newEventTime); // Util function to init/reset the proto output stream. void startNewProtoOutputStreamLocked(long long timestamp); const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. Loading @@ -85,7 +90,7 @@ private: static const size_t kBucketSize = sizeof(CountBucket{}); bool hitGuardRail(const HashableDimensionKey& newKey); bool hitGuardRailLocked(const HashableDimensionKey& newKey); FRIEND_TEST(CountMetricProducerTest, TestNonDimensionalEvents); FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition); Loading
cmds/statsd/src/metrics/DurationMetricProducer.cpp +18 −17 Original line number Diff line number Diff line Loading @@ -93,7 +93,7 @@ DurationMetricProducer::DurationMetricProducer(const ConfigKey& key, const Durat mConditionSliced = true; } startNewProtoOutputStream(mStartTimeNs); startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); Loading @@ -103,7 +103,7 @@ DurationMetricProducer::~DurationMetricProducer() { VLOG("~DurationMetric() called"); } void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -111,7 +111,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) { const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( Loading @@ -129,19 +129,20 @@ void DurationMetricProducer::finish() { // DropboxWriter. } void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeeded(eventTime); flushIfNeededLocked(eventTime); // Now for each of the on-going event, check if the condition has changed for them. for (auto& pair : mCurrentSlicedDuration) { pair.second->onSlicedConditionMayChange(eventTime); } } void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; flushIfNeeded(eventTime); flushIfNeededLocked(eventTime); // TODO: need to populate the condition change time from the event which triggers the condition // change, instead of using current time. for (auto& pair : mCurrentSlicedDuration) { Loading @@ -149,13 +150,13 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u } } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { Loading Loading @@ -214,13 +215,13 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); // TODO: Properly clear the old buckets. return buffer; } void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return; } Loading @@ -239,7 +240,7 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { mCurrentBucketNum += numBucketsForward; } bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { // the key is not new, we are good. if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { return false; Loading @@ -259,11 +260,11 @@ bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } void DurationMetricProducer::onMatchedLogEventInternal( void DurationMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map<string, HashableDimensionKey>& conditionKeys, bool condition, const LogEvent& event, bool scheduledPull) { flushIfNeeded(event.GetTimestampNs()); flushIfNeededLocked(event.GetTimestampNs()); if (matcherIndex == mStopAllIndex) { for (auto& pair : mCurrentSlicedDuration) { Loading @@ -275,7 +276,7 @@ void DurationMetricProducer::onMatchedLogEventInternal( HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension)); if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) { if (hitGuardRail(eventKey)) { if (hitGuardRailLocked(eventKey)) { return; } mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); Loading @@ -290,7 +291,7 @@ void DurationMetricProducer::onMatchedLogEventInternal( } } size_t DurationMetricProducer::byteSize() const { size_t DurationMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading