Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 32134a20 authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Partial buckets on app upgrade and fix duration."

parents fbde6988 27785a8a
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -100,7 +100,8 @@ private:
    FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
    FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
    FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
    FRIEND_TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensions);
    FRIEND_TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForSumDuration);
    FRIEND_TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForMaxDuration);
    FRIEND_TEST(MetricConditionLinkE2eTest, TestMultiplePredicatesAndLinks);
    FRIEND_TEST(AttributionE2eTest, TestAttributionMatchAndSlice);
    FRIEND_TEST(GaugeMetricE2eTest, TestMultipleFieldsForPushedEvent);
+1 −3
Original line number Diff line number Diff line
@@ -35,9 +35,7 @@ namespace statsd {

// TODO: Get rid of bucketNumbers, and return to the original circular array method.
AnomalyTracker::AnomalyTracker(const Alert& alert, const ConfigKey& configKey)
    : mAlert(alert),
      mConfigKey(configKey),
      mNumOfPastBuckets(mAlert.num_buckets() - 1) {
    : mAlert(alert), mConfigKey(configKey), mNumOfPastBuckets(mAlert.num_buckets() - 1) {
    VLOG("AnomalyTracker() called");
    if (mAlert.num_buckets() <= 0) {
        ALOGE("Cannot create AnomalyTracker with %lld buckets",
+50 −17
Original line number Diff line number Diff line
@@ -170,7 +170,6 @@ void CountMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);

    mPastBuckets.clear();
    mStartTimeNs = mCurrentBucketStartTimeNs;

    // TODO: Clear mDimensionKeyMap once the report is dumped.
}
@@ -214,13 +213,11 @@ void CountMetricProducer::onMatchedLogEventInternalLocked(
    }

    auto it = mCurrentSlicedCounter->find(eventKey);

    if (it == mCurrentSlicedCounter->end()) {
        // ===========GuardRail==============
        if (hitGuardRailLocked(eventKey)) {
            return;
        }

        // create a counter for the new key
        (*mCurrentSlicedCounter)[eventKey] = 1;
    } else {
@@ -228,10 +225,14 @@ void CountMetricProducer::onMatchedLogEventInternalLocked(
        auto& count = it->second;
        count++;
    }

    for (auto& tracker : mAnomalyTrackers) {
        int64_t countWholeBucket = mCurrentSlicedCounter->find(eventKey)->second;
        auto prev = mCurrentFullCounters->find(eventKey);
        if (prev != mCurrentFullCounters->end()) {
            countWholeBucket += prev->second;
        }
        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
                                         mCurrentSlicedCounter->find(eventKey)->second);
                                         countWholeBucket);
    }

    VLOG("metric %lld %s->%lld", (long long)mMetricId, eventKey.c_str(),
@@ -241,33 +242,65 @@ void CountMetricProducer::onMatchedLogEventInternalLocked(
// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new bucket.
void CountMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
    if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        return;
    }

    flushCurrentBucketLocked(eventTimeNs);
    // Setup the bucket start time and number.
    uint64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    mCurrentBucketNum += numBucketsForward;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

void CountMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
    uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    CountBucket info;
    info.mBucketStartNs = mCurrentBucketStartTimeNs;
    info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
    if (eventTimeNs < fullBucketEndTimeNs) {
        info.mBucketEndNs = eventTimeNs;
    } else {
        info.mBucketEndNs = fullBucketEndTimeNs;
    }
    info.mBucketNum = mCurrentBucketNum;
    for (const auto& counter : *mCurrentSlicedCounter) {
        info.mCount = counter.second;
        auto& bucketList = mPastBuckets[counter.first];
        bucketList.push_back(info);
        VLOG("metric %lld, dump key value: %s -> %lld",
            (long long)mMetricId, counter.first.c_str(), (long long)counter.second);
        VLOG("metric %lld, dump key value: %s -> %lld", (long long)mMetricId, counter.first.c_str(),
             (long long)counter.second);
    }

    // If we have finished a full bucket, then send this to anomaly tracker.
    if (eventTimeNs > fullBucketEndTimeNs) {
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullCounters->size() > 0) {
            for (const auto& keyValuePair : *mCurrentSlicedCounter) {
                (*mCurrentFullCounters)[keyValuePair.first] += keyValuePair.second;
            }
            for (auto& tracker : mAnomalyTrackers) {
                tracker->addPastBucket(mCurrentFullCounters, mCurrentBucketNum);
            }
            mCurrentFullCounters = std::make_shared<DimToValMap>();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (auto& tracker : mAnomalyTrackers) {
                tracker->addPastBucket(mCurrentSlicedCounter, mCurrentBucketNum);
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& keyValuePair : *mCurrentSlicedCounter) {
            (*mCurrentFullCounters)[keyValuePair.first] += keyValuePair.second;
        }
    }

    // Reset counters (do not clear, since the old one is still referenced in mAnomalyTrackers).
    // Only resets the counters, but doesn't setup the times nor numbers.
    // (Do not clear since the old one is still referenced in mAnomalyTrackers).
    mCurrentSlicedCounter = std::make_shared<DimToValMap>();
    uint64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
    mCurrentBucketNum += numBucketsForward;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

// Rough estimate of CountMetricProducer buffer stored. This number will be
+10 −2
Original line number Diff line number Diff line
@@ -71,14 +71,20 @@ private:
    void dumpStatesLocked(FILE* out, bool verbose) const override{};

    // Util function to flush the old packet.
    void flushIfNeededLocked(const uint64_t& newEventTime);
    void flushIfNeededLocked(const uint64_t& newEventTime) override;

    void flushCurrentBucketLocked(const uint64_t& eventTimeNs) override;

    // TODO: Add a lock to mPastBuckets.
    std::unordered_map<MetricDimensionKey, std::vector<CountBucket>> mPastBuckets;

    // The current bucket.
    // The current bucket (may be a partial bucket).
    std::shared_ptr<DimToValMap> mCurrentSlicedCounter = std::make_shared<DimToValMap>();

    // The sum of previous partial buckets in the current full bucket (excluding the current
    // partial bucket). This is only updated while flushing the current bucket.
    std::shared_ptr<DimToValMap> mCurrentFullCounters = std::make_shared<DimToValMap>();

    static const size_t kBucketSize = sizeof(CountBucket{});

    bool hitGuardRailLocked(const MetricDimensionKey& newKey);
@@ -87,6 +93,8 @@ private:
    FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition);
    FRIEND_TEST(CountMetricProducerTest, TestEventsWithSlicedCondition);
    FRIEND_TEST(CountMetricProducerTest, TestAnomalyDetectionUnSliced);
    FRIEND_TEST(CountMetricProducerTest, TestEventWithAppUpgrade);
    FRIEND_TEST(CountMetricProducerTest, TestEventWithAppUpgradeInNextBucket);
};

}  // namespace statsd
+23 −10
Original line number Diff line number Diff line
@@ -121,13 +121,13 @@ unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
        case DurationMetric_AggregationType_SUM:
            return make_unique<OringDurationTracker>(
                    mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex,
                    mDimensionsInCondition, mNested,
                    mCurrentBucketStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
                    mDimensionsInCondition, mNested, mCurrentBucketStartTimeNs, mCurrentBucketNum,
                    mStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
        case DurationMetric_AggregationType_MAX_SPARSE:
            return make_unique<MaxDurationTracker>(
                    mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex,
                    mDimensionsInCondition, mNested,
                    mCurrentBucketStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
                    mDimensionsInCondition, mNested, mCurrentBucketStartTimeNs, mCurrentBucketNum,
                    mStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
    }
}

@@ -252,17 +252,18 @@ void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
    protoOutput->end(protoToken);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
    mPastBuckets.clear();
    mStartTimeNs = mCurrentBucketStartTimeNs;
}

void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
    if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();

    if (currentBucketEndTimeNs > eventTimeNs) {
        return;
    }
    VLOG("flushing...........");
    for (auto it = mCurrentSlicedDurationTrackerMap.begin();
            it != mCurrentSlicedDurationTrackerMap.end();) {
        if (it->second->flushIfNeeded(eventTime, &mPastBuckets)) {
        if (it->second->flushIfNeeded(eventTimeNs, &mPastBuckets)) {
            VLOG("erase bucket for key %s", it->first.c_str());
            it = mCurrentSlicedDurationTrackerMap.erase(it);
        } else {
@@ -270,11 +271,23 @@ void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
        }
    }

    int numBucketsForward = (eventTime - mCurrentBucketStartTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs += numBucketsForward * mBucketSizeNs;
    int numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    mCurrentBucketNum += numBucketsForward;
}

void DurationMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
    for (auto it = mCurrentSlicedDurationTrackerMap.begin();
         it != mCurrentSlicedDurationTrackerMap.end();) {
        if (it->second->flushCurrentBucket(eventTimeNs, &mPastBuckets)) {
            VLOG("erase bucket for key %s", it->first.c_str());
            it = mCurrentSlicedDurationTrackerMap.erase(it);
        } else {
            ++it;
        }
    }
}

void DurationMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedDurationTrackerMap.size() == 0) {
        return;
Loading