Loading cmds/statsd/src/metrics/CountMetricProducer.cpp +31 −43 Original line number Original line Diff line number Diff line Loading @@ -94,8 +94,6 @@ CountMetricProducer::~CountMetricProducer() { } } void CountMetricProducer::startNewProtoOutputStream(long long startTime) { void CountMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -109,8 +107,13 @@ void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } } void CountMetricProducer::serializeBuckets() { std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { for (const auto& counter : mPastBuckets) { Loading Loading @@ -156,40 +159,28 @@ void CountMetricProducer::serializeBuckets() { } } mProto->end(wrapperToken); mProto->end(wrapperToken); } } mProto->end(mProtoToken); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); (long long)mCurrentBucketStartTimeNs); mPastBuckets.clear(); // TODO: Clear mDimensionKeyMap once the report is dumped. } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; VLOG("metric %s dump report now...", mMetric.name().c_str()); VLOG("metric %s dump report now...", mMetric.name().c_str()); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); // TODO(yanglu): merge these three functions to one to avoid three locks. serializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); startNewProtoOutputStream(endTime); mPastBuckets.clear(); return buffer; return buffer; // TODO: Clear mDimensionKeyMap once the report is dumped. } } void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; mCondition = conditionMet; } } bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { return false; return false; } } Loading Loading @@ -217,40 +208,38 @@ void CountMetricProducer::onMatchedLogEventInternal( flushIfNeeded(eventTimeNs); flushIfNeeded(eventTimeNs); // ===========GuardRail============== if (hitGuardRail(eventKey)) { return; } // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (condition == false) { if (condition == false) { return; return; } } auto it = mCurrentSlicedCounter->find(eventKey); auto it = mCurrentSlicedCounter->find(eventKey); if (it == mCurrentSlicedCounter->end()) { if (it == mCurrentSlicedCounter->end()) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { return; } // create a counter for the new key // create a counter for the new key mCurrentSlicedCounter->insert({eventKey, 1}); (*mCurrentSlicedCounter)[eventKey] = 1; } else { } else { // increment the existing value // increment the existing value auto& count = it->second; auto& count = it->second; count++; count++; } } const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second; for (auto& tracker : mAnomalyTrackers) { for (auto& tracker : mAnomalyTrackers) { tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count); tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, } mCurrentSlicedCounter->find(eventKey)->second); VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count)); } } VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(*mCurrentSlicedCounter)[eventKey]); } } // When a new matched event comes in, we check if event falls into the current // When a new matched event comes in, we check if event falls into the current // bucket. If not, flush the old counter to past buckets and initialize the new bucket. // bucket. If not, flush the old counter to past buckets and initialize the new bucket. void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; return; } } Loading Loading @@ -284,7 +273,6 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { // greater than actual data size as it contains each dimension of // greater than actual data size as it contains each dimension of // CountMetricData is duplicated. // CountMetricData is duplicated. size_t CountMetricProducer::byteSize() const { size_t CountMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; size_t totalSize = 0; for (const auto& pair : mPastBuckets) { for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; totalSize += pair.second.size() * kBucketSize; Loading cmds/statsd/src/metrics/CountMetricProducer.h +0 −2 Original line number Original line Diff line number Diff line Loading @@ -75,8 +75,6 @@ protected: void startNewProtoOutputStream(long long timestamp) override; void startNewProtoOutputStream(long long timestamp) override; private: private: void serializeBuckets(); const CountMetric mMetric; const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. // TODO: Add a lock to mPastBuckets. Loading cmds/statsd/src/metrics/DurationMetricProducer.cpp +27 −45 Original line number Original line Diff line number Diff line Loading @@ -104,7 +104,6 @@ DurationMetricProducer::~DurationMetricProducer() { } } void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -112,7 +111,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { } } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) { switch (mMetric.aggregation_type()) { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( return make_unique<OringDurationTracker>( Loading @@ -131,7 +130,6 @@ void DurationMetricProducer::finish() { } } void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeeded(eventTime); flushIfNeeded(eventTime); // Now for each of the on-going event, check if the condition has changed for them. // Now for each of the on-going event, check if the condition has changed for them. Loading @@ -141,7 +139,6 @@ void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime } } void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; mCondition = conditionMet; flushIfNeeded(eventTime); flushIfNeeded(eventTime); Loading @@ -152,8 +149,15 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u } } } } void DurationMetricProducer::SerializeBuckets() { std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { for (const auto& pair : mPastBuckets) { const HashableDimensionKey& hashableKey = pair.first; const HashableDimensionKey& hashableKey = pair.first; VLOG(" dimension key %s", hashableKey.c_str()); VLOG(" dimension key %s", hashableKey.c_str()); Loading Loading @@ -210,29 +214,13 @@ void DurationMetricProducer::SerializeBuckets() { mProto->end(mProtoToken); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); (long long)mCurrentBucketStartTimeNs); } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { VLOG("metric %s dump report now...", mMetric.name().c_str()); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); SerializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); startNewProtoOutputStream(endTime); // TODO: Properly clear the old buckets. // TODO: Properly clear the old buckets. return buffer; return buffer; } } void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return; return; } } Loading @@ -252,7 +240,6 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { } } bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); // the key is not new, we are good. // the key is not new, we are good. if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { return false; return false; Loading @@ -278,9 +265,6 @@ void DurationMetricProducer::onMatchedLogEventInternal( const LogEvent& event, bool scheduledPull) { const LogEvent& event, bool scheduledPull) { flushIfNeeded(event.GetTimestampNs()); flushIfNeeded(event.GetTimestampNs()); // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (matcherIndex == mStopAllIndex) { if (matcherIndex == mStopAllIndex) { for (auto& pair : mCurrentSlicedDuration) { for (auto& pair : mCurrentSlicedDuration) { pair.second->noteStopAll(event.GetTimestampNs()); pair.second->noteStopAll(event.GetTimestampNs()); Loading @@ -294,9 +278,9 @@ void DurationMetricProducer::onMatchedLogEventInternal( if (hitGuardRail(eventKey)) { if (hitGuardRail(eventKey)) { return; return; } } mCurrentSlicedDuration[eventKey] = mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); createDurationTracker(eventKey, mPastBuckets[eventKey]); } } auto it = mCurrentSlicedDuration.find(eventKey); auto it = mCurrentSlicedDuration.find(eventKey); if (matcherIndex == mStartIndex) { if (matcherIndex == mStartIndex) { Loading @@ -305,10 +289,8 @@ void DurationMetricProducer::onMatchedLogEventInternal( it->second->noteStop(atomKey, event.GetTimestampNs(), false); it->second->noteStop(atomKey, event.GetTimestampNs(), false); } } } } } size_t DurationMetricProducer::byteSize() const { size_t DurationMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; size_t totalSize = 0; for (const auto& pair : mPastBuckets) { for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; totalSize += pair.second.size() * kBucketSize; Loading cmds/statsd/src/metrics/DurationMetricProducer.h +2 −4 Original line number Original line Diff line number Diff line Loading @@ -71,8 +71,6 @@ protected: void startNewProtoOutputStream(long long timestamp) override; void startNewProtoOutputStream(long long timestamp) override; private: private: void SerializeBuckets(); const DurationMetric mMetric; const DurationMetric mMetric; // Index of the SimpleLogEntryMatcher which defines the start. // Index of the SimpleLogEntryMatcher which defines the start. Loading @@ -98,8 +96,8 @@ private: std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>> std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>> mCurrentSlicedDuration; mCurrentSlicedDuration; std::unique_ptr<DurationTracker> createDurationTracker( std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey, const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const; std::vector<DurationBucket>& bucket); bool hitGuardRail(const HashableDimensionKey& newKey); bool hitGuardRail(const HashableDimensionKey& newKey); static const size_t kBucketSize = sizeof(DurationBucket{}); static const size_t kBucketSize = sizeof(DurationBucket{}); Loading cmds/statsd/src/metrics/EventMetricProducer.cpp +4 −13 Original line number Original line Diff line number Diff line Loading @@ -73,7 +73,6 @@ EventMetricProducer::~EventMetricProducer() { } } void EventMetricProducer::startNewProtoOutputStream(long long startTime) { void EventMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto = std::make_unique<ProtoOutputStream>(); // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData, // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData, // and StatsEvent. // and StatsEvent. Loading @@ -90,16 +89,11 @@ void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; long long endTime = time(nullptr) * NS_PER_SEC; // TODO(yanglu): make this section to an util function. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto->end(mProtoToken); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime); size_t bufferSize = mProto->size(); size_t bufferSize = mProto->size(); VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize); VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize); } std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); startNewProtoOutputStream(endTime); Loading @@ -109,7 +103,6 @@ std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; mCondition = conditionMet; } } Loading @@ -117,7 +110,6 @@ void EventMetricProducer::onMatchedLogEventInternal( const size_t matcherIndex, const HashableDimensionKey& eventKey, const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { const LogEvent& event, bool scheduledPull) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (!condition) { if (!condition) { return; return; } } Loading @@ -132,7 +124,6 @@ void EventMetricProducer::onMatchedLogEventInternal( } } size_t EventMetricProducer::byteSize() const { size_t EventMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); return mProto->bytesWritten(); return mProto->bytesWritten(); } } Loading Loading
cmds/statsd/src/metrics/CountMetricProducer.cpp +31 −43 Original line number Original line Diff line number Diff line Loading @@ -94,8 +94,6 @@ CountMetricProducer::~CountMetricProducer() { } } void CountMetricProducer::startNewProtoOutputStream(long long startTime) { void CountMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -109,8 +107,13 @@ void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } } void CountMetricProducer::serializeBuckets() { std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { for (const auto& counter : mPastBuckets) { Loading Loading @@ -156,40 +159,28 @@ void CountMetricProducer::serializeBuckets() { } } mProto->end(wrapperToken); mProto->end(wrapperToken); } } mProto->end(mProtoToken); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); (long long)mCurrentBucketStartTimeNs); mPastBuckets.clear(); // TODO: Clear mDimensionKeyMap once the report is dumped. } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; VLOG("metric %s dump report now...", mMetric.name().c_str()); VLOG("metric %s dump report now...", mMetric.name().c_str()); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); // TODO(yanglu): merge these three functions to one to avoid three locks. serializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); startNewProtoOutputStream(endTime); mPastBuckets.clear(); return buffer; return buffer; // TODO: Clear mDimensionKeyMap once the report is dumped. } } void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; mCondition = conditionMet; } } bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { return false; return false; } } Loading Loading @@ -217,40 +208,38 @@ void CountMetricProducer::onMatchedLogEventInternal( flushIfNeeded(eventTimeNs); flushIfNeeded(eventTimeNs); // ===========GuardRail============== if (hitGuardRail(eventKey)) { return; } // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (condition == false) { if (condition == false) { return; return; } } auto it = mCurrentSlicedCounter->find(eventKey); auto it = mCurrentSlicedCounter->find(eventKey); if (it == mCurrentSlicedCounter->end()) { if (it == mCurrentSlicedCounter->end()) { // ===========GuardRail============== if (hitGuardRail(eventKey)) { return; } // create a counter for the new key // create a counter for the new key mCurrentSlicedCounter->insert({eventKey, 1}); (*mCurrentSlicedCounter)[eventKey] = 1; } else { } else { // increment the existing value // increment the existing value auto& count = it->second; auto& count = it->second; count++; count++; } } const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second; for (auto& tracker : mAnomalyTrackers) { for (auto& tracker : mAnomalyTrackers) { tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count); tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, } mCurrentSlicedCounter->find(eventKey)->second); VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count)); } } VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(*mCurrentSlicedCounter)[eventKey]); } } // When a new matched event comes in, we check if event falls into the current // When a new matched event comes in, we check if event falls into the current // bucket. If not, flush the old counter to past buckets and initialize the new bucket. // bucket. If not, flush the old counter to past buckets and initialize the new bucket. void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; return; } } Loading Loading @@ -284,7 +273,6 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { // greater than actual data size as it contains each dimension of // greater than actual data size as it contains each dimension of // CountMetricData is duplicated. // CountMetricData is duplicated. size_t CountMetricProducer::byteSize() const { size_t CountMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; size_t totalSize = 0; for (const auto& pair : mPastBuckets) { for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; totalSize += pair.second.size() * kBucketSize; Loading
cmds/statsd/src/metrics/CountMetricProducer.h +0 −2 Original line number Original line Diff line number Diff line Loading @@ -75,8 +75,6 @@ protected: void startNewProtoOutputStream(long long timestamp) override; void startNewProtoOutputStream(long long timestamp) override; private: private: void serializeBuckets(); const CountMetric mMetric; const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. // TODO: Add a lock to mPastBuckets. Loading
cmds/statsd/src/metrics/DurationMetricProducer.cpp +27 −45 Original line number Original line Diff line number Diff line Loading @@ -104,7 +104,6 @@ DurationMetricProducer::~DurationMetricProducer() { } } void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); Loading @@ -112,7 +111,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { } } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const { const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) { switch (mMetric.aggregation_type()) { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: case DurationMetric_AggregationType_SUM: return make_unique<OringDurationTracker>( return make_unique<OringDurationTracker>( Loading @@ -131,7 +130,6 @@ void DurationMetricProducer::finish() { } } void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeeded(eventTime); flushIfNeeded(eventTime); // Now for each of the on-going event, check if the condition has changed for them. // Now for each of the on-going event, check if the condition has changed for them. Loading @@ -141,7 +139,6 @@ void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime } } void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; mCondition = conditionMet; flushIfNeeded(eventTime); flushIfNeeded(eventTime); Loading @@ -152,8 +149,15 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u } } } } void DurationMetricProducer::SerializeBuckets() { std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { for (const auto& pair : mPastBuckets) { const HashableDimensionKey& hashableKey = pair.first; const HashableDimensionKey& hashableKey = pair.first; VLOG(" dimension key %s", hashableKey.c_str()); VLOG(" dimension key %s", hashableKey.c_str()); Loading Loading @@ -210,29 +214,13 @@ void DurationMetricProducer::SerializeBuckets() { mProto->end(mProtoToken); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); (long long)mCurrentBucketStartTimeNs); } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() { VLOG("metric %s dump report now...", mMetric.name().c_str()); long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeeded(endTime); SerializeBuckets(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); startNewProtoOutputStream(endTime); // TODO: Properly clear the old buckets. // TODO: Properly clear the old buckets. return buffer; return buffer; } } void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return; return; } } Loading @@ -252,7 +240,6 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { } } bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); // the key is not new, we are good. // the key is not new, we are good. if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { return false; return false; Loading @@ -278,9 +265,6 @@ void DurationMetricProducer::onMatchedLogEventInternal( const LogEvent& event, bool scheduledPull) { const LogEvent& event, bool scheduledPull) { flushIfNeeded(event.GetTimestampNs()); flushIfNeeded(event.GetTimestampNs()); // TODO(yanglu): move the following logic to a seperate function to make it lockable. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (matcherIndex == mStopAllIndex) { if (matcherIndex == mStopAllIndex) { for (auto& pair : mCurrentSlicedDuration) { for (auto& pair : mCurrentSlicedDuration) { pair.second->noteStopAll(event.GetTimestampNs()); pair.second->noteStopAll(event.GetTimestampNs()); Loading @@ -294,9 +278,9 @@ void DurationMetricProducer::onMatchedLogEventInternal( if (hitGuardRail(eventKey)) { if (hitGuardRail(eventKey)) { return; return; } } mCurrentSlicedDuration[eventKey] = mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); createDurationTracker(eventKey, mPastBuckets[eventKey]); } } auto it = mCurrentSlicedDuration.find(eventKey); auto it = mCurrentSlicedDuration.find(eventKey); if (matcherIndex == mStartIndex) { if (matcherIndex == mStartIndex) { Loading @@ -305,10 +289,8 @@ void DurationMetricProducer::onMatchedLogEventInternal( it->second->noteStop(atomKey, event.GetTimestampNs(), false); it->second->noteStop(atomKey, event.GetTimestampNs(), false); } } } } } size_t DurationMetricProducer::byteSize() const { size_t DurationMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); size_t totalSize = 0; size_t totalSize = 0; for (const auto& pair : mPastBuckets) { for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; totalSize += pair.second.size() * kBucketSize; Loading
cmds/statsd/src/metrics/DurationMetricProducer.h +2 −4 Original line number Original line Diff line number Diff line Loading @@ -71,8 +71,6 @@ protected: void startNewProtoOutputStream(long long timestamp) override; void startNewProtoOutputStream(long long timestamp) override; private: private: void SerializeBuckets(); const DurationMetric mMetric; const DurationMetric mMetric; // Index of the SimpleLogEntryMatcher which defines the start. // Index of the SimpleLogEntryMatcher which defines the start. Loading @@ -98,8 +96,8 @@ private: std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>> std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>> mCurrentSlicedDuration; mCurrentSlicedDuration; std::unique_ptr<DurationTracker> createDurationTracker( std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey, const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const; std::vector<DurationBucket>& bucket); bool hitGuardRail(const HashableDimensionKey& newKey); bool hitGuardRail(const HashableDimensionKey& newKey); static const size_t kBucketSize = sizeof(DurationBucket{}); static const size_t kBucketSize = sizeof(DurationBucket{}); Loading
cmds/statsd/src/metrics/EventMetricProducer.cpp +4 −13 Original line number Original line Diff line number Diff line Loading @@ -73,7 +73,6 @@ EventMetricProducer::~EventMetricProducer() { } } void EventMetricProducer::startNewProtoOutputStream(long long startTime) { void EventMetricProducer::startNewProtoOutputStream(long long startTime) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto = std::make_unique<ProtoOutputStream>(); mProto = std::make_unique<ProtoOutputStream>(); // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData, // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData, // and StatsEvent. // and StatsEvent. Loading @@ -90,16 +89,11 @@ void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { long long endTime = time(nullptr) * NS_PER_SEC; long long endTime = time(nullptr) * NS_PER_SEC; // TODO(yanglu): make this section to an util function. { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mProto->end(mProtoToken); mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime); size_t bufferSize = mProto->size(); size_t bufferSize = mProto->size(); VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize); VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize); } std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); startNewProtoOutputStream(endTime); startNewProtoOutputStream(endTime); Loading @@ -109,7 +103,6 @@ std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() { void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); mCondition = conditionMet; mCondition = conditionMet; } } Loading @@ -117,7 +110,6 @@ void EventMetricProducer::onMatchedLogEventInternal( const size_t matcherIndex, const HashableDimensionKey& eventKey, const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { const LogEvent& event, bool scheduledPull) { std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex); if (!condition) { if (!condition) { return; return; } } Loading @@ -132,7 +124,6 @@ void EventMetricProducer::onMatchedLogEventInternal( } } size_t EventMetricProducer::byteSize() const { size_t EventMetricProducer::byteSize() const { std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex); return mProto->bytesWritten(); return mProto->bytesWritten(); } } Loading