Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f275f612 authored by Chenjie Yu's avatar Chenjie Yu
Browse files

optional default base for ValueMetric

For pulled atoms, if use_zero_default_base is set to true, and if use_diff, the first data
piece will assume 0 value as base.
This requires a successful previous pull to set mHasGlobalBase.
mHasGlobalBase is reset when condition changes to false or we skip more
than 1 bucket.

Bug: 120476027
Bug: 120129928
Test: unit test
Change-Id: Id01a7bf8796394777f02ba2c9bfdc860f528b98f
parent 5e9f426f
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -59,15 +59,21 @@ bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr<
    mLastPullTimeNs = elapsedTimeNs;
    int64_t pullStartTimeNs = getElapsedRealtimeNs();
    bool ret = PullInternal(&mCachedData);
    if (!ret) {
        mCachedData.clear();
        return false;
    }
    StatsdStats::getInstance().notePullTime(mTagId, getElapsedRealtimeNs() - pullStartTimeNs);
    for (const shared_ptr<LogEvent>& data : mCachedData) {
        data->setElapsedTimestampNs(elapsedTimeNs);
        data->setLogdWallClockTimestampNs(wallClockTimeNs);
    }
    if (ret && mCachedData.size() > 0) {

    if (mCachedData.size() > 0) {
        mapAndMergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId);
        (*data) = mCachedData;
    }

    StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs);
    return ret;
}
+1 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ public:

    // Pulls the data. The returned data will have elapsedTimeNs set as timeNs
    // and will have wallClockTimeNs set as current wall clock time.
    // Return true if the pull is successful.
    bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data);

    // Clear cache immediately
+41 −29
Original line number Diff line number Diff line
@@ -72,17 +72,15 @@ const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;

const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE((int64_t)0);

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer::ValueMetricProducer(const ConfigKey& key,
                                         const ValueMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& conditionWizard,
                                         const int whatMatcherIndex,
                                         const sp<EventMatcherWizard>& matcherWizard,
                                         const int pullTagId,
                                         const int64_t timeBaseNs,
                                         const int64_t startTimeNs,
                                         const sp<StatsPullerManager>& pullerManager)
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
        const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
        const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
@@ -102,7 +100,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key,
      mAggregationType(metric.aggregation_type()),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()) {
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false) {
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -302,6 +302,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
    }
}

void ValueMetricProducer::resetBase() {
    for (auto& slice : mCurrentSlicedBucket) {
        for (auto& interval : slice.second) {
            interval.hasBase = false;
        }
    }
    mHasGlobalBase = false;
}

void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
@@ -317,13 +326,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
        pullAndMatchEventsLocked(eventTimeNs);
    }

    // when condition change from true to false, clear diff base
    // when condition change from true to false, clear diff base but don't
    // reset other counters as we may accumulate more value in the bucket.
    if (mUseDiff && mCondition && !condition) {
        for (auto& slice : mCurrentSlicedBucket) {
            for (auto& interval : slice.second) {
                interval.hasBase = false;
            }
        }
        resetBase();
    }

    mCondition = condition;
@@ -332,15 +338,17 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<std::shared_ptr<LogEvent>> allData;
    if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
        if (allData.size() == 0) {
            return;
        }
        for (const auto& data : allData) {
            if (mEventMatcherWizard->matchLogEvent(
                *data, mWhatMatcherIndex) == MatchingState::kMatched) {
                onMatchedLogEventLocked(mWhatMatcherIndex, *data);
            }
        }
        mHasGlobalBase = true;
    } else {
        // for pulled data, every pull is needed. So we reset the base if any
        // pull fails.
        resetBase();
    }
}

@@ -376,6 +384,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
                onMatchedLogEventLocked(mWhatMatcherIndex, *data);
            }
        }
        mHasGlobalBase = true;
    } else {
        VLOG("No need to commit data on condition false.");
    }
@@ -486,12 +495,19 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
        }

        if (mUseDiff) {
            // no base. just update base and return.
            if (!interval.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    interval.hasBase = true;
                } else {
                    // no base. just update base and return.
                    interval.base = value;
                    interval.hasBase = true;
                    return;
                }
            }
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
@@ -580,11 +596,7 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
    if (numBucketsForward > 1) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        // take base again in future good bucket.
        for (auto& slice : mCurrentSlicedBucket) {
            for (auto& interval : slice.second) {
                interval.hasBase = false;
            }
        }
        resetBase();
    }
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
+17 −0
Original line number Diff line number Diff line
@@ -148,6 +148,9 @@ private:

    void pullAndMatchEventsLocked(const int64_t timestampNs);

    // Reset diff base and mHasGlobalBase
    void resetBase();

    static const size_t kBucketSize = sizeof(ValueBucket{});

    const size_t mDimensionSoftLimit;
@@ -164,6 +167,18 @@ private:

    const bool mSkipZeroDiffOutput;

    // If true, use a zero value as base to compute the diff.
    // This is used for new keys which are present in the new data but was not
    // present in the base data.
    // The default base will only be used if we have a global base.
    const bool mUseZeroDefaultBase;

    // For pulled metrics, this is always set to true whenever a pull succeeds.
    // It is set to false when a pull fails, or upon condition change to false.
    // This is used to decide if we have the right base data to compute the
    // diff against.
    bool mHasGlobalBase;

    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -185,6 +200,8 @@ private:
    FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
    FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
    FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput);
    FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
    FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
};

}  // namespace statsd
+2 −0
Original line number Diff line number Diff line
@@ -274,6 +274,8 @@ message ValueMetric {

  optional bool use_diff = 12;

  optional bool use_zero_default_base = 15 [default = false];

  enum ValueDirection {
      UNKNOWN = 0;
      INCREASING = 1;
Loading