Loading cmds/statsd/src/StatsService.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -826,7 +826,7 @@ status_t StatsService::cmd_print_pulled_metrics(int out, const Vector<String8>& uids.push_back(AID_SYSTEM); } vector<shared_ptr<LogEvent>> stats; if (mPullerManager->Pull(s, uids, &stats)) { if (mPullerManager->Pull(s, uids, getElapsedRealtimeNs(), &stats)) { for (const auto& it : stats) { dprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str()); } Loading cmds/statsd/src/external/StatsPuller.cpp +8 −5 Original line number Diff line number Diff line Loading @@ -38,14 +38,16 @@ StatsPuller::StatsPuller(const int tagId, const int64_t coolDownNs, const int64_ mPullTimeoutNs(pullTimeoutNs), mCoolDownNs(coolDownNs), mAdditiveFields(additiveFields), mLastPullTimeNs(0) { mLastPullTimeNs(0), mLastEventTimeNs(0) { } bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) { bool StatsPuller::Pull(const int64_t eventTimeNs, std::vector<std::shared_ptr<LogEvent>>* data) { lock_guard<std::mutex> lock(mLock); int64_t elapsedTimeNs = getElapsedRealtimeNs(); StatsdStats::getInstance().notePull(mTagId); const bool shouldUseCache = elapsedTimeNs - mLastPullTimeNs < mCoolDownNs; const bool shouldUseCache = (mLastEventTimeNs == eventTimeNs) || (elapsedTimeNs - mLastPullTimeNs < mCoolDownNs); if (shouldUseCache) { if (mHasGoodData) { (*data) = mCachedData; Loading @@ -54,13 +56,13 @@ bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) { } return mHasGoodData; } if (mLastPullTimeNs > 0) { StatsdStats::getInstance().updateMinPullIntervalSec( mTagId, (elapsedTimeNs - mLastPullTimeNs) / NS_PER_SEC); } mCachedData.clear(); mLastPullTimeNs = elapsedTimeNs; mLastEventTimeNs = eventTimeNs; mHasGoodData = PullInternal(&mCachedData); if (!mHasGoodData) { return mHasGoodData; Loading @@ -70,7 +72,7 @@ bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) { const bool pullTimeOut = pullDurationNs > mPullTimeoutNs; if (pullTimeOut) { // Something went wrong. Discard the data. clearCacheLocked(); mCachedData.clear(); mHasGoodData = false; StatsdStats::getInstance().notePullTimeout(mTagId); ALOGW("Pull for atom %d exceeds timeout %lld nano seconds.", mTagId, Loading Loading @@ -104,6 +106,7 @@ int StatsPuller::clearCacheLocked() { int ret = mCachedData.size(); mCachedData.clear(); mLastPullTimeNs = 0; mLastEventTimeNs = 0; return ret; } Loading cmds/statsd/src/external/StatsPuller.h +6 −1 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ public: // 2) pull takes longer than mPullTimeoutNs (intrinsic to puller) // If a metric wants to make any change to the data, like timestamps, it // should make a copy as this data may be shared with multiple metrics. bool Pull(std::vector<std::shared_ptr<LogEvent>>* data); bool Pull(const int64_t eventTimeNs, std::vector<std::shared_ptr<LogEvent>>* data); // Clear cache immediately int ForceClearCache(); Loading Loading @@ -94,6 +94,11 @@ private: int64_t mLastPullTimeNs; // All pulls happen due to an event (app upgrade, bucket boundary, condition change, etc). // If multiple pulls need to be done at the same event time, we will always use the cache after // the first pull. int64_t mLastEventTimeNs; // Cache of data from last pull. If next request comes before cool down finishes, // cached data will be returned. // Cached data is cleared when Loading cmds/statsd/src/external/StatsPullerManager.cpp +13 −10 Original line number Diff line number Diff line Loading @@ -91,20 +91,21 @@ StatsPullerManager::StatsPullerManager() mPullAtomCallbackDeathRecipient(AIBinder_DeathRecipient_new(pullAtomCallbackDied)) { } bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data, bool useUids) { std::lock_guard<std::mutex> _l(mLock); return PullLocked(tagId, configKey, data, useUids); return PullLocked(tagId, configKey, eventTimeNs, data, useUids); } bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids) { std::lock_guard<std::mutex> _l(mLock); return PullLocked(tagId, uids, data, useUids); return PullLocked(tagId, uids, eventTimeNs, data, useUids); } bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey, vector<shared_ptr<LogEvent>>* data, bool useUids) { const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data, bool useUids) { vector<int32_t> uids; if (useUids) { auto uidProviderIt = mPullUidProviders.find(configKey); Loading @@ -123,18 +124,19 @@ bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey, } uids = pullUidProvider->getPullAtomUids(tagId); } return PullLocked(tagId, uids, data, useUids); return PullLocked(tagId, uids, eventTimeNs, data, useUids); } bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids, vector<shared_ptr<LogEvent>>* data, bool useUids) { const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data, bool useUids) { VLOG("Initiating pulling %d", tagId); if (useUids) { for (int32_t uid : uids) { PullerKey key = {.atomTag = tagId, .uid = uid}; auto pullerIt = kAllPullAtomInfo.find(key); if (pullerIt != kAllPullAtomInfo.end()) { bool ret = pullerIt->second->Pull(data); bool ret = pullerIt->second->Pull(eventTimeNs, data); VLOG("pulled %zu items", data->size()); if (!ret) { StatsdStats::getInstance().notePullFailed(tagId); Loading @@ -149,7 +151,7 @@ bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids, PullerKey key = {.atomTag = tagId, .uid = -1}; auto pullerIt = kAllPullAtomInfo.find(key); if (pullerIt != kAllPullAtomInfo.end()) { bool ret = pullerIt->second->Pull(data); bool ret = pullerIt->second->Pull(eventTimeNs, data); VLOG("pulled %zu items", data->size()); if (!ret) { StatsdStats::getInstance().notePullFailed(tagId); Loading Loading @@ -290,7 +292,8 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { } for (const auto& pullInfo : needToPull) { vector<shared_ptr<LogEvent>> data; bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, &data); bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data); if (!pullSuccess) { VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs); } Loading cmds/statsd/src/external/StatsPullerManager.h +6 −6 Original line number Diff line number Diff line Loading @@ -101,11 +101,11 @@ public: // registered for any of the uids for this atom. // If the metric wants to make any change to the data, like timestamps, they // should make a copy as this data may be shared with multiple metrics. virtual bool Pull(int tagId, const ConfigKey& configKey, virtual bool Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); // Same as above, but directly specify the allowed uids to pull from. virtual bool Pull(int tagId, const vector<int32_t>& uids, virtual bool Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); // Clear pull data cache immediately. Loading Loading @@ -152,11 +152,11 @@ private: // mapping from Config Key to the PullUidProvider for that config std::map<ConfigKey, wp<PullUidProvider>> mPullUidProviders; bool PullLocked(int tagId, const ConfigKey& configKey, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); bool PullLocked(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); bool PullLocked(int tagId, const vector<int32_t>& uids, vector<std::shared_ptr<LogEvent>>* data, bool useUids); bool PullLocked(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids); // locks for data receiver and StatsCompanionService changes std::mutex mLock; Loading Loading
cmds/statsd/src/StatsService.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -826,7 +826,7 @@ status_t StatsService::cmd_print_pulled_metrics(int out, const Vector<String8>& uids.push_back(AID_SYSTEM); } vector<shared_ptr<LogEvent>> stats; if (mPullerManager->Pull(s, uids, &stats)) { if (mPullerManager->Pull(s, uids, getElapsedRealtimeNs(), &stats)) { for (const auto& it : stats) { dprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str()); } Loading
cmds/statsd/src/external/StatsPuller.cpp +8 −5 Original line number Diff line number Diff line Loading @@ -38,14 +38,16 @@ StatsPuller::StatsPuller(const int tagId, const int64_t coolDownNs, const int64_ mPullTimeoutNs(pullTimeoutNs), mCoolDownNs(coolDownNs), mAdditiveFields(additiveFields), mLastPullTimeNs(0) { mLastPullTimeNs(0), mLastEventTimeNs(0) { } bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) { bool StatsPuller::Pull(const int64_t eventTimeNs, std::vector<std::shared_ptr<LogEvent>>* data) { lock_guard<std::mutex> lock(mLock); int64_t elapsedTimeNs = getElapsedRealtimeNs(); StatsdStats::getInstance().notePull(mTagId); const bool shouldUseCache = elapsedTimeNs - mLastPullTimeNs < mCoolDownNs; const bool shouldUseCache = (mLastEventTimeNs == eventTimeNs) || (elapsedTimeNs - mLastPullTimeNs < mCoolDownNs); if (shouldUseCache) { if (mHasGoodData) { (*data) = mCachedData; Loading @@ -54,13 +56,13 @@ bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) { } return mHasGoodData; } if (mLastPullTimeNs > 0) { StatsdStats::getInstance().updateMinPullIntervalSec( mTagId, (elapsedTimeNs - mLastPullTimeNs) / NS_PER_SEC); } mCachedData.clear(); mLastPullTimeNs = elapsedTimeNs; mLastEventTimeNs = eventTimeNs; mHasGoodData = PullInternal(&mCachedData); if (!mHasGoodData) { return mHasGoodData; Loading @@ -70,7 +72,7 @@ bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) { const bool pullTimeOut = pullDurationNs > mPullTimeoutNs; if (pullTimeOut) { // Something went wrong. Discard the data. clearCacheLocked(); mCachedData.clear(); mHasGoodData = false; StatsdStats::getInstance().notePullTimeout(mTagId); ALOGW("Pull for atom %d exceeds timeout %lld nano seconds.", mTagId, Loading Loading @@ -104,6 +106,7 @@ int StatsPuller::clearCacheLocked() { int ret = mCachedData.size(); mCachedData.clear(); mLastPullTimeNs = 0; mLastEventTimeNs = 0; return ret; } Loading
cmds/statsd/src/external/StatsPuller.h +6 −1 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ public: // 2) pull takes longer than mPullTimeoutNs (intrinsic to puller) // If a metric wants to make any change to the data, like timestamps, it // should make a copy as this data may be shared with multiple metrics. bool Pull(std::vector<std::shared_ptr<LogEvent>>* data); bool Pull(const int64_t eventTimeNs, std::vector<std::shared_ptr<LogEvent>>* data); // Clear cache immediately int ForceClearCache(); Loading Loading @@ -94,6 +94,11 @@ private: int64_t mLastPullTimeNs; // All pulls happen due to an event (app upgrade, bucket boundary, condition change, etc). // If multiple pulls need to be done at the same event time, we will always use the cache after // the first pull. int64_t mLastEventTimeNs; // Cache of data from last pull. If next request comes before cool down finishes, // cached data will be returned. // Cached data is cleared when Loading
cmds/statsd/src/external/StatsPullerManager.cpp +13 −10 Original line number Diff line number Diff line Loading @@ -91,20 +91,21 @@ StatsPullerManager::StatsPullerManager() mPullAtomCallbackDeathRecipient(AIBinder_DeathRecipient_new(pullAtomCallbackDied)) { } bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data, bool useUids) { std::lock_guard<std::mutex> _l(mLock); return PullLocked(tagId, configKey, data, useUids); return PullLocked(tagId, configKey, eventTimeNs, data, useUids); } bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids) { std::lock_guard<std::mutex> _l(mLock); return PullLocked(tagId, uids, data, useUids); return PullLocked(tagId, uids, eventTimeNs, data, useUids); } bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey, vector<shared_ptr<LogEvent>>* data, bool useUids) { const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data, bool useUids) { vector<int32_t> uids; if (useUids) { auto uidProviderIt = mPullUidProviders.find(configKey); Loading @@ -123,18 +124,19 @@ bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey, } uids = pullUidProvider->getPullAtomUids(tagId); } return PullLocked(tagId, uids, data, useUids); return PullLocked(tagId, uids, eventTimeNs, data, useUids); } bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids, vector<shared_ptr<LogEvent>>* data, bool useUids) { const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data, bool useUids) { VLOG("Initiating pulling %d", tagId); if (useUids) { for (int32_t uid : uids) { PullerKey key = {.atomTag = tagId, .uid = uid}; auto pullerIt = kAllPullAtomInfo.find(key); if (pullerIt != kAllPullAtomInfo.end()) { bool ret = pullerIt->second->Pull(data); bool ret = pullerIt->second->Pull(eventTimeNs, data); VLOG("pulled %zu items", data->size()); if (!ret) { StatsdStats::getInstance().notePullFailed(tagId); Loading @@ -149,7 +151,7 @@ bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids, PullerKey key = {.atomTag = tagId, .uid = -1}; auto pullerIt = kAllPullAtomInfo.find(key); if (pullerIt != kAllPullAtomInfo.end()) { bool ret = pullerIt->second->Pull(data); bool ret = pullerIt->second->Pull(eventTimeNs, data); VLOG("pulled %zu items", data->size()); if (!ret) { StatsdStats::getInstance().notePullFailed(tagId); Loading Loading @@ -290,7 +292,8 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { } for (const auto& pullInfo : needToPull) { vector<shared_ptr<LogEvent>> data; bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, &data); bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data); if (!pullSuccess) { VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs); } Loading
cmds/statsd/src/external/StatsPullerManager.h +6 −6 Original line number Diff line number Diff line Loading @@ -101,11 +101,11 @@ public: // registered for any of the uids for this atom. // If the metric wants to make any change to the data, like timestamps, they // should make a copy as this data may be shared with multiple metrics. virtual bool Pull(int tagId, const ConfigKey& configKey, virtual bool Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); // Same as above, but directly specify the allowed uids to pull from. virtual bool Pull(int tagId, const vector<int32_t>& uids, virtual bool Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); // Clear pull data cache immediately. Loading Loading @@ -152,11 +152,11 @@ private: // mapping from Config Key to the PullUidProvider for that config std::map<ConfigKey, wp<PullUidProvider>> mPullUidProviders; bool PullLocked(int tagId, const ConfigKey& configKey, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); bool PullLocked(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids = true); bool PullLocked(int tagId, const vector<int32_t>& uids, vector<std::shared_ptr<LogEvent>>* data, bool useUids); bool PullLocked(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs, vector<std::shared_ptr<LogEvent>>* data, bool useUids); // locks for data receiver and StatsCompanionService changes std::mutex mLock; Loading