Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8de6939c authored by Yangster's avatar Yangster
Browse files

Thread-safe metric producers.

Test: unit test passed
Change-Id: Ie47404e8649b63ee8ac32e40189a47f6cb7a9def
parent d12e276f
Loading
Loading
Loading
Loading
+43 −31
Original line number Diff line number Diff line
@@ -96,6 +96,8 @@ CountMetricProducer::~CountMetricProducer() {
}

void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);

    mProto = std::make_unique<ProtoOutputStream>();
    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -109,13 +111,8 @@ void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
    VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}

std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
    long long endTime = time(nullptr) * NS_PER_SEC;

    // Dump current bucket if it's stale.
    // If current bucket is still on-going, don't force dump current bucket.
    // In finish(), We can force dump current bucket.
    flushIfNeeded(endTime);
void CountMetricProducer::serializeBuckets() {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    VLOG("metric %s dump report now...", mMetric.name().c_str());

    for (const auto& counter : mPastBuckets) {
@@ -161,28 +158,40 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
        }
        mProto->end(wrapperToken);
    }

    mProto->end(mProtoToken);
    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                  (long long)mCurrentBucketStartTimeNs);

    mPastBuckets.clear();
    // TODO: Clear mDimensionKeyMap once the report is dumped.
}

std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
    long long endTime = time(nullptr) * NS_PER_SEC;
    VLOG("metric %s dump report now...", mMetric.name().c_str());
    // Dump current bucket if it's stale.
    // If current bucket is still on-going, don't force dump current bucket.
    // In finish(), We can force dump current bucket.
    flushIfNeeded(endTime);

    // TODO(yanglu): merge these three functions to one to avoid three locks.
    serializeBuckets();

    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();

    startNewProtoOutputStream(endTime);
    mPastBuckets.clear();

    return buffer;

    // TODO: Clear mDimensionKeyMap once the report is dumped.
}

void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
    VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    mCondition = conditionMet;
}

bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
    if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
        return false;
    }
@@ -210,38 +219,40 @@ void CountMetricProducer::onMatchedLogEventInternal(

    flushIfNeeded(eventTimeNs);

    if (condition == false) {
    // ===========GuardRail==============
    if (hitGuardRail(eventKey)) {
        return;
    }

    auto it = mCurrentSlicedCounter->find(eventKey);

    if (it == mCurrentSlicedCounter->end()) {
        // ===========GuardRail==============
        if (hitGuardRail(eventKey)) {
    // TODO(yanglu): move the following logic to a seperate function to make it lockable.
    {
        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
        if (condition == false) {
            return;
        }

        auto it = mCurrentSlicedCounter->find(eventKey);
        if (it == mCurrentSlicedCounter->end()) {
            // create a counter for the new key
        (*mCurrentSlicedCounter)[eventKey] = 1;
            mCurrentSlicedCounter->insert({eventKey, 1});
        } else {
            // increment the existing value
            auto& count = it->second;
            count++;
        }

        const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second;
        for (auto& tracker : mAnomalyTrackers) {
        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
                                         mCurrentSlicedCounter->find(eventKey)->second);
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count);
        }
        VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count));
    }

    VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(),
         (long long)(*mCurrentSlicedCounter)[eventKey]);
}

// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new bucket.
void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);

    if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
        return;
    }
@@ -275,6 +286,7 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
// greater than actual data size as it contains each dimension of
// CountMetricData is  duplicated.
size_t CountMetricProducer::byteSize() const {
    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
+2 −0
Original line number Diff line number Diff line
@@ -75,6 +75,8 @@ protected:
    void startNewProtoOutputStream(long long timestamp) override;

private:
    void serializeBuckets();

    const CountMetric mMetric;

    // TODO: Add a lock to mPastBuckets.
+45 −27
Original line number Diff line number Diff line
@@ -104,6 +104,7 @@ DurationMetricProducer::~DurationMetricProducer() {
}

void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    mProto = std::make_unique<ProtoOutputStream>();
    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,7 +112,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
}

unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
    switch (mMetric.aggregation_type()) {
        case DurationMetric_AggregationType_SUM:
            return make_unique<OringDurationTracker>(
@@ -130,6 +131,7 @@ void DurationMetricProducer::finish() {
}

void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
    flushIfNeeded(eventTime);
    // Now for each of the on-going event, check if the condition has changed for them.
@@ -139,6 +141,7 @@ void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime
}

void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
    mCondition = conditionMet;
    flushIfNeeded(eventTime);
@@ -149,15 +152,8 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u
    }
}

std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
    long long endTime = time(nullptr) * NS_PER_SEC;

    // Dump current bucket if it's stale.
    // If current bucket is still on-going, don't force dump current bucket.
    // In finish(), We can force dump current bucket.
    flushIfNeeded(endTime);
    VLOG("metric %s dump report now...", mMetric.name().c_str());

void DurationMetricProducer::SerializeBuckets() {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    for (const auto& pair : mPastBuckets) {
        const HashableDimensionKey& hashableKey = pair.first;
        VLOG("  dimension key %s", hashableKey.c_str());
@@ -214,13 +210,29 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
    mProto->end(mProtoToken);
    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                  (long long)mCurrentBucketStartTimeNs);
}

std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
    VLOG("metric %s dump report now...", mMetric.name().c_str());

    long long endTime = time(nullptr) * NS_PER_SEC;

    // Dump current bucket if it's stale.
    // If current bucket is still on-going, don't force dump current bucket.
    // In finish(), We can force dump current bucket.
    flushIfNeeded(endTime);

    SerializeBuckets();

    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();

    startNewProtoOutputStream(endTime);
    // TODO: Properly clear the old buckets.
    return buffer;
}

void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
        return;
    }
@@ -240,6 +252,7 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
}

bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
    // the key is not new, we are good.
    if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
        return false;
@@ -265,6 +278,9 @@ void DurationMetricProducer::onMatchedLogEventInternal(
        const LogEvent& event, bool scheduledPull) {
    flushIfNeeded(event.GetTimestampNs());

    // TODO(yanglu): move the following logic to a seperate function to make it lockable.
    {
        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
        if (matcherIndex == mStopAllIndex) {
            for (auto& pair : mCurrentSlicedDuration) {
                pair.second->noteStopAll(event.GetTimestampNs());
@@ -278,9 +294,9 @@ void DurationMetricProducer::onMatchedLogEventInternal(
            if (hitGuardRail(eventKey)) {
                return;
            }
        mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
            mCurrentSlicedDuration[eventKey] =
                    createDurationTracker(eventKey, mPastBuckets[eventKey]);
        }

        auto it = mCurrentSlicedDuration.find(eventKey);

        if (matcherIndex == mStartIndex) {
@@ -289,8 +305,10 @@ void DurationMetricProducer::onMatchedLogEventInternal(
            it->second->noteStop(atomKey, event.GetTimestampNs(), false);
        }
    }
}

size_t DurationMetricProducer::byteSize() const {
    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
+4 −2
Original line number Diff line number Diff line
@@ -71,6 +71,8 @@ protected:
    void startNewProtoOutputStream(long long timestamp) override;

private:
    void SerializeBuckets();

    const DurationMetric mMetric;

    // Index of the SimpleLogEntryMatcher which defines the start.
@@ -96,8 +98,8 @@ private:
    std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
            mCurrentSlicedDuration;

    std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey,
                                                           std::vector<DurationBucket>& bucket);
    std::unique_ptr<DurationTracker> createDurationTracker(
            const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const;
    bool hitGuardRail(const HashableDimensionKey& newKey);

    static const size_t kBucketSize = sizeof(DurationBucket{});
+13 −4
Original line number Diff line number Diff line
@@ -73,6 +73,7 @@ EventMetricProducer::~EventMetricProducer() {
}

void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    mProto = std::make_unique<ProtoOutputStream>();
    // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
    // and StatsEvent.
@@ -89,11 +90,16 @@ void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {

std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() {
    long long endTime = time(nullptr) * NS_PER_SEC;
    // TODO(yanglu): make this section to an util function.
    {
        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
        mProto->end(mProtoToken);
        mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);

        size_t bufferSize = mProto->size();
        VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
    }

    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();

    startNewProtoOutputStream(endTime);
@@ -103,6 +109,7 @@ std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() {

void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
    VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    mCondition = conditionMet;
}

@@ -110,6 +117,7 @@ void EventMetricProducer::onMatchedLogEventInternal(
        const size_t matcherIndex, const HashableDimensionKey& eventKey,
        const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
        const LogEvent& event, bool scheduledPull) {
    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
    if (!condition) {
        return;
    }
@@ -124,6 +132,7 @@ void EventMetricProducer::onMatchedLogEventInternal(
}

size_t EventMetricProducer::byteSize() const {
    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
    return mProto->bytesWritten();
}

Loading