Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 47a9efce authored by Olivier Gaillard's avatar Olivier Gaillard
Browse files

Fix usage of flush buckets.

- For pulled metrics, buckets should only be flushed when we did a pull.
Without a pull, we will have incomplete data.
- Flush buckets at the end of each incoming event (on data pull,
app upgrade, condition change). That way, we can ensure that buckets
will be in sync with the conditions.
- Do not throw away data from pull events that arrive late, as long as
they fall in the correct bucket. Late pulls are expected since we are
using AlarmManager.

Test: atest statsd_test
Change-Id: If1222b6005a0b88bbdae1b4690921c24acd75542
parent 8e95f0bf
Loading
Loading
Loading
Loading
+104 −60
Original line number Diff line number Diff line
@@ -174,13 +174,17 @@ void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition
}

void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    flushIfNeededLocked(dropTimeNs);
    StatsdStats::getInstance().noteBucketDropped(mMetricId);
    mPastBuckets.clear();
    // We are going to flush the data without doing a pull first so we need to invalidate the data.
    bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
    if (pullNeeded) {
        invalidateCurrentBucket();
    }
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}

// Discards all buckets that have already been finalized, after first closing
// the current bucket if dumpTimeNs falls past its end.
// Called with mMutex held (Locked suffix convention).
void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    // Finalize the current bucket first so its data is moved into
    // mPastBuckets before everything is cleared below.
    flushIfNeededLocked(dumpTimeNs);
    mPastBuckets.clear();
    mSkippedBuckets.clear();
}
@@ -192,7 +196,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             std::set<string> *str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    flushIfNeededLocked(dumpTimeNs);
    if (include_current_partial_bucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
@@ -325,12 +328,16 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
    }
}

void ValueMetricProducer::invalidateCurrentBucket() {
void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() {
    if (!mCurrentBucketIsInvalid) {
        // Only report once per invalid bucket.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }
    mCurrentBucketIsInvalid = true;
}

void ValueMetricProducer::invalidateCurrentBucket() {
    invalidateCurrentBucketWithoutResetBase();
    resetBase();
}

@@ -345,42 +352,58 @@ void ValueMetricProducer::resetBase() {

void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
    bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;
    if (!isEventTooLate) {
        if (mCondition == ConditionState::kUnknown) {
            // If the condition was unknown, we mark the bucket as invalid since the bucket will
            // contain partial data. For instance, the condition change might happen close to the
            // end of the bucket and we might miss lots of data.
            //
            // We still want to pull to set the base.
            invalidateCurrentBucket();
        return;
        }

    flushIfNeededLocked(eventTimeNs);

    if (mCondition != ConditionState::kUnknown) {
        // Pull on condition changes.
        bool conditionChanged = mCondition != condition;
        bool conditionChanged =
                (mCondition == ConditionState::kTrue && condition == ConditionState::kFalse)
                || (mCondition == ConditionState::kFalse && condition == ConditionState::kTrue);
        // We do not need to pull when we go from unknown to false.
        if (mIsPulled && conditionChanged) {
        //
        // We also pull if the condition was already true in order to be able to flush the bucket at
        // the end if needed.
        //
        // onConditionChangedLocked might happen on bucket boundaries if this is called before
        // #onDataPulled.
        if (mIsPulled && (conditionChanged || condition)) {
            pullAndMatchEventsLocked(eventTimeNs);
        }

        // when condition change from true to false, clear diff base but don't
        // When condition change from true to false, clear diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && mCondition == ConditionState::kTrue && condition == ConditionState::kFalse) {
        if (mUseDiff && mCondition == ConditionState::kTrue
                && condition == ConditionState::kFalse) {
            resetBase();
        }
        mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;

    } else {
        // If the condition was unknown, we mark the bucket as invalid since the bucket will contain
        // partial data. For instance, the condition change might happen close to the end of the
        // bucket and we might miss lots of data.
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket();
        // Something weird happened. If we received another event if the future, the condition might
        // be wrong.
        mCondition = ConditionState::kUnknown;
    }
    mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;

    // This part should alway be called.
    flushIfNeededLocked(eventTimeNs);
}

void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<std::shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullTagId, &allData)) {
        ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
        ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
        invalidateCurrentBucket();
        return;
    }
@@ -392,35 +415,43 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}

// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and close the bucket.
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
        if (mCondition == ConditionState::kTrue) {
        if (!pullSuccess) {
            // If the pull failed, we won't be able to compute a diff.
            if (!pullSuccess) {
                invalidateCurrentBucket();
            return;
        }

            } else {
                bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
                if (isEventLate) {
                    // If the event is late, we are in the middle of a bucket. Just
                    // process the data without trying to snap the data to the nearest bucket.
                    accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
                } else {
                    // For scheduled pulled data, the effective event time is snap to the nearest
                    // bucket end. In the case of waking up from a deep sleep state, we will
        // attribute to the previous bucket end. If the sleep was long but not very long, we
        // will be in the immediate next bucket. Previous bucket may get a larger number as
        // we pull at a later time than real bucket end.
        // If the sleep was very long, we skip more than one bucket before sleep. In this case,
        // if the diff base will be cleared and this new data will serve as new diff base.
                    // attribute to the previous bucket end. If the sleep was long but not very
                    // long, we will be in the immediate next bucket. Previous bucket may get a
                    // larger number as we pull at a later time than real bucket end.
                    //
                    // If the sleep was very long, we skip more than one bucket before sleep. In
                    // this case, if the diff base will be cleared and this new data will serve as
                    // new diff base.
                    int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                    StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                            mMetricId, originalPullTimeNs - bucketEndTime);
                    accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
                }
            }
        }

    // We can probably flush the bucket. Since we used bucketEndTime when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);

    } else {
        VLOG("No need to commit data on condition false.");
    }
}

void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
@@ -587,7 +618,10 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
    }
    mMatchedMetricDimensionKeys.insert(eventKey);

    if (!mIsPulled) {
        // We cannot flush without doing a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    // For pulled data, we already check condition when we decide to pull or
    // in onDataPulled. So take all of them.
@@ -722,26 +756,26 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
    }
}

// For pulled metrics, we always need to make sure we do a pull before flushing the bucket
// if mCondition is true!
void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();

    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
             (long long)(currentBucketEndTimeNs));
        return;
    }

    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
}

    mCurrentBucketNum += numBucketsForward;
    if (numBucketsForward > 1) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // take base again in future good bucket.
        resetBase();
int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        return 0;
    }
    return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
}

void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
@@ -750,6 +784,16 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
    }

    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    mCurrentBucketNum += numBucketsForward;
    if (numBucketsForward > 1) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful, though.
        invalidateCurrentBucketWithoutResetBase();
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
+20 −3
Original line number Diff line number Diff line
@@ -39,6 +39,13 @@ struct ValueBucket {
    std::vector<Value> values;
};


// Aggregates values within buckets.
//
// There are different events that might complete a bucket
// - a condition change
// - an app upgrade
// - an alarm set to the end of the bucket
class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver {
public:
    ValueMetricProducer(const ConfigKey& key, const ValueMetric& valueMetric,
@@ -61,9 +68,8 @@ public:
        if (!mSplitBucketForAppUpgrade) {
            return;
        }
        flushIfNeededLocked(eventTimeNs - 1);
        if (mIsPulled && mCondition) {
            pullAndMatchEventsLocked(eventTimeNs - 1);
            pullAndMatchEventsLocked(eventTimeNs);
        }
        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
    };
@@ -94,9 +100,12 @@ private:

    void dumpStatesLocked(FILE* out, bool verbose) const override;

    // Util function to flush the old packet.
    // For pulled metrics, this method should only be called after a pull has been done. Otherwise
    // we will not have complete data for the bucket.
    void flushIfNeededLocked(const int64_t& eventTime) override;

    // For pulled metrics, this method should only be called after a pull has been done. Otherwise
    // we will not have complete data for the bucket.
    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                  const int64_t& nextBucketStartTimeNs) override;

@@ -105,8 +114,12 @@ private:
    // Calculate previous bucket end time based on current time.
    int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs);

    // Calculate how many buckets are present between the current bucket and eventTimeNs.
    int64_t calcBucketsForwardCount(const int64_t& eventTimeNs) const;

    // Mark the data as invalid.
    void invalidateCurrentBucket();
    void invalidateCurrentBucketWithoutResetBase();

    const int mWhatMatcherIndex;

@@ -256,6 +269,10 @@ private:
    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
    FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange);
    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade);
    FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
    FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
    friend class ValueMetricProducerTestHelper;
};

+199 −79

File changed.

Preview size limit exceeded, changes collapsed.