Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c5f11c40 authored by Olivier Gaillard's avatar Olivier Gaillard
Browse files

Reset the base when pull fails.

Without resetting the base, we will compute the wrong diff when we have
a condition change.

Move the responsability of handling failures to the PullReceivers. This
is more consistent with #onConditionChanged which does handle the
failure cases.

We also always compute nextPullTimeNs in order to only call onDataPulled
on bucket boundaries. The current code does not update nextPullTimeNs
which means a new alarm might trigger a pull and onDataPulled in the
middle of the bucket. The behavior in this case is undefined.

Bug: 123181864
Test: atest statsd_test
Change-Id: I0910b7db26a0de764436c46c8d7d11cafa52dcd9
parent af41d5f7
Loading
Loading
Loading
Loading
+8 −2
Original line number Diff line number Diff line
@@ -28,7 +28,13 @@ namespace statsd {
class PullDataReceiver : virtual public RefBase{
 public:
  virtual ~PullDataReceiver() {}
  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0;
  /**
   * @param data The pulled data.
   * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
   * bucket should be invalidated.
   */
  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 
                            bool pullSuccess) = 0;
};

}  // namespace statsd
+7 −6
Original line number Diff line number Diff line
@@ -358,12 +358,13 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {

    for (const auto& pullInfo : needToPull) {
        vector<shared_ptr<LogEvent>> data;
        if (!Pull(pullInfo.first, &data)) {
        bool pullSuccess = Pull(pullInfo.first, &data);
        if (pullSuccess) {
            StatsdStats::getInstance().notePullDelay(
                    pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs);
        } else {
            VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
            continue;
        }
        StatsdStats::getInstance().notePullDelay(pullInfo.first,
                                                 getElapsedRealtimeNs() - elapsedTimeNs);

        // Convention is to mark pull atom timestamp at request time.
        // If we pull at t0, puller starts at t1, finishes at t2, and send back
@@ -380,8 +381,8 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
        for (const auto& receiverInfo : pullInfo.second) {
            sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
            if (receiverPtr != nullptr) {
                receiverPtr->onDataPulled(data);
                // we may have just come out of a coma, compute next pull time
                receiverPtr->onDataPulled(data, pullSuccess);
                // We may have just come out of a coma, compute next pull time.
                int numBucketsAhead =
                        (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
                receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
+3 −2
Original line number Diff line number Diff line
@@ -406,9 +406,10 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo
    return gaugeFields;
}

void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (allData.size() == 0) {
    if (!pullSuccess || allData.size() == 0) {
        return;
    }
    for (const auto& data : allData) {
+2 −1
Original line number Diff line number Diff line
@@ -67,7 +67,8 @@ public:
    virtual ~GaugeMetricProducer();

    // Handles when the pulled data arrives.
    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
                      bool pullSuccess) override;

    // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
    void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
+11 −3
Original line number Diff line number Diff line
@@ -382,9 +382,16 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}

void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mCondition) {
        if (!pullSuccess) {
            // If the pull failed, we won't be able to compute a diff.
            resetBase();
            return;
        }

        if (allData.size() == 0) {
            VLOG("Data pulled is empty");
            StatsdStats::getInstance().noteEmptyData(mPullTagId);
@@ -399,12 +406,13 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
        // if the diff base will be cleared and this new data will serve as new diff base.
        int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
        int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
        if (bucketEndTime < mCurrentBucketStartTimeNs) {
        bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
        if (isEventLate) {
            VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
                 (long long)mCurrentBucketStartTimeNs);
            StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
            return;
        }

        for (const auto& data : allData) {
            LogEvent localCopy = data->makeCopy();
            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
Loading