Loading cmds/statsd/src/metrics/CountMetricProducer.cpp +43 −31 Original line number Diff line number Diff line Loading @@ -94,6 +94,8 @@ CountMetricProducer::~CountMetricProducer() { } void CountMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -107,13 +109,8 @@ void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); void CountMetricProducer::serializeBuckets() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { Loading Loading @@ -159,28 +156,40 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { } mProto->end(wrapperToken); } mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); mPastBuckets.clear(); // TODO: Clear mDimensionKeyMap once the report is dumped. } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; VLOG("metric %s dump report now...", mMetric.name().c_str()); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); // TODO(yanglu): merge these three functions to one to avoid three locks. serializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); mPastBuckets.clear(); return buffer; // TODO: Clear mDimensionKeyMap once the report is dumped. } void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; } bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { return false; } Loading Loading @@ -208,38 +217,40 @@ void CountMetricProducer::onMatchedLogEventInternal( flushIfNeeded(eventTimeNs); if (condition == false) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { return; } auto it = mCurrentSlicedCounter->find(eventKey); if (it == mCurrentSlicedCounter->end()) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (condition == false) { return; } auto it = mCurrentSlicedCounter->find(eventKey); if (it == mCurrentSlicedCounter->end()) { // create a counter for the new key (*mCurrentSlicedCounter)[eventKey] = 1; mCurrentSlicedCounter->insert({eventKey, 1}); } else { // increment the existing value auto& count = it->second; count++; } const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second; for (auto& tracker : mAnomalyTrackers) { tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, mCurrentSlicedCounter->find(eventKey)->second); tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count); } VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count)); } VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(*mCurrentSlicedCounter)[eventKey]); } // When a new matched event comes in, we check if event falls into the current // bucket. If not, flush the old counter to past buckets and initialize the new bucket. void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; } Loading Loading @@ -273,6 +284,7 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { // greater than actual data size as it contains each dimension of // CountMetricData is duplicated. size_t CountMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading cmds/statsd/src/metrics/CountMetricProducer.h +2 −0 Original line number Diff line number Diff line Loading @@ -75,6 +75,8 @@ protected: void startNewProtoOutputStream(long long timestamp) override; private: void serializeBuckets(); const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. Loading cmds/statsd/src/metrics/DurationMetricProducer.cpp +45 −27 Original line number Diff line number Diff line Loading @@ -104,6 +104,7 @@ DurationMetricProducer::~DurationMetricProducer() { } void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -111,7 +112,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) { const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( Loading @@ -130,6 +131,7 @@ void DurationMetricProducer::finish() { } void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeeded(eventTime); // Now for each of the on-going event, check if the condition has changed for them. Loading @@ -139,6 +141,7 @@ void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime } void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; flushIfNeeded(eventTime); Loading @@ -149,15 +152,8 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u } } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); void DurationMetricProducer::SerializeBuckets() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); for (const auto& pair : mPastBuckets) { const HashableDimensionKey& hashableKey = pair.first; VLOG(" dimension key %s", hashableKey.c_str()); Loading Loading @@ -214,13 +210,29 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { VLOG("metric %s dump report now...", mMetric.name().c_str()); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); SerializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); // TODO: Properly clear the old buckets. return buffer; } void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return; } Loading @@ -240,6 +252,7 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { } bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); // the key is not new, we are good. if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { return false; Loading @@ -265,6 +278,9 @@ void DurationMetricProducer::onMatchedLogEventInternal( const LogEvent& event, bool scheduledPull) { flushIfNeeded(event.GetTimestampNs()); // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (matcherIndex == mStopAllIndex) { for (auto& pair : mCurrentSlicedDuration) { pair.second->noteStopAll(event.GetTimestampNs()); Loading @@ -278,9 +294,9 @@ void DurationMetricProducer::onMatchedLogEventInternal( if (hitGuardRail(eventKey)) { return; } mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); } auto it = mCurrentSlicedDuration.find(eventKey); if (matcherIndex == mStartIndex) { Loading @@ -289,8 +305,10 @@ void DurationMetricProducer::onMatchedLogEventInternal( it->second->noteStop(atomKey, event.GetTimestampNs(), false); } } } size_t DurationMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading cmds/statsd/src/metrics/DurationMetricProducer.h +4 −2 Original line number Diff line number Diff line Loading @@ -71,6 +71,8 @@ protected: void startNewProtoOutputStream(long long timestamp) override; private: void SerializeBuckets(); const DurationMetric mMetric; // Index of the SimpleLogEntryMatcher which defines the start. Loading @@ -96,8 +98,8 @@ private: std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>> mCurrentSlicedDuration; std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket); std::unique_ptr<DurationTracker> createDurationTracker( const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const; bool hitGuardRail(const HashableDimensionKey& newKey); static const size_t kBucketSize = sizeof(DurationBucket{}); Loading cmds/statsd/src/metrics/EventMetricProducer.cpp +13 −4 Original line number Diff line number Diff line Loading @@ -73,6 +73,7 @@ EventMetricProducer::~EventMetricProducer() { } void EventMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData, // and StatsEvent. Loading @@ -89,11 +90,16 @@ void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; // TODO(yanglu): make this section to an util function. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime); size_t bufferSize = mProto->size(); VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize); } std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); Loading @@ -103,6 +109,7 @@ std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; } Loading @@ -110,6 +117,7 @@ void EventMetricProducer::onMatchedLogEventInternal( const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (!condition) { return; } Loading @@ -124,6 +132,7 @@ void EventMetricProducer::onMatchedLogEventInternal( } size_t EventMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); return mProto->bytesWritten(); } Loading Loading
cmds/statsd/src/metrics/CountMetricProducer.cpp +43 −31 Original line number Diff line number Diff line Loading @@ -94,6 +94,8 @@ CountMetricProducer::~CountMetricProducer() { } void CountMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -107,13 +109,8 @@ void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); void CountMetricProducer::serializeBuckets() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { Loading Loading @@ -159,28 +156,40 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { } mProto->end(wrapperToken); } mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); mPastBuckets.clear(); // TODO: Clear mDimensionKeyMap once the report is dumped. } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; VLOG("metric %s dump report now...", mMetric.name().c_str()); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); // TODO(yanglu): merge these three functions to one to avoid three locks. serializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); mPastBuckets.clear(); return buffer; // TODO: Clear mDimensionKeyMap once the report is dumped. } void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; } bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { return false; } Loading Loading @@ -208,38 +217,40 @@ void CountMetricProducer::onMatchedLogEventInternal( flushIfNeeded(eventTimeNs); if (condition == false) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { return; } auto it = mCurrentSlicedCounter->find(eventKey); if (it == mCurrentSlicedCounter->end()) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (condition == false) { return; } auto it = mCurrentSlicedCounter->find(eventKey); if (it == mCurrentSlicedCounter->end()) { // create a counter for the new key (*mCurrentSlicedCounter)[eventKey] = 1; mCurrentSlicedCounter->insert({eventKey, 1}); } else { // increment the existing value auto& count = it->second; count++; } const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second; for (auto& tracker : mAnomalyTrackers) { tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, mCurrentSlicedCounter->find(eventKey)->second); tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count); } VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count)); } VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(*mCurrentSlicedCounter)[eventKey]); } // When a new matched event comes in, we check if event falls into the current // bucket. If not, flush the old counter to past buckets and initialize the new bucket. void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; } Loading Loading @@ -273,6 +284,7 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { // greater than actual data size as it contains each dimension of // CountMetricData is duplicated. size_t CountMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading
cmds/statsd/src/metrics/CountMetricProducer.h +2 −0 Original line number Diff line number Diff line Loading @@ -75,6 +75,8 @@ protected: void startNewProtoOutputStream(long long timestamp) override; private: void serializeBuckets(); const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. Loading
cmds/statsd/src/metrics/DurationMetricProducer.cpp +45 −27 Original line number Diff line number Diff line Loading @@ -104,6 +104,7 @@ DurationMetricProducer::~DurationMetricProducer() { } void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -111,7 +112,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) { const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( Loading @@ -130,6 +131,7 @@ void DurationMetricProducer::finish() { } void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeeded(eventTime); // Now for each of the on-going event, check if the condition has changed for them. Loading @@ -139,6 +141,7 @@ void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime } void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; flushIfNeeded(eventTime); Loading @@ -149,15 +152,8 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u } } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); void DurationMetricProducer::SerializeBuckets() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); for (const auto& pair : mPastBuckets) { const HashableDimensionKey& hashableKey = pair.first; VLOG(" dimension key %s", hashableKey.c_str()); Loading Loading @@ -214,13 +210,29 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { VLOG("metric %s dump report now...", mMetric.name().c_str()); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); SerializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); // TODO: Properly clear the old buckets. return buffer; } void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return; } Loading @@ -240,6 +252,7 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { } bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); // the key is not new, we are good. if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { return false; Loading @@ -265,6 +278,9 @@ void DurationMetricProducer::onMatchedLogEventInternal( const LogEvent& event, bool scheduledPull) { flushIfNeeded(event.GetTimestampNs()); // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (matcherIndex == mStopAllIndex) { for (auto& pair : mCurrentSlicedDuration) { pair.second->noteStopAll(event.GetTimestampNs()); Loading @@ -278,9 +294,9 @@ void DurationMetricProducer::onMatchedLogEventInternal( if (hitGuardRail(eventKey)) { return; } mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); } auto it = mCurrentSlicedDuration.find(eventKey); if (matcherIndex == mStartIndex) { Loading @@ -289,8 +305,10 @@ void DurationMetricProducer::onMatchedLogEventInternal( it->second->noteStop(atomKey, event.GetTimestampNs(), false); } } } size_t DurationMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; Loading
cmds/statsd/src/metrics/DurationMetricProducer.h +4 −2 Original line number Diff line number Diff line Loading @@ -71,6 +71,8 @@ protected: void startNewProtoOutputStream(long long timestamp) override; private: void SerializeBuckets(); const DurationMetric mMetric; // Index of the SimpleLogEntryMatcher which defines the start. Loading @@ -96,8 +98,8 @@ private: std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>> mCurrentSlicedDuration; std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket); std::unique_ptr<DurationTracker> createDurationTracker( const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const; bool hitGuardRail(const HashableDimensionKey& newKey); static const size_t kBucketSize = sizeof(DurationBucket{}); Loading
cmds/statsd/src/metrics/EventMetricProducer.cpp +13 −4 Original line number Diff line number Diff line Loading @@ -73,6 +73,7 @@ EventMetricProducer::~EventMetricProducer() { } void EventMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData, // and StatsEvent. Loading @@ -89,11 +90,16 @@ void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; // TODO(yanglu): make this section to an util function. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime); size_t bufferSize = mProto->size(); VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize); } std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); Loading @@ -103,6 +109,7 @@ std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; } Loading @@ -110,6 +117,7 @@ void EventMetricProducer::onMatchedLogEventInternal( const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (!condition) { return; } Loading @@ -124,6 +132,7 @@ void EventMetricProducer::onMatchedLogEventInternal( } size_t EventMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); return mProto->bytesWritten(); } Loading