Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 11203df2 authored by Olivier Gaillard's avatar Olivier Gaillard
Browse files

Unify the way we process events on condition change and bucket

boundaries.

- We now always process empty data and events that exceed the max delay
in a consistent way

- Fix one bug which caused base data not to be reset, e.g.
bucket h+1 on condition change to true, we pull the following data (key A / value 1, key B / value 2)
bucket h+2 on bucket boundary, we pull the following data (key A / value 2)
In this case the previous code did not reset key B. It should be reset
since it is not present in the most recent data.

Test: atest statsd_test
Bug: 124046337
Change-Id: I19456bbe1529e72befbb621be185ce24bd65077a
parent f0cdc787
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -32,9 +32,10 @@ class PullDataReceiver : virtual public RefBase{
   * @param data The pulled data.
   * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
   * bucket should be invalidated.
   * @param originalPullTimeNs This is when all the pulls have been initiated (elapsed time).
   */
  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 
                            bool pullSuccess) = 0;
                            bool pullSuccess, int64_t originalPullTimeNs) = 0;
};

}  // namespace statsd
+1 −1
Original line number Diff line number Diff line
@@ -381,7 +381,7 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
        for (const auto& receiverInfo : pullInfo.second) {
            sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
            if (receiverPtr != nullptr) {
                receiverPtr->onDataPulled(data, pullSuccess);
                receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs);
                // We may have just come out of a coma, compute next pull time.
                int numBucketsAhead =
                        (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
+1 −1
Original line number Diff line number Diff line
@@ -407,7 +407,7 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo
}

void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess) {
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (!pullSuccess || allData.size() == 0) {
        return;
+1 −1
Original line number Diff line number Diff line
@@ -68,7 +68,7 @@ public:

    // Handles when the pulled data arrives.
    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
                      bool pullSuccess) override;
                      bool pullSuccess, int64_t originalPullTimeNs) override;

    // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
    void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
+59 −52
Original line number Diff line number Diff line
@@ -361,34 +361,8 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
        invalidateCurrentBucket();
        return;
    }
    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket();
        return;
    }

    if (timestampNs < mCurrentBucketStartTimeNs) {
        // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report
        // for every event, just the pull
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
    }

    for (const auto& data : allData) {
        // make a copy before doing and changes
        LogEvent localCopy = data->makeCopy();
        localCopy.setElapsedTimestampNs(timestampNs);
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
    mHasGlobalBase = true;
    accumulateEvents(allData, timestampNs, timestampNs);
}

int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -396,7 +370,7 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime
}

void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess) {
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mCondition) {
        if (!pullSuccess) {
@@ -405,11 +379,6 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
            return;
        }

        if (allData.size() == 0) {
            VLOG("Data pulled is empty");
            StatsdStats::getInstance().noteEmptyData(mPullTagId);
            return;
        }
        // For scheduled pulled data, the effective event time is snap to the nearest
        // bucket end. In the case of waking up from a deep sleep state, we will
        // attribute to the previous bucket end. If the sleep was long but not very long, we
@@ -417,32 +386,69 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
        // we pull at a later time than real bucket end.
        // If the sleep was very long, we skip more than one bucket before sleep. In this case,
        // if the diff base will be cleared and this new data will serve as new diff base.
        int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
        int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
        bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
        int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
        accumulateEvents(allData, originalPullTimeNs, bucketEndTime);

        // We can probably flush the bucket. Since we used bucketEndTime when calling
        // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
        flushIfNeededLocked(originalPullTimeNs);

    } else {
        VLOG("No need to commit data on condition false.");
    }
}

void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, 
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
            VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
                 (long long)mCurrentBucketStartTimeNs);
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket();
        return;
    }

    const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket();
        return;
    }

    if (allData.size() == 0) {
        VLOG("Data pulled is empty");
        StatsdStats::getInstance().noteEmptyData(mPullTagId);
    }

    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
                localCopy.setElapsedTimestampNs(bucketEndTime);
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
        mHasGlobalBase = true;

        // We can probably flush the bucket. Since we used bucketEndTime when calling
        // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
        flushIfNeededLocked(realEventTime);
    } else {
        VLOG("No need to commit data on condition false.");
    // If the new pulled data does not contains some keys we track in our intervals, we need to
    // reset the base.
    for (auto& slice : mCurrentSlicedBucket) {
        bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) 
                != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData) {
            for (auto& interval : slice.second) {
                interval.hasBase = false;
            }
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;
}

void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedBucket.size() == 0) {
@@ -539,6 +545,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    mMatchedMetricDimensionKeys.insert(eventKey);

    flushIfNeededLocked(eventTimeNs);

Loading