Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a8b70115 authored by Olivier Gaillard's avatar Olivier Gaillard
Browse files

Invalidate the bucket when global base is missing.

For diffs, we need a global base at the beginning of the bucket. If we
do not have a global base, it means the bucket will contain incomplete
data.

Test: atest statsd_test
Change-Id: Ifea7ce09e31d7c5c44b1820b528dfda492dd0dc9
parent 47a9efce
Loading
Loading
Loading
Loading
+28 −19
Original line number Diff line number Diff line
@@ -155,7 +155,7 @@ ValueMetricProducer::ValueMetricProducer(
    mCurrentBucketStartTimeNs = startTimeNs;
    // Kicks off the puller immediately if condition is true and diff based.
    if (mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(startTimeNs);
        pullAndMatchEventsLocked(startTimeNs, mCondition);
    }
    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
@@ -208,7 +208,7 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                    invalidateCurrentBucket();
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs);
                    pullAndMatchEventsLocked(dumpTimeNs, mCondition);
                    break;
            }
        }
@@ -364,9 +364,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
        }

        // Pull on condition changes.
        ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
        bool conditionChanged =
                (mCondition == ConditionState::kTrue && condition == ConditionState::kFalse)
                || (mCondition == ConditionState::kFalse && condition == ConditionState::kTrue);
                (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)
                || (mCondition == ConditionState::kFalse && newCondition == ConditionState::kTrue);
        // We do not need to pull when we go from unknown to false.
        //
        // We also pull if the condition was already true in order to be able to flush the bucket at
@@ -375,16 +376,16 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
        // onConditionChangedLocked might happen on bucket boundaries if this is called before
        // #onDataPulled.
        if (mIsPulled && (conditionChanged || condition)) {
            pullAndMatchEventsLocked(eventTimeNs);
            pullAndMatchEventsLocked(eventTimeNs, newCondition);
        }

        // When condition change from true to false, clear diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && mCondition == ConditionState::kTrue
                && condition == ConditionState::kFalse) {
                && newCondition == ConditionState::kFalse) {
            resetBase();
        }
        mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
        mCondition = newCondition;

    } else {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
@@ -400,7 +401,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
    flushIfNeededLocked(eventTimeNs);
}

void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs, ConditionState condition) {
    vector<std::shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullTagId, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
@@ -408,7 +409,7 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
    accumulateEvents(allData, timestampNs, timestampNs, condition);
}

int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -430,7 +431,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
                if (isEventLate) {
                    // If the event is late, we are in the middle of a bucket. Just
                    // process the data without trying to snap the data to the nearest bucket.
                    accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
                    accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition);
                } else {
                    // For scheduled pulled data, the effective event time is snap to the nearest
                    // bucket end. In the case of waking up from a deep sleep state, we will
@@ -444,7 +445,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
                    int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                    StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                            mMetricId, originalPullTimeNs - bucketEndTime);
                    accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
                    accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition);
                }
            }
        }
@@ -455,7 +456,8 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
}

void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
                                           ConditionState condition) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
@@ -817,12 +819,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
    if (!mCurrentBucketIsInvalid) {
        appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
    }
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    initCurrentSlicedBucket();
    mCurrentBucketIsInvalid = false;
    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
}

ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
@@ -849,7 +846,9 @@ ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
    return bucket;
}

void ValueMetricProducer::initCurrentSlicedBucket() {
void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    // Cleanup data structure to aggregate values.
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second) {
@@ -867,6 +866,16 @@ void ValueMetricProducer::initCurrentSlicedBucket() {
            it++;
        }
    }

    mCurrentBucketIsInvalid = false;
    // If we do not have a global base when the condition is true,
    // we will have incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        mCurrentBucketIsInvalid = false;
    }
    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
+42 −39
Original line number Diff line number Diff line
@@ -69,7 +69,7 @@ public:
            return;
        }
        if (mIsPulled && mCondition) {
            pullAndMatchEventsLocked(eventTimeNs);
            pullAndMatchEventsLocked(eventTimeNs, mCondition);
        }
        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
    };
@@ -100,7 +100,7 @@ private:

    void dumpStatesLocked(FILE* out, bool verbose) const override;

    // For pulled metrics, this method should only be called if a pulled have be done. Else we will
    // For pulled metrics, this method should only be called if a pull has be done. Else we will
    // not have complete data for the bucket.
    void flushIfNeededLocked(const int64_t& eventTime) override;

@@ -176,14 +176,15 @@ private:

    bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey);

    void pullAndMatchEventsLocked(const int64_t timestampNs);
    void pullAndMatchEventsLocked(const int64_t timestampNs, ConditionState condition);

    void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, 
                          int64_t originalPullTimeNs, int64_t eventElapsedTimeNs);
                          int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
                          ConditionState condition);

    ValueBucket buildPartialBucket(int64_t bucketEndTime,
                                   const std::vector<Interval>& intervals);
    void initCurrentSlicedBucket();
    void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs);
    void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);

    // Reset diff base and mHasGlobalBase
@@ -227,52 +228,54 @@ private:

    const bool mSplitBucketForAppUpgrade;

    FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
    FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet);
    FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
    FRIEND_TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged);
    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged);
    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
    FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
    FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
    FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated);
    FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset);
    FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade);
    FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse);
    FRIEND_TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum);
    FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
    FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade);
    FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
    FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput);
    FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue);
    FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
    FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
    FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
    FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
    FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged);
    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
    FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade);
    FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
    FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
    friend class ValueMetricProducerTestHelper;
};

+83 −1
Original line number Diff line number Diff line
@@ -55,8 +55,12 @@ double epsilon = 0.001;
static void assertPastBucketValuesSingleKey(
        const std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>>& mPastBuckets,
        const std::initializer_list<int>& expectedValuesList) {

    std::vector<int> expectedValues(expectedValuesList);
    if (expectedValues.size() == 0) {
        ASSERT_EQ(0, mPastBuckets.size());
        return;
    }

    ASSERT_EQ(1, mPastBuckets.size());
    ASSERT_EQ(expectedValues.size(), mPastBuckets.begin()->second.size());

@@ -2613,6 +2617,84 @@ TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade) {
    assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9});
}

TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged) {
    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, Pull(tagId, _))
            // First on condition changed.
            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
                data->clear();
                data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1));
                return true;
            }))
            // Second on condition changed.
            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
                data->clear();
                data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 3));
                return true;
            }));

    sp<ValueMetricProducer> valueProducer =
            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);

    valueProducer->onConditionChanged(true, bucketStartTimeNs + 8);
    valueProducer->onConditionChanged(false, bucketStartTimeNs + 10);
    valueProducer->onConditionChanged(false, bucketStartTimeNs + 10);

    EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
    auto curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
    EXPECT_EQ(true, curInterval.hasValue);
    EXPECT_EQ(2, curInterval.value.long_value);
}

TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet) {
    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, Pull(tagId, _))
            // First condition change.
            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
                data->clear();
                data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1));
                return true;
            }))
            // 2nd condition change.
            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
                data->clear();
                data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 1));
                return true;
            }))
            // 3rd condition change.
            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
                data->clear();
                data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 1));
                return true;
            }));

    sp<ValueMetricProducer> valueProducer =
            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
    valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10);

    vector<shared_ptr<LogEvent>> allData;
    allData.push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs + 3, 10));
    valueProducer->onDataPulled(allData, /** succeed */ false, bucketStartTimeNs + 3);

    allData.clear();
    allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 20));
    valueProducer->onDataPulled(allData, /** succeed */ false, bucket2StartTimeNs);

    valueProducer->onConditionChanged(false, bucket2StartTimeNs + 8);
    valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10);

    allData.clear();
    allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs, 30));
    valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);

    // There was not global base available so all buckets are invalid.
    assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {});
}

static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) {
    vector<uint8_t> bytes;
    bytes.resize(proto->size());