Loading cmds/statsd/src/StatsLogProcessor.cpp +58 −15 Original line number Diff line number Diff line Loading @@ -79,7 +79,8 @@ StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap, mPeriodicAlarmMonitor(periodicAlarmMonitor), mSendBroadcast(sendBroadcast), mTimeBaseNs(timeBaseNs), mLastLogTimestamp(0) { mLargestTimestampSeen(0), mLastTimestampSeen(0) { } StatsLogProcessor::~StatsLogProcessor() { Loading Loading @@ -156,18 +157,54 @@ void StatsLogProcessor::onIsolatedUidChangedEventLocked(const LogEvent& event) { } void StatsLogProcessor::OnLogEvent(LogEvent* event) { OnLogEvent(event, false); } void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnected) { std::lock_guard<std::mutex> lock(mMetricsMutex); const int64_t currentTimestampNs = event->GetElapsedTimestampNs(); if (currentTimestampNs < mLastLogTimestamp) { StatsdStats::getInstance().noteLogEventSkipped( event->GetTagId(), event->GetElapsedTimestampNs()); if (reconnected && mLastTimestampSeen != 0) { // LogReader tells us the connection has just been reset. Now we need // to enter reconnection state to find the last CP. mInReconnection = true; } if (mInReconnection) { // We see the checkpoint if (currentTimestampNs == mLastTimestampSeen) { mInReconnection = false; // Found the CP. ignore this event, and we will start to read from next event. return; } if (currentTimestampNs > mLargestTimestampSeen) { // We see a new log but CP has not been found yet. Give up now. mLogLossCount++; mInReconnection = false; StatsdStats::getInstance().noteLogLost(currentTimestampNs); // Persist the data before we reset. Do we want this? WriteDataToDiskLocked(); // We see fresher event before we see the checkpoint. We might have lost data. // The best we can do is to reset. std::vector<ConfigKey> configKeys; for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) { configKeys.push_back(it->first); } resetConfigsLocked(currentTimestampNs, configKeys); } else { // Still in search of the CP. Keep going. return; } } mLogCount++; mLastTimestampSeen = currentTimestampNs; if (mLargestTimestampSeen < currentTimestampNs) { mLargestTimestampSeen = currentTimestampNs; } resetIfConfigTtlExpiredLocked(currentTimestampNs); mLastLogTimestamp = currentTimestampNs; StatsdStats::getInstance().noteAtomLogged( event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC); Loading Loading @@ -339,15 +376,9 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key, (long long)getWallClockNs()); } void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) { std::vector<ConfigKey> configKeysTtlExpired; for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) { if (it->second != nullptr && !it->second->isInTtl(timestampNs)) { configKeysTtlExpired.push_back(it->first); } } for (const auto& key : configKeysTtlExpired) { void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs) { for (const auto& key : configs) { StatsdConfig config; if (StorageManager::readConfigFromDisk(key, &config)) { OnConfigUpdatedLocked(timestampNs, key, config); Loading @@ -362,6 +393,18 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) } } void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) { std::vector<ConfigKey> configKeysTtlExpired; for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) { if (it->second != nullptr && !it->second->isInTtl(timestampNs)) { configKeysTtlExpired.push_back(it->first); } } if (configKeysTtlExpired.size() > 0) { resetConfigsLocked(timestampNs, configKeysTtlExpired); } } void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) { std::lock_guard<std::mutex> lock(mMetricsMutex); auto it = mMetricsManagers.find(key); Loading cmds/statsd/src/StatsLogProcessor.h +18 −1 Original line number Diff line number Diff line Loading @@ -40,6 +40,9 @@ public: const std::function<void(const ConfigKey&)>& sendBroadcast); virtual ~StatsLogProcessor(); void OnLogEvent(LogEvent* event, bool reconnectionStarts); // for testing only. void OnLogEvent(LogEvent* event); void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key, Loading Loading @@ -122,16 +125,30 @@ private: // Handler over the isolated uid change event. void onIsolatedUidChangedEventLocked(const LogEvent& event); void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs); // Function used to send a broadcast so that receiver for the config key can call getData // to retrieve the stored data. std::function<void(const ConfigKey& key)> mSendBroadcast; const int64_t mTimeBaseNs; int64_t mLastLogTimestamp; // Largest timestamp of the events that we have processed. int64_t mLargestTimestampSeen = 0; int64_t mLastTimestampSeen = 0; bool mInReconnection = false; // Processed log count uint64_t mLogCount = 0; // Log loss detected count int mLogLossCount = 0; long mLastPullerCacheClearTimeSec = 0; FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs); FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize); FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast); FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge); Loading cmds/statsd/src/StatsService.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -818,8 +818,8 @@ void StatsService::Startup() { mConfigManager->Startup(); } void StatsService::OnLogEvent(LogEvent* event) { mProcessor->OnLogEvent(event); void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) { mProcessor->OnLogEvent(event, reconnectionStarts); } Status StatsService::getData(int64_t key, vector<uint8_t>* output) { Loading cmds/statsd/src/StatsService.h +1 −1 Original line number Diff line number Diff line Loading @@ -76,7 +76,7 @@ public: /** * Called by LogReader when there's a log event to process. */ virtual void OnLogEvent(LogEvent* event); virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts); /** * Binder call for clients to request data for this configuration key. Loading cmds/statsd/src/guardrail/StatsdStats.cpp +15 −23 Original line number Diff line number Diff line Loading @@ -50,7 +50,7 @@ const int FIELD_ID_ANOMALY_ALARM_STATS = 9; // const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp const int FIELD_ID_LOGGER_ERROR_STATS = 11; const int FIELD_ID_PERIODIC_ALARM_STATS = 12; const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13; const int FIELD_ID_LOG_LOSS_STATS = 14; const int FIELD_ID_ATOM_STATS_TAG = 1; const int FIELD_ID_ATOM_STATS_COUNT = 2; Loading @@ -61,9 +61,6 @@ const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1; const int FIELD_ID_LOGGER_STATS_TIME = 1; const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2; const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1; const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2; const int FIELD_ID_CONFIG_STATS_UID = 1; const int FIELD_ID_CONFIG_STATS_ID = 2; const int FIELD_ID_CONFIG_STATS_CREATION = 3; Loading Loading @@ -182,6 +179,14 @@ void StatsdStats::noteConfigReset(const ConfigKey& key) { noteConfigResetInternalLocked(key); } void StatsdStats::noteLogLost(int64_t timestampNs) { lock_guard<std::mutex> lock(mLock); if (mLogLossTimestampNs.size() == kMaxLoggerErrors) { mLogLossTimestampNs.pop_front(); } mLogLossTimestampNs.push_back(timestampNs); } void StatsdStats::noteBroadcastSent(const ConfigKey& key) { noteBroadcastSent(key, getWallClockSec()); } Loading Loading @@ -350,15 +355,6 @@ void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) { mPushedAtomStats[atomId]++; } void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) { lock_guard<std::mutex> lock(mLock); // grows strictly one at a time. so it won't > kMaxSkippedLogEvents if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) { mSkippedLogEvents.pop_front(); } mSkippedLogEvents.push_back(std::make_pair(tag, timestamp)); } void StatsdStats::noteLoggerError(int error) { lock_guard<std::mutex> lock(mLock); // grows strictly one at a time. so it won't > kMaxLoggerErrors Loading @@ -381,7 +377,7 @@ void StatsdStats::resetInternalLocked() { mAnomalyAlarmRegisteredStats = 0; mPeriodicAlarmRegisteredStats = 0; mLoggerErrors.clear(); mSkippedLogEvents.clear(); mLogLossTimestampNs.clear(); for (auto& config : mConfigStats) { config.second->broadcast_sent_time_sec.clear(); config.second->data_drop_time_sec.clear(); Loading Loading @@ -515,8 +511,8 @@ void StatsdStats::dumpStats(FILE* out) const { strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm); fprintf(out, "Logger error %d at %s\n", error.second, buffer); } for (const auto& skipped : mSkippedLogEvents) { fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second); for (const auto& loss : mLogLossTimestampNs) { fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss); } } Loading Loading @@ -672,13 +668,9 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) { proto.end(token); } for (const auto& skipped : mSkippedLogEvents) { uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS | FIELD_COUNT_REPEATED); proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first); proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP, (long long)skipped.second); proto.end(token); for (const auto& loss : mLogLossTimestampNs) { proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED, (long long)loss); } output->clear(); Loading Loading
cmds/statsd/src/StatsLogProcessor.cpp +58 −15 Original line number Diff line number Diff line Loading @@ -79,7 +79,8 @@ StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap, mPeriodicAlarmMonitor(periodicAlarmMonitor), mSendBroadcast(sendBroadcast), mTimeBaseNs(timeBaseNs), mLastLogTimestamp(0) { mLargestTimestampSeen(0), mLastTimestampSeen(0) { } StatsLogProcessor::~StatsLogProcessor() { Loading Loading @@ -156,18 +157,54 @@ void StatsLogProcessor::onIsolatedUidChangedEventLocked(const LogEvent& event) { } void StatsLogProcessor::OnLogEvent(LogEvent* event) { OnLogEvent(event, false); } void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnected) { std::lock_guard<std::mutex> lock(mMetricsMutex); const int64_t currentTimestampNs = event->GetElapsedTimestampNs(); if (currentTimestampNs < mLastLogTimestamp) { StatsdStats::getInstance().noteLogEventSkipped( event->GetTagId(), event->GetElapsedTimestampNs()); if (reconnected && mLastTimestampSeen != 0) { // LogReader tells us the connection has just been reset. Now we need // to enter reconnection state to find the last CP. mInReconnection = true; } if (mInReconnection) { // We see the checkpoint if (currentTimestampNs == mLastTimestampSeen) { mInReconnection = false; // Found the CP. ignore this event, and we will start to read from next event. return; } if (currentTimestampNs > mLargestTimestampSeen) { // We see a new log but CP has not been found yet. Give up now. mLogLossCount++; mInReconnection = false; StatsdStats::getInstance().noteLogLost(currentTimestampNs); // Persist the data before we reset. Do we want this? WriteDataToDiskLocked(); // We see fresher event before we see the checkpoint. We might have lost data. // The best we can do is to reset. std::vector<ConfigKey> configKeys; for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) { configKeys.push_back(it->first); } resetConfigsLocked(currentTimestampNs, configKeys); } else { // Still in search of the CP. Keep going. return; } } mLogCount++; mLastTimestampSeen = currentTimestampNs; if (mLargestTimestampSeen < currentTimestampNs) { mLargestTimestampSeen = currentTimestampNs; } resetIfConfigTtlExpiredLocked(currentTimestampNs); mLastLogTimestamp = currentTimestampNs; StatsdStats::getInstance().noteAtomLogged( event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC); Loading Loading @@ -339,15 +376,9 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key, (long long)getWallClockNs()); } void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) { std::vector<ConfigKey> configKeysTtlExpired; for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) { if (it->second != nullptr && !it->second->isInTtl(timestampNs)) { configKeysTtlExpired.push_back(it->first); } } for (const auto& key : configKeysTtlExpired) { void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs) { for (const auto& key : configs) { StatsdConfig config; if (StorageManager::readConfigFromDisk(key, &config)) { OnConfigUpdatedLocked(timestampNs, key, config); Loading @@ -362,6 +393,18 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) } } void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) { std::vector<ConfigKey> configKeysTtlExpired; for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) { if (it->second != nullptr && !it->second->isInTtl(timestampNs)) { configKeysTtlExpired.push_back(it->first); } } if (configKeysTtlExpired.size() > 0) { resetConfigsLocked(timestampNs, configKeysTtlExpired); } } void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) { std::lock_guard<std::mutex> lock(mMetricsMutex); auto it = mMetricsManagers.find(key); Loading
cmds/statsd/src/StatsLogProcessor.h +18 −1 Original line number Diff line number Diff line Loading @@ -40,6 +40,9 @@ public: const std::function<void(const ConfigKey&)>& sendBroadcast); virtual ~StatsLogProcessor(); void OnLogEvent(LogEvent* event, bool reconnectionStarts); // for testing only. void OnLogEvent(LogEvent* event); void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key, Loading Loading @@ -122,16 +125,30 @@ private: // Handler over the isolated uid change event. void onIsolatedUidChangedEventLocked(const LogEvent& event); void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs); // Function used to send a broadcast so that receiver for the config key can call getData // to retrieve the stored data. std::function<void(const ConfigKey& key)> mSendBroadcast; const int64_t mTimeBaseNs; int64_t mLastLogTimestamp; // Largest timestamp of the events that we have processed. int64_t mLargestTimestampSeen = 0; int64_t mLastTimestampSeen = 0; bool mInReconnection = false; // Processed log count uint64_t mLogCount = 0; // Log loss detected count int mLogLossCount = 0; long mLastPullerCacheClearTimeSec = 0; FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs); FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize); FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast); FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge); Loading
cmds/statsd/src/StatsService.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -818,8 +818,8 @@ void StatsService::Startup() { mConfigManager->Startup(); } void StatsService::OnLogEvent(LogEvent* event) { mProcessor->OnLogEvent(event); void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) { mProcessor->OnLogEvent(event, reconnectionStarts); } Status StatsService::getData(int64_t key, vector<uint8_t>* output) { Loading
cmds/statsd/src/StatsService.h +1 −1 Original line number Diff line number Diff line Loading @@ -76,7 +76,7 @@ public: /** * Called by LogReader when there's a log event to process. */ virtual void OnLogEvent(LogEvent* event); virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts); /** * Binder call for clients to request data for this configuration key. Loading
cmds/statsd/src/guardrail/StatsdStats.cpp +15 −23 Original line number Diff line number Diff line Loading @@ -50,7 +50,7 @@ const int FIELD_ID_ANOMALY_ALARM_STATS = 9; // const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp const int FIELD_ID_LOGGER_ERROR_STATS = 11; const int FIELD_ID_PERIODIC_ALARM_STATS = 12; const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13; const int FIELD_ID_LOG_LOSS_STATS = 14; const int FIELD_ID_ATOM_STATS_TAG = 1; const int FIELD_ID_ATOM_STATS_COUNT = 2; Loading @@ -61,9 +61,6 @@ const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1; const int FIELD_ID_LOGGER_STATS_TIME = 1; const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2; const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1; const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2; const int FIELD_ID_CONFIG_STATS_UID = 1; const int FIELD_ID_CONFIG_STATS_ID = 2; const int FIELD_ID_CONFIG_STATS_CREATION = 3; Loading Loading @@ -182,6 +179,14 @@ void StatsdStats::noteConfigReset(const ConfigKey& key) { noteConfigResetInternalLocked(key); } void StatsdStats::noteLogLost(int64_t timestampNs) { lock_guard<std::mutex> lock(mLock); if (mLogLossTimestampNs.size() == kMaxLoggerErrors) { mLogLossTimestampNs.pop_front(); } mLogLossTimestampNs.push_back(timestampNs); } void StatsdStats::noteBroadcastSent(const ConfigKey& key) { noteBroadcastSent(key, getWallClockSec()); } Loading Loading @@ -350,15 +355,6 @@ void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) { mPushedAtomStats[atomId]++; } void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) { lock_guard<std::mutex> lock(mLock); // grows strictly one at a time. so it won't > kMaxSkippedLogEvents if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) { mSkippedLogEvents.pop_front(); } mSkippedLogEvents.push_back(std::make_pair(tag, timestamp)); } void StatsdStats::noteLoggerError(int error) { lock_guard<std::mutex> lock(mLock); // grows strictly one at a time. so it won't > kMaxLoggerErrors Loading @@ -381,7 +377,7 @@ void StatsdStats::resetInternalLocked() { mAnomalyAlarmRegisteredStats = 0; mPeriodicAlarmRegisteredStats = 0; mLoggerErrors.clear(); mSkippedLogEvents.clear(); mLogLossTimestampNs.clear(); for (auto& config : mConfigStats) { config.second->broadcast_sent_time_sec.clear(); config.second->data_drop_time_sec.clear(); Loading Loading @@ -515,8 +511,8 @@ void StatsdStats::dumpStats(FILE* out) const { strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm); fprintf(out, "Logger error %d at %s\n", error.second, buffer); } for (const auto& skipped : mSkippedLogEvents) { fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second); for (const auto& loss : mLogLossTimestampNs) { fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss); } } Loading Loading @@ -672,13 +668,9 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) { proto.end(token); } for (const auto& skipped : mSkippedLogEvents) { uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS | FIELD_COUNT_REPEATED); proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first); proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP, (long long)skipped.second); proto.end(token); for (const auto& loss : mLogLossTimestampNs) { proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED, (long long)loss); } output->clear(); Loading