Loading cmds/statsd/src/external/PullDataReceiver.h +2 −1 Original line number Diff line number Diff line Loading @@ -32,9 +32,10 @@ class PullDataReceiver : virtual public RefBase{ * @param data The pulled data. * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the * bucket should be invalidated. * @param originalPullTimeNs This is when all the pulls have been initiated (elapsed time). */ virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, bool pullSuccess) = 0; bool pullSuccess, int64_t originalPullTimeNs) = 0; }; } // namespace statsd Loading cmds/statsd/src/external/StatsPullerManager.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -384,7 +384,7 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { for (const auto& receiverInfo : pullInfo.second) { sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote(); if (receiverPtr != nullptr) { receiverPtr->onDataPulled(data, pullSuccess); receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs); // We may have just come out of a coma, compute next pull time. int numBucketsAhead = (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs; Loading cmds/statsd/src/metrics/GaugeMetricProducer.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -407,7 +407,7 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo } void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, bool pullSuccess) { bool pullSuccess, int64_t originalPullTimeNs) { std::lock_guard<std::mutex> lock(mMutex); if (!pullSuccess || allData.size() == 0) { return; Loading cmds/statsd/src/metrics/GaugeMetricProducer.h +1 −1 Original line number Diff line number Diff line Loading @@ -68,7 +68,7 @@ public: // Handles when the pulled data arrives. void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, bool pullSuccess) override; bool pullSuccess, int64_t originalPullTimeNs) override; // GaugeMetric needs to immediately trigger another pull when we create the partial bucket. void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +59 −52 Original line number Diff line number Diff line Loading @@ -361,34 +361,8 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { invalidateCurrentBucket(); return; } const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs; StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (pullDelayNs > mMaxPullDelayNs) { ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId, (long long)mMaxPullDelayNs); StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); // We are missing one pull from the bucket which means we will not have a complete view of // what's going on. invalidateCurrentBucket(); return; } if (timestampNs < mCurrentBucketStartTimeNs) { // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report // for every event, just the pull StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); } for (const auto& data : allData) { // make a copy before doing and changes LogEvent localCopy = data->makeCopy(); localCopy.setElapsedTimestampNs(timestampNs); if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == MatchingState::kMatched) { onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); } } mHasGlobalBase = true; accumulateEvents(allData, timestampNs, timestampNs); } int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { Loading @@ -396,7 +370,7 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime } void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, bool pullSuccess) { bool pullSuccess, int64_t originalPullTimeNs) { std::lock_guard<std::mutex> lock(mMutex); if (mCondition) { if (!pullSuccess) { Loading @@ -405,11 +379,6 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven return; } if (allData.size() == 0) { VLOG("Data pulled is empty"); StatsdStats::getInstance().noteEmptyData(mPullTagId); return; } // For scheduled pulled data, the effective event time is snap to the nearest // bucket end. In the case of waking up from a deep sleep state, we will // attribute to the previous bucket end. If the sleep was long but not very long, we Loading @@ -417,32 +386,69 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven // we pull at a later time than real bucket end. // If the sleep was very long, we skip more than one bucket before sleep. In this case, // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs; int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; accumulateEvents(allData, originalPullTimeNs, bucketEndTime); // We can probably flush the bucket. Since we used bucketEndTime when calling // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. flushIfNeededLocked(originalPullTimeNs); } else { VLOG("No need to commit data on condition false."); } } void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) { bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs; if (isEventLate) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, (long long)mCurrentBucketStartTimeNs); VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs); StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); invalidateCurrentBucket(); return; } const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs; StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (pullDelayNs > mMaxPullDelayNs) { ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId, (long long)mMaxPullDelayNs); StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); // We are missing one pull from the bucket which means we will not have a complete view of // what's going on. invalidateCurrentBucket(); return; } if (allData.size() == 0) { VLOG("Data pulled is empty"); StatsdStats::getInstance().noteEmptyData(mPullTagId); } mMatchedMetricDimensionKeys.clear(); for (const auto& data : allData) { LogEvent localCopy = data->makeCopy(); if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == MatchingState::kMatched) { localCopy.setElapsedTimestampNs(bucketEndTime); localCopy.setElapsedTimestampNs(eventElapsedTimeNs); onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); } } mHasGlobalBase = true; // We can probably flush the bucket. Since we used bucketEndTime when calling // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. flushIfNeededLocked(realEventTime); } else { VLOG("No need to commit data on condition false."); // If the new pulled data does not contains some keys we track in our intervals, we need to // reset the base. for (auto& slice : mCurrentSlicedBucket) { bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) != mMatchedMetricDimensionKeys.end(); if (!presentInPulledData) { for (auto& interval : slice.second) { interval.hasBase = false; } } } mMatchedMetricDimensionKeys.clear(); mHasGlobalBase = true; } void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { if (mCurrentSlicedBucket.size() == 0) { Loading Loading @@ -539,6 +545,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn (long long)mCurrentBucketStartTimeNs); return; } mMatchedMetricDimensionKeys.insert(eventKey); flushIfNeededLocked(eventTimeNs); Loading Loading
cmds/statsd/src/external/PullDataReceiver.h +2 −1 Original line number Diff line number Diff line Loading @@ -32,9 +32,10 @@ class PullDataReceiver : virtual public RefBase{ * @param data The pulled data. * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the * bucket should be invalidated. * @param originalPullTimeNs This is when all the pulls have been initiated (elapsed time). */ virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, bool pullSuccess) = 0; bool pullSuccess, int64_t originalPullTimeNs) = 0; }; } // namespace statsd Loading
cmds/statsd/src/external/StatsPullerManager.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -384,7 +384,7 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { for (const auto& receiverInfo : pullInfo.second) { sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote(); if (receiverPtr != nullptr) { receiverPtr->onDataPulled(data, pullSuccess); receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs); // We may have just come out of a coma, compute next pull time. int numBucketsAhead = (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs; Loading
cmds/statsd/src/metrics/GaugeMetricProducer.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -407,7 +407,7 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo } void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, bool pullSuccess) { bool pullSuccess, int64_t originalPullTimeNs) { std::lock_guard<std::mutex> lock(mMutex); if (!pullSuccess || allData.size() == 0) { return; Loading
cmds/statsd/src/metrics/GaugeMetricProducer.h +1 −1 Original line number Diff line number Diff line Loading @@ -68,7 +68,7 @@ public: // Handles when the pulled data arrives. void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, bool pullSuccess) override; bool pullSuccess, int64_t originalPullTimeNs) override; // GaugeMetric needs to immediately trigger another pull when we create the partial bucket. void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +59 −52 Original line number Diff line number Diff line Loading @@ -361,34 +361,8 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { invalidateCurrentBucket(); return; } const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs; StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (pullDelayNs > mMaxPullDelayNs) { ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId, (long long)mMaxPullDelayNs); StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); // We are missing one pull from the bucket which means we will not have a complete view of // what's going on. invalidateCurrentBucket(); return; } if (timestampNs < mCurrentBucketStartTimeNs) { // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report // for every event, just the pull StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); } for (const auto& data : allData) { // make a copy before doing and changes LogEvent localCopy = data->makeCopy(); localCopy.setElapsedTimestampNs(timestampNs); if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == MatchingState::kMatched) { onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); } } mHasGlobalBase = true; accumulateEvents(allData, timestampNs, timestampNs); } int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { Loading @@ -396,7 +370,7 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime } void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, bool pullSuccess) { bool pullSuccess, int64_t originalPullTimeNs) { std::lock_guard<std::mutex> lock(mMutex); if (mCondition) { if (!pullSuccess) { Loading @@ -405,11 +379,6 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven return; } if (allData.size() == 0) { VLOG("Data pulled is empty"); StatsdStats::getInstance().noteEmptyData(mPullTagId); return; } // For scheduled pulled data, the effective event time is snap to the nearest // bucket end. In the case of waking up from a deep sleep state, we will // attribute to the previous bucket end. If the sleep was long but not very long, we Loading @@ -417,32 +386,69 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven // we pull at a later time than real bucket end. // If the sleep was very long, we skip more than one bucket before sleep. In this case, // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs; int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; accumulateEvents(allData, originalPullTimeNs, bucketEndTime); // We can probably flush the bucket. Since we used bucketEndTime when calling // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. flushIfNeededLocked(originalPullTimeNs); } else { VLOG("No need to commit data on condition false."); } } void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) { bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs; if (isEventLate) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, (long long)mCurrentBucketStartTimeNs); VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs); StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); invalidateCurrentBucket(); return; } const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs; StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (pullDelayNs > mMaxPullDelayNs) { ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId, (long long)mMaxPullDelayNs); StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); // We are missing one pull from the bucket which means we will not have a complete view of // what's going on. invalidateCurrentBucket(); return; } if (allData.size() == 0) { VLOG("Data pulled is empty"); StatsdStats::getInstance().noteEmptyData(mPullTagId); } mMatchedMetricDimensionKeys.clear(); for (const auto& data : allData) { LogEvent localCopy = data->makeCopy(); if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == MatchingState::kMatched) { localCopy.setElapsedTimestampNs(bucketEndTime); localCopy.setElapsedTimestampNs(eventElapsedTimeNs); onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); } } mHasGlobalBase = true; // We can probably flush the bucket. Since we used bucketEndTime when calling // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. flushIfNeededLocked(realEventTime); } else { VLOG("No need to commit data on condition false."); // If the new pulled data does not contains some keys we track in our intervals, we need to // reset the base. for (auto& slice : mCurrentSlicedBucket) { bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) != mMatchedMetricDimensionKeys.end(); if (!presentInPulledData) { for (auto& interval : slice.second) { interval.hasBase = false; } } } mMatchedMetricDimensionKeys.clear(); mHasGlobalBase = true; } void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { if (mCurrentSlicedBucket.size() == 0) { Loading Loading @@ -539,6 +545,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn (long long)mCurrentBucketStartTimeNs); return; } mMatchedMetricDimensionKeys.insert(eventKey); flushIfNeededLocked(eventTimeNs); Loading