Loading cmds/statsd/src/anomaly/AnomalyTracker.cpp +84 −67 Original line number Diff line number Diff line Loading @@ -31,7 +31,6 @@ namespace android { namespace os { namespace statsd { // TODO: Get rid of bucketNumbers, and return to the original circular array method. AnomalyTracker::AnomalyTracker(const Alert& alert, const ConfigKey& configKey) : mAlert(alert), mConfigKey(configKey), mNumOfPastBuckets(mAlert.num_buckets() - 1) { VLOG("AnomalyTracker() called"); Loading Loading @@ -60,87 +59,104 @@ void AnomalyTracker::resetStorage() { size_t AnomalyTracker::index(int64_t bucketNum) const { if (bucketNum < 0) { // To support this use-case, we can easily modify index to wrap around. But currently // AnomalyTracker should never need this, so if it happens, it's a bug we should log. // TODO: Audit this. ALOGE("index() was passed a negative bucket number (%lld)!", (long long)bucketNum); } return bucketNum % mNumOfPastBuckets; } void AnomalyTracker::flushPastBuckets(const int64_t& latestPastBucketNum) { VLOG("addPastBucket() called."); if (latestPastBucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) { ALOGE("Cannot add a past bucket %lld units in past", (long long)latestPastBucketNum); void AnomalyTracker::advanceMostRecentBucketTo(const int64_t& bucketNum) { VLOG("advanceMostRecentBucketTo() called."); if (bucketNum <= mMostRecentBucketNum) { ALOGW("Cannot advance buckets backwards (bucketNum=%lld but mMostRecentBucketNum=%lld)", (long long)bucketNum, (long long)mMostRecentBucketNum); return; } // The past packets are ancient. Empty out old mPastBuckets[i] values and reset // mSumOverPastBuckets. if (latestPastBucketNum - mMostRecentBucketNum >= mNumOfPastBuckets) { // If in the future (i.e. buckets are ancient), just empty out all past info. if (bucketNum >= mMostRecentBucketNum + mNumOfPastBuckets) { resetStorage(); } else { for (int64_t i = std::max(0LL, (long long)(mMostRecentBucketNum - mNumOfPastBuckets + 1)); i <= latestPastBucketNum - mNumOfPastBuckets; i++) { mMostRecentBucketNum = bucketNum; return; } // Clear out space by emptying out old mPastBuckets[i] values and update mSumOverPastBuckets. for (int64_t i = mMostRecentBucketNum + 1; i <= bucketNum; i++) { const int idx = index(i); subtractBucketFromSum(mPastBuckets[idx]); mPastBuckets[idx] = nullptr; // release (but not clear) the old bucket. } mMostRecentBucketNum = bucketNum; } // It is an update operation. if (latestPastBucketNum <= mMostRecentBucketNum && latestPastBucketNum > mMostRecentBucketNum - mNumOfPastBuckets) { subtractBucketFromSum(mPastBuckets[index(latestPastBucketNum)]); } } void AnomalyTracker::addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue, void AnomalyTracker::addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue, const int64_t& bucketNum) { if (mNumOfPastBuckets == 0) { VLOG("addPastBucket(bucketValue) called."); if (mNumOfPastBuckets == 0 || bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) { return; } flushPastBuckets(bucketNum); auto& bucket = mPastBuckets[index(bucketNum)]; if (bucket == nullptr) { bucket = std::make_shared<DimToValMap>(); const int bucketIndex = index(bucketNum); if (bucketNum <= mMostRecentBucketNum && (mPastBuckets[bucketIndex] != nullptr)) { // We need to insert into an already existing past bucket. std::shared_ptr<DimToValMap>& bucket = mPastBuckets[bucketIndex]; auto itr = bucket->find(key); if (itr != bucket->end()) { // Old entry already exists; update it. subtractValueFromSum(key, itr->second); itr->second = bucketValue; } else { bucket->insert({key, bucketValue}); } mSumOverPastBuckets[key] += bucketValue; } else { // Bucket does not exist yet (in future or was never made), so we must make it. std::shared_ptr<DimToValMap> bucket = std::make_shared<DimToValMap>(); bucket->insert({key, bucketValue}); addBucketToSum(bucket); mMostRecentBucketNum = std::max(mMostRecentBucketNum, bucketNum); addPastBucket(bucket, bucketNum); } } void AnomalyTracker::addPastBucket(std::shared_ptr<DimToValMap> bucketValues, void AnomalyTracker::addPastBucket(std::shared_ptr<DimToValMap> bucket, const int64_t& bucketNum) { VLOG("addPastBucket() called."); if (mNumOfPastBuckets == 0) { VLOG("addPastBucket(bucket) called."); if (mNumOfPastBuckets == 0 || bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) { return; } flushPastBuckets(bucketNum); // Replace the oldest bucket with the new bucket we are adding. mPastBuckets[index(bucketNum)] = bucketValues; addBucketToSum(bucketValues); mMostRecentBucketNum = std::max(mMostRecentBucketNum, bucketNum); if (bucketNum <= mMostRecentBucketNum) { // We are updating an old bucket, not adding a new one. subtractBucketFromSum(mPastBuckets[index(bucketNum)]); } else { // Clear space for the new bucket to be at bucketNum. advanceMostRecentBucketTo(bucketNum); } mPastBuckets[index(bucketNum)] = bucket; addBucketToSum(bucket); } void AnomalyTracker::subtractBucketFromSum(const shared_ptr<DimToValMap>& bucket) { if (bucket == nullptr) { return; } // For each dimension present in the bucket, subtract its value from its corresponding sum. for (const auto& keyValuePair : *bucket) { auto itr = mSumOverPastBuckets.find(keyValuePair.first); subtractValueFromSum(keyValuePair.first, keyValuePair.second); } } void AnomalyTracker::subtractValueFromSum(const MetricDimensionKey& key, const int64_t& bucketValue) { auto itr = mSumOverPastBuckets.find(key); if (itr == mSumOverPastBuckets.end()) { continue; return; } itr->second -= keyValuePair.second; // TODO: No need to look up the object twice like this. Use a var. itr->second -= bucketValue; if (itr->second == 0) { mSumOverPastBuckets.erase(itr); } } } void AnomalyTracker::addBucketToSum(const shared_ptr<DimToValMap>& bucket) { if (bucket == nullptr) { Loading @@ -154,7 +170,8 @@ void AnomalyTracker::addBucketToSum(const shared_ptr<DimToValMap>& bucket) { int64_t AnomalyTracker::getPastBucketValue(const MetricDimensionKey& key, const int64_t& bucketNum) const { if (mNumOfPastBuckets == 0 || bucketNum < 0) { if (bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets || bucketNum > mMostRecentBucketNum) { return 0; } Loading @@ -174,11 +191,13 @@ int64_t AnomalyTracker::getSumOverPastBuckets(const MetricDimensionKey& key) con return 0; } bool AnomalyTracker::detectAnomaly(const int64_t& currentBucketNum, const MetricDimensionKey& key, bool AnomalyTracker::detectAnomaly(const int64_t& currentBucketNum, const MetricDimensionKey& key, const int64_t& currentBucketValue) { // currentBucketNum should be the next bucket after pastBuckets. If not, advance so that it is. if (currentBucketNum > mMostRecentBucketNum + 1) { // TODO: This creates a needless 0 entry in mSumOverPastBuckets. Fix this. addPastBucket(key, 0, currentBucketNum - 1); advanceMostRecentBucketTo(currentBucketNum - 1); } return mAlert.has_trigger_if_sum_gt() && getSumOverPastBuckets(key) + currentBucketValue > mAlert.trigger_if_sum_gt(); Loading @@ -190,19 +209,17 @@ void AnomalyTracker::declareAnomaly(const uint64_t& timestampNs, const MetricDim VLOG("Skipping anomaly declaration since within refractory period"); return; } mRefractoryPeriodEndsSec[key] = (timestampNs / NS_PER_SEC) + mAlert.refractory_period_secs(); if (mAlert.has_refractory_period_secs()) { mRefractoryPeriodEndsSec[key] = ((timestampNs + NS_PER_SEC - 1) / NS_PER_SEC) // round up + mAlert.refractory_period_secs(); // TODO: If we had access to the bucket_size_millis, consider calling resetStorage() // if (mAlert.refractory_period_secs() > mNumOfPastBuckets * bucketSizeNs) {resetStorage();} } if (!mSubscriptions.empty()) { if (mAlert.has_id()) { ALOGI("An anomaly (%lld) %s has occurred! Informing subscribers.", mAlert.id(), key.toString().c_str()); ALOGI("An anomaly (%lld) %s has occurred! Informing subscribers.", mAlert.id(), key.toString().c_str()); informSubscribers(key); } else { ALOGI("An anomaly (with no id) has occurred! Not informing any subscribers."); } } else { ALOGI("An anomaly has occurred! (But no subscriber for that alert.)"); } Loading @@ -227,7 +244,7 @@ bool AnomalyTracker::isInRefractoryPeriod(const uint64_t& timestampNs, const MetricDimensionKey& key) { const auto& it = mRefractoryPeriodEndsSec.find(key); if (it != mRefractoryPeriodEndsSec.end()) { if ((timestampNs / NS_PER_SEC) <= it->second) { if (timestampNs < it->second * NS_PER_SEC) { return true; } else { mRefractoryPeriodEndsSec.erase(key); Loading cmds/statsd/src/anomaly/AnomalyTracker.h +40 −15 Original line number Diff line number Diff line Loading @@ -47,20 +47,32 @@ public: mSubscriptions.push_back(subscription); } // Adds a bucket. // Bucket index starts from 0. void addPastBucket(std::shared_ptr<DimToValMap> bucketValues, const int64_t& bucketNum); // Adds a bucket for the given bucketNum (index starting at 0). // If a bucket for bucketNum already exists, it will be replaced. // Also, advances to bucketNum (if not in the past), effectively filling any intervening // buckets with 0s. void addPastBucket(std::shared_ptr<DimToValMap> bucket, const int64_t& bucketNum); // Inserts (or replaces) the bucket entry for the given bucketNum at the given key to be the // given bucketValue. If the bucket does not exist, it will be created. // Also, advances to bucketNum (if not in the past), effectively filling any intervening // buckets with 0s. void addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue, const int64_t& bucketNum); // Returns true if detected anomaly for the existing buckets on one or more dimension keys. // Returns true if, based on past buckets plus the new currentBucketValue (which generally // represents the partially-filled current bucket), an anomaly has happened. // Also advances to currBucketNum-1. bool detectAnomaly(const int64_t& currBucketNum, const MetricDimensionKey& key, const int64_t& currentBucketValue); // Informs incidentd about the detected alert. void declareAnomaly(const uint64_t& timestampNs, const MetricDimensionKey& key); // Detects the alert and informs the incidentd when applicable. // Detects if, based on past buckets plus the new currentBucketValue (which generally // represents the partially-filled current bucket), an anomaly has happened, and if so, // declares an anomaly and informs relevant subscribers. // Also advances to currBucketNum-1. void detectAndDeclareAnomaly(const uint64_t& timestampNs, const int64_t& currBucketNum, const MetricDimensionKey& key, const int64_t& currentBucketValue); Loading @@ -69,24 +81,26 @@ public: return; // Base AnomalyTracker class has no need for the AlarmMonitor. } // Helper function to return the sum value of past buckets at given dimension. // Returns the sum of all past bucket values for the given dimension key. int64_t getSumOverPastBuckets(const MetricDimensionKey& key) const; // Helper function to return the value for a past bucket. // Returns the value for a past bucket, or 0 if that bucket doesn't exist. int64_t getPastBucketValue(const MetricDimensionKey& key, const int64_t& bucketNum) const; // Returns the anomaly threshold. // Returns the anomaly threshold set in the configuration. inline int64_t getAnomalyThreshold() const { return mAlert.trigger_if_sum_gt(); } // Returns the refractory period timestamp (in seconds) for the given key. // Returns the refractory period ending timestamp (in seconds) for the given key. // Before this moment, any detected anomaly will be ignored. // If there is no stored refractory period ending timestamp, returns 0. uint32_t getRefractoryPeriodEndsSec(const MetricDimensionKey& key) const { const auto& it = mRefractoryPeriodEndsSec.find(key); return it != mRefractoryPeriodEndsSec.end() ? it->second : 0; } // Returns the (constant) number of past buckets this anomaly tracker can store. inline int getNumOfPastBuckets() const { return mNumOfPastBuckets; } Loading @@ -112,22 +126,27 @@ protected: // for the anomaly detection (since the current bucket is not in the past). const int mNumOfPastBuckets; // The existing bucket list. // Values for each of the past mNumOfPastBuckets buckets. Always of size mNumOfPastBuckets. // mPastBuckets[i] can be null, meaning that no data is present in that bucket. std::vector<shared_ptr<DimToValMap>> mPastBuckets; // Sum over all existing buckets cached in mPastBuckets. // Cached sum over all existing buckets in mPastBuckets. // Its buckets never contain entries of 0. DimToValMap mSumOverPastBuckets; // The bucket number of the last added bucket. int64_t mMostRecentBucketNum = -1; // Map from each dimension to the timestamp that its refractory period (if this anomaly was // declared for that dimension) ends, in seconds. Only anomalies that occur after this period // ends will be declared. // declared for that dimension) ends, in seconds. From this moment and onwards, anomalies // can be declared again. // Entries may be, but are not guaranteed to be, removed after the period is finished. unordered_map<MetricDimensionKey, uint32_t> mRefractoryPeriodEndsSec; void flushPastBuckets(const int64_t& currBucketNum); // Advances mMostRecentBucketNum to bucketNum, deleting any data that is now too old. // Specifically, since it is now too old, removes the data for // [mMostRecentBucketNum - mNumOfPastBuckets + 1, bucketNum - mNumOfPastBuckets]. void advanceMostRecentBucketTo(const int64_t& bucketNum); // Add the information in the given bucket to mSumOverPastBuckets. void addBucketToSum(const shared_ptr<DimToValMap>& bucket); Loading @@ -136,15 +155,21 @@ protected: // and remove any items with value 0. void subtractBucketFromSum(const shared_ptr<DimToValMap>& bucket); // From mSumOverPastBuckets[key], subtracts bucketValue, removing it if it is now 0. void subtractValueFromSum(const MetricDimensionKey& key, const int64_t& bucketValue); // Returns true if in the refractory period, else false. // If there is a stored refractory period but it ended prior to timestampNs, it is removed. bool isInRefractoryPeriod(const uint64_t& timestampNs, const MetricDimensionKey& key); // Calculates the corresponding bucket index within the circular array. // Requires bucketNum >= 0. size_t index(int64_t bucketNum) const; // Resets all bucket data. For use when all the data gets stale. virtual void resetStorage(); // Informs the subscribers that an anomaly has occurred. // Informs the subscribers (incidentd, perfetto, broadcasts, etc) that an anomaly has occurred. void informSubscribers(const MetricDimensionKey& key); FRIEND_TEST(AnomalyTrackerTest, TestConsecutiveBuckets); Loading cmds/statsd/src/anomaly/DurationAnomalyTracker.cpp +32 −36 Original line number Diff line number Diff line Loading @@ -27,29 +27,12 @@ namespace statsd { DurationAnomalyTracker::DurationAnomalyTracker(const Alert& alert, const ConfigKey& configKey, const sp<AlarmMonitor>& alarmMonitor) : AnomalyTracker(alert, configKey), mAlarmMonitor(alarmMonitor) { VLOG("DurationAnomalyTracker() called"); } DurationAnomalyTracker::~DurationAnomalyTracker() { stopAllAlarms(); } void DurationAnomalyTracker::resetStorage() { AnomalyTracker::resetStorage(); if (!mAlarms.empty()) VLOG("AnomalyTracker.resetStorage() called but mAlarms is NOT empty!"); } void DurationAnomalyTracker::declareAnomalyIfAlarmExpired(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs) { auto itr = mAlarms.find(dimensionKey); if (itr == mAlarms.end()) { return; } if (itr->second != nullptr && static_cast<uint32_t>(timestampNs / NS_PER_SEC) >= itr->second->timestampSec) { declareAnomaly(timestampNs, dimensionKey); stopAlarm(dimensionKey); } VLOG("~DurationAnomalyTracker() called"); cancelAllAlarms(); } void DurationAnomalyTracker::startAlarm(const MetricDimensionKey& dimensionKey, Loading @@ -57,34 +40,47 @@ void DurationAnomalyTracker::startAlarm(const MetricDimensionKey& dimensionKey, // Alarms are stored in secs. Must round up, since if it fires early, it is ignored completely. uint32_t timestampSec = static_cast<uint32_t>((timestampNs -1)/ NS_PER_SEC) + 1; // round up if (isInRefractoryPeriod(timestampNs, dimensionKey)) { // TODO: Bug! By the refractory's end, the data might be erased and the alarm inapplicable. VLOG("Setting a delayed anomaly alarm lest it fall in the refractory period"); timestampSec = getRefractoryPeriodEndsSec(dimensionKey) + 1; } auto itr = mAlarms.find(dimensionKey); if (itr != mAlarms.end() && mAlarmMonitor != nullptr) { mAlarmMonitor->remove(itr->second); } sp<const InternalAlarm> alarm = new InternalAlarm{timestampSec}; mAlarms.insert({dimensionKey, alarm}); mAlarms[dimensionKey] = alarm; if (mAlarmMonitor != nullptr) { mAlarmMonitor->add(alarm); } } void DurationAnomalyTracker::stopAlarm(const MetricDimensionKey& dimensionKey) { auto itr = mAlarms.find(dimensionKey); if (itr != mAlarms.end()) { void DurationAnomalyTracker::stopAlarm(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs) { const auto itr = mAlarms.find(dimensionKey); if (itr == mAlarms.end()) { return; } // If the alarm is set in the past but hasn't fired yet (due to lag), catch it now. if (itr->second != nullptr && timestampNs >= NS_PER_SEC * itr->second->timestampSec) { declareAnomaly(timestampNs, dimensionKey); } mAlarms.erase(dimensionKey); if (mAlarmMonitor != nullptr) { mAlarmMonitor->remove(itr->second); } } } void DurationAnomalyTracker::stopAllAlarms() { std::set<MetricDimensionKey> keys; for (auto itr = mAlarms.begin(); itr != mAlarms.end(); ++itr) { keys.insert(itr->first); void DurationAnomalyTracker::cancelAllAlarms() { if (mAlarmMonitor != nullptr) { for (const auto& itr : mAlarms) { mAlarmMonitor->remove(itr.second); } for (auto key : keys) { stopAlarm(key); } mAlarms.clear(); } void DurationAnomalyTracker::informAlarmsFired(const uint64_t& timestampNs, Loading cmds/statsd/src/anomaly/DurationAnomalyTracker.h +8 −12 Original line number Diff line number Diff line Loading @@ -32,21 +32,20 @@ public: virtual ~DurationAnomalyTracker(); // Starts the alarm at the given timestamp. // Sets an alarm for the given timestamp. // Replaces previous alarm if one already exists. void startAlarm(const MetricDimensionKey& dimensionKey, const uint64_t& eventTime); // Stops the alarm. void stopAlarm(const MetricDimensionKey& dimensionKey); // If it should have already fired, but hasn't yet (e.g. because the AlarmManager is delayed), // declare the anomaly now. void stopAlarm(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs); // Stop all the alarms owned by this tracker. void stopAllAlarms(); // Declares the anomaly when the alarm expired given the current timestamp. void declareAnomalyIfAlarmExpired(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs); // Stop all the alarms owned by this tracker. Does not declare any anomalies. void cancelAllAlarms(); // Declares an anomaly for each alarm in firedAlarms that belongs to this DurationAnomalyTracker // and removes it from firedAlarms. // and removes it from firedAlarms. The AlarmMonitor is not informed. // Note that this will generally be called from a different thread from the other functions; // the caller is responsible for thread safety. void informAlarmsFired(const uint64_t& timestampNs, Loading @@ -60,9 +59,6 @@ protected: // Anomaly alarm monitor. sp<AlarmMonitor> mAlarmMonitor; // Resets all bucket data. For use when all the data gets stale. void resetStorage() override; FRIEND_TEST(OringDurationTrackerTest, TestPredictAnomalyTimestamp); FRIEND_TEST(OringDurationTrackerTest, TestAnomalyDetectionExpiredAlarm); FRIEND_TEST(OringDurationTrackerTest, TestAnomalyDetectionFiredAlarm); Loading cmds/statsd/src/atoms.proto +0 −1 Original line number Diff line number Diff line Loading @@ -115,7 +115,6 @@ message Atom { HardwareFailed hardware_failed = 72; PhysicalDropDetected physical_drop_detected = 73; ChargeCyclesReported charge_cycles_reported = 74; // TODO: Reorder the numbering so that the most frequent occur events occur in the first 15. } // Pulled events will start at field 10000. Loading Loading
cmds/statsd/src/anomaly/AnomalyTracker.cpp +84 −67 Original line number Diff line number Diff line Loading @@ -31,7 +31,6 @@ namespace android { namespace os { namespace statsd { // TODO: Get rid of bucketNumbers, and return to the original circular array method. AnomalyTracker::AnomalyTracker(const Alert& alert, const ConfigKey& configKey) : mAlert(alert), mConfigKey(configKey), mNumOfPastBuckets(mAlert.num_buckets() - 1) { VLOG("AnomalyTracker() called"); Loading Loading @@ -60,87 +59,104 @@ void AnomalyTracker::resetStorage() { size_t AnomalyTracker::index(int64_t bucketNum) const { if (bucketNum < 0) { // To support this use-case, we can easily modify index to wrap around. But currently // AnomalyTracker should never need this, so if it happens, it's a bug we should log. // TODO: Audit this. ALOGE("index() was passed a negative bucket number (%lld)!", (long long)bucketNum); } return bucketNum % mNumOfPastBuckets; } void AnomalyTracker::flushPastBuckets(const int64_t& latestPastBucketNum) { VLOG("addPastBucket() called."); if (latestPastBucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) { ALOGE("Cannot add a past bucket %lld units in past", (long long)latestPastBucketNum); void AnomalyTracker::advanceMostRecentBucketTo(const int64_t& bucketNum) { VLOG("advanceMostRecentBucketTo() called."); if (bucketNum <= mMostRecentBucketNum) { ALOGW("Cannot advance buckets backwards (bucketNum=%lld but mMostRecentBucketNum=%lld)", (long long)bucketNum, (long long)mMostRecentBucketNum); return; } // The past packets are ancient. Empty out old mPastBuckets[i] values and reset // mSumOverPastBuckets. if (latestPastBucketNum - mMostRecentBucketNum >= mNumOfPastBuckets) { // If in the future (i.e. buckets are ancient), just empty out all past info. if (bucketNum >= mMostRecentBucketNum + mNumOfPastBuckets) { resetStorage(); } else { for (int64_t i = std::max(0LL, (long long)(mMostRecentBucketNum - mNumOfPastBuckets + 1)); i <= latestPastBucketNum - mNumOfPastBuckets; i++) { mMostRecentBucketNum = bucketNum; return; } // Clear out space by emptying out old mPastBuckets[i] values and update mSumOverPastBuckets. for (int64_t i = mMostRecentBucketNum + 1; i <= bucketNum; i++) { const int idx = index(i); subtractBucketFromSum(mPastBuckets[idx]); mPastBuckets[idx] = nullptr; // release (but not clear) the old bucket. } mMostRecentBucketNum = bucketNum; } // It is an update operation. if (latestPastBucketNum <= mMostRecentBucketNum && latestPastBucketNum > mMostRecentBucketNum - mNumOfPastBuckets) { subtractBucketFromSum(mPastBuckets[index(latestPastBucketNum)]); } } void AnomalyTracker::addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue, void AnomalyTracker::addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue, const int64_t& bucketNum) { if (mNumOfPastBuckets == 0) { VLOG("addPastBucket(bucketValue) called."); if (mNumOfPastBuckets == 0 || bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) { return; } flushPastBuckets(bucketNum); auto& bucket = mPastBuckets[index(bucketNum)]; if (bucket == nullptr) { bucket = std::make_shared<DimToValMap>(); const int bucketIndex = index(bucketNum); if (bucketNum <= mMostRecentBucketNum && (mPastBuckets[bucketIndex] != nullptr)) { // We need to insert into an already existing past bucket. std::shared_ptr<DimToValMap>& bucket = mPastBuckets[bucketIndex]; auto itr = bucket->find(key); if (itr != bucket->end()) { // Old entry already exists; update it. subtractValueFromSum(key, itr->second); itr->second = bucketValue; } else { bucket->insert({key, bucketValue}); } mSumOverPastBuckets[key] += bucketValue; } else { // Bucket does not exist yet (in future or was never made), so we must make it. std::shared_ptr<DimToValMap> bucket = std::make_shared<DimToValMap>(); bucket->insert({key, bucketValue}); addBucketToSum(bucket); mMostRecentBucketNum = std::max(mMostRecentBucketNum, bucketNum); addPastBucket(bucket, bucketNum); } } void AnomalyTracker::addPastBucket(std::shared_ptr<DimToValMap> bucketValues, void AnomalyTracker::addPastBucket(std::shared_ptr<DimToValMap> bucket, const int64_t& bucketNum) { VLOG("addPastBucket() called."); if (mNumOfPastBuckets == 0) { VLOG("addPastBucket(bucket) called."); if (mNumOfPastBuckets == 0 || bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) { return; } flushPastBuckets(bucketNum); // Replace the oldest bucket with the new bucket we are adding. mPastBuckets[index(bucketNum)] = bucketValues; addBucketToSum(bucketValues); mMostRecentBucketNum = std::max(mMostRecentBucketNum, bucketNum); if (bucketNum <= mMostRecentBucketNum) { // We are updating an old bucket, not adding a new one. subtractBucketFromSum(mPastBuckets[index(bucketNum)]); } else { // Clear space for the new bucket to be at bucketNum. advanceMostRecentBucketTo(bucketNum); } mPastBuckets[index(bucketNum)] = bucket; addBucketToSum(bucket); } void AnomalyTracker::subtractBucketFromSum(const shared_ptr<DimToValMap>& bucket) { if (bucket == nullptr) { return; } // For each dimension present in the bucket, subtract its value from its corresponding sum. for (const auto& keyValuePair : *bucket) { auto itr = mSumOverPastBuckets.find(keyValuePair.first); subtractValueFromSum(keyValuePair.first, keyValuePair.second); } } void AnomalyTracker::subtractValueFromSum(const MetricDimensionKey& key, const int64_t& bucketValue) { auto itr = mSumOverPastBuckets.find(key); if (itr == mSumOverPastBuckets.end()) { continue; return; } itr->second -= keyValuePair.second; // TODO: No need to look up the object twice like this. Use a var. itr->second -= bucketValue; if (itr->second == 0) { mSumOverPastBuckets.erase(itr); } } } void AnomalyTracker::addBucketToSum(const shared_ptr<DimToValMap>& bucket) { if (bucket == nullptr) { Loading @@ -154,7 +170,8 @@ void AnomalyTracker::addBucketToSum(const shared_ptr<DimToValMap>& bucket) { int64_t AnomalyTracker::getPastBucketValue(const MetricDimensionKey& key, const int64_t& bucketNum) const { if (mNumOfPastBuckets == 0 || bucketNum < 0) { if (bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets || bucketNum > mMostRecentBucketNum) { return 0; } Loading @@ -174,11 +191,13 @@ int64_t AnomalyTracker::getSumOverPastBuckets(const MetricDimensionKey& key) con return 0; } bool AnomalyTracker::detectAnomaly(const int64_t& currentBucketNum, const MetricDimensionKey& key, bool AnomalyTracker::detectAnomaly(const int64_t& currentBucketNum, const MetricDimensionKey& key, const int64_t& currentBucketValue) { // currentBucketNum should be the next bucket after pastBuckets. If not, advance so that it is. if (currentBucketNum > mMostRecentBucketNum + 1) { // TODO: This creates a needless 0 entry in mSumOverPastBuckets. Fix this. addPastBucket(key, 0, currentBucketNum - 1); advanceMostRecentBucketTo(currentBucketNum - 1); } return mAlert.has_trigger_if_sum_gt() && getSumOverPastBuckets(key) + currentBucketValue > mAlert.trigger_if_sum_gt(); Loading @@ -190,19 +209,17 @@ void AnomalyTracker::declareAnomaly(const uint64_t& timestampNs, const MetricDim VLOG("Skipping anomaly declaration since within refractory period"); return; } mRefractoryPeriodEndsSec[key] = (timestampNs / NS_PER_SEC) + mAlert.refractory_period_secs(); if (mAlert.has_refractory_period_secs()) { mRefractoryPeriodEndsSec[key] = ((timestampNs + NS_PER_SEC - 1) / NS_PER_SEC) // round up + mAlert.refractory_period_secs(); // TODO: If we had access to the bucket_size_millis, consider calling resetStorage() // if (mAlert.refractory_period_secs() > mNumOfPastBuckets * bucketSizeNs) {resetStorage();} } if (!mSubscriptions.empty()) { if (mAlert.has_id()) { ALOGI("An anomaly (%lld) %s has occurred! Informing subscribers.", mAlert.id(), key.toString().c_str()); ALOGI("An anomaly (%lld) %s has occurred! Informing subscribers.", mAlert.id(), key.toString().c_str()); informSubscribers(key); } else { ALOGI("An anomaly (with no id) has occurred! Not informing any subscribers."); } } else { ALOGI("An anomaly has occurred! (But no subscriber for that alert.)"); } Loading @@ -227,7 +244,7 @@ bool AnomalyTracker::isInRefractoryPeriod(const uint64_t& timestampNs, const MetricDimensionKey& key) { const auto& it = mRefractoryPeriodEndsSec.find(key); if (it != mRefractoryPeriodEndsSec.end()) { if ((timestampNs / NS_PER_SEC) <= it->second) { if (timestampNs < it->second * NS_PER_SEC) { return true; } else { mRefractoryPeriodEndsSec.erase(key); Loading
cmds/statsd/src/anomaly/AnomalyTracker.h +40 −15 Original line number Diff line number Diff line Loading @@ -47,20 +47,32 @@ public: mSubscriptions.push_back(subscription); } // Adds a bucket. // Bucket index starts from 0. void addPastBucket(std::shared_ptr<DimToValMap> bucketValues, const int64_t& bucketNum); // Adds a bucket for the given bucketNum (index starting at 0). // If a bucket for bucketNum already exists, it will be replaced. // Also, advances to bucketNum (if not in the past), effectively filling any intervening // buckets with 0s. void addPastBucket(std::shared_ptr<DimToValMap> bucket, const int64_t& bucketNum); // Inserts (or replaces) the bucket entry for the given bucketNum at the given key to be the // given bucketValue. If the bucket does not exist, it will be created. // Also, advances to bucketNum (if not in the past), effectively filling any intervening // buckets with 0s. void addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue, const int64_t& bucketNum); // Returns true if detected anomaly for the existing buckets on one or more dimension keys. // Returns true if, based on past buckets plus the new currentBucketValue (which generally // represents the partially-filled current bucket), an anomaly has happened. // Also advances to currBucketNum-1. bool detectAnomaly(const int64_t& currBucketNum, const MetricDimensionKey& key, const int64_t& currentBucketValue); // Informs incidentd about the detected alert. void declareAnomaly(const uint64_t& timestampNs, const MetricDimensionKey& key); // Detects the alert and informs the incidentd when applicable. // Detects if, based on past buckets plus the new currentBucketValue (which generally // represents the partially-filled current bucket), an anomaly has happened, and if so, // declares an anomaly and informs relevant subscribers. // Also advances to currBucketNum-1. void detectAndDeclareAnomaly(const uint64_t& timestampNs, const int64_t& currBucketNum, const MetricDimensionKey& key, const int64_t& currentBucketValue); Loading @@ -69,24 +81,26 @@ public: return; // Base AnomalyTracker class has no need for the AlarmMonitor. } // Helper function to return the sum value of past buckets at given dimension. // Returns the sum of all past bucket values for the given dimension key. int64_t getSumOverPastBuckets(const MetricDimensionKey& key) const; // Helper function to return the value for a past bucket. // Returns the value for a past bucket, or 0 if that bucket doesn't exist. int64_t getPastBucketValue(const MetricDimensionKey& key, const int64_t& bucketNum) const; // Returns the anomaly threshold. // Returns the anomaly threshold set in the configuration. inline int64_t getAnomalyThreshold() const { return mAlert.trigger_if_sum_gt(); } // Returns the refractory period timestamp (in seconds) for the given key. // Returns the refractory period ending timestamp (in seconds) for the given key. // Before this moment, any detected anomaly will be ignored. // If there is no stored refractory period ending timestamp, returns 0. uint32_t getRefractoryPeriodEndsSec(const MetricDimensionKey& key) const { const auto& it = mRefractoryPeriodEndsSec.find(key); return it != mRefractoryPeriodEndsSec.end() ? it->second : 0; } // Returns the (constant) number of past buckets this anomaly tracker can store. inline int getNumOfPastBuckets() const { return mNumOfPastBuckets; } Loading @@ -112,22 +126,27 @@ protected: // for the anomaly detection (since the current bucket is not in the past). const int mNumOfPastBuckets; // The existing bucket list. // Values for each of the past mNumOfPastBuckets buckets. Always of size mNumOfPastBuckets. // mPastBuckets[i] can be null, meaning that no data is present in that bucket. std::vector<shared_ptr<DimToValMap>> mPastBuckets; // Sum over all existing buckets cached in mPastBuckets. // Cached sum over all existing buckets in mPastBuckets. // Its buckets never contain entries of 0. DimToValMap mSumOverPastBuckets; // The bucket number of the last added bucket. int64_t mMostRecentBucketNum = -1; // Map from each dimension to the timestamp that its refractory period (if this anomaly was // declared for that dimension) ends, in seconds. Only anomalies that occur after this period // ends will be declared. // declared for that dimension) ends, in seconds. From this moment and onwards, anomalies // can be declared again. // Entries may be, but are not guaranteed to be, removed after the period is finished. unordered_map<MetricDimensionKey, uint32_t> mRefractoryPeriodEndsSec; void flushPastBuckets(const int64_t& currBucketNum); // Advances mMostRecentBucketNum to bucketNum, deleting any data that is now too old. // Specifically, since it is now too old, removes the data for // [mMostRecentBucketNum - mNumOfPastBuckets + 1, bucketNum - mNumOfPastBuckets]. void advanceMostRecentBucketTo(const int64_t& bucketNum); // Add the information in the given bucket to mSumOverPastBuckets. void addBucketToSum(const shared_ptr<DimToValMap>& bucket); Loading @@ -136,15 +155,21 @@ protected: // and remove any items with value 0. void subtractBucketFromSum(const shared_ptr<DimToValMap>& bucket); // From mSumOverPastBuckets[key], subtracts bucketValue, removing it if it is now 0. void subtractValueFromSum(const MetricDimensionKey& key, const int64_t& bucketValue); // Returns true if in the refractory period, else false. // If there is a stored refractory period but it ended prior to timestampNs, it is removed. bool isInRefractoryPeriod(const uint64_t& timestampNs, const MetricDimensionKey& key); // Calculates the corresponding bucket index within the circular array. // Requires bucketNum >= 0. size_t index(int64_t bucketNum) const; // Resets all bucket data. For use when all the data gets stale. virtual void resetStorage(); // Informs the subscribers that an anomaly has occurred. // Informs the subscribers (incidentd, perfetto, broadcasts, etc) that an anomaly has occurred. void informSubscribers(const MetricDimensionKey& key); FRIEND_TEST(AnomalyTrackerTest, TestConsecutiveBuckets); Loading
cmds/statsd/src/anomaly/DurationAnomalyTracker.cpp +32 −36 Original line number Diff line number Diff line Loading @@ -27,29 +27,12 @@ namespace statsd { DurationAnomalyTracker::DurationAnomalyTracker(const Alert& alert, const ConfigKey& configKey, const sp<AlarmMonitor>& alarmMonitor) : AnomalyTracker(alert, configKey), mAlarmMonitor(alarmMonitor) { VLOG("DurationAnomalyTracker() called"); } DurationAnomalyTracker::~DurationAnomalyTracker() { stopAllAlarms(); } void DurationAnomalyTracker::resetStorage() { AnomalyTracker::resetStorage(); if (!mAlarms.empty()) VLOG("AnomalyTracker.resetStorage() called but mAlarms is NOT empty!"); } void DurationAnomalyTracker::declareAnomalyIfAlarmExpired(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs) { auto itr = mAlarms.find(dimensionKey); if (itr == mAlarms.end()) { return; } if (itr->second != nullptr && static_cast<uint32_t>(timestampNs / NS_PER_SEC) >= itr->second->timestampSec) { declareAnomaly(timestampNs, dimensionKey); stopAlarm(dimensionKey); } VLOG("~DurationAnomalyTracker() called"); cancelAllAlarms(); } void DurationAnomalyTracker::startAlarm(const MetricDimensionKey& dimensionKey, Loading @@ -57,34 +40,47 @@ void DurationAnomalyTracker::startAlarm(const MetricDimensionKey& dimensionKey, // Alarms are stored in secs. Must round up, since if it fires early, it is ignored completely. uint32_t timestampSec = static_cast<uint32_t>((timestampNs -1)/ NS_PER_SEC) + 1; // round up if (isInRefractoryPeriod(timestampNs, dimensionKey)) { // TODO: Bug! By the refractory's end, the data might be erased and the alarm inapplicable. VLOG("Setting a delayed anomaly alarm lest it fall in the refractory period"); timestampSec = getRefractoryPeriodEndsSec(dimensionKey) + 1; } auto itr = mAlarms.find(dimensionKey); if (itr != mAlarms.end() && mAlarmMonitor != nullptr) { mAlarmMonitor->remove(itr->second); } sp<const InternalAlarm> alarm = new InternalAlarm{timestampSec}; mAlarms.insert({dimensionKey, alarm}); mAlarms[dimensionKey] = alarm; if (mAlarmMonitor != nullptr) { mAlarmMonitor->add(alarm); } } void DurationAnomalyTracker::stopAlarm(const MetricDimensionKey& dimensionKey) { auto itr = mAlarms.find(dimensionKey); if (itr != mAlarms.end()) { void DurationAnomalyTracker::stopAlarm(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs) { const auto itr = mAlarms.find(dimensionKey); if (itr == mAlarms.end()) { return; } // If the alarm is set in the past but hasn't fired yet (due to lag), catch it now. if (itr->second != nullptr && timestampNs >= NS_PER_SEC * itr->second->timestampSec) { declareAnomaly(timestampNs, dimensionKey); } mAlarms.erase(dimensionKey); if (mAlarmMonitor != nullptr) { mAlarmMonitor->remove(itr->second); } } } void DurationAnomalyTracker::stopAllAlarms() { std::set<MetricDimensionKey> keys; for (auto itr = mAlarms.begin(); itr != mAlarms.end(); ++itr) { keys.insert(itr->first); void DurationAnomalyTracker::cancelAllAlarms() { if (mAlarmMonitor != nullptr) { for (const auto& itr : mAlarms) { mAlarmMonitor->remove(itr.second); } for (auto key : keys) { stopAlarm(key); } mAlarms.clear(); } void DurationAnomalyTracker::informAlarmsFired(const uint64_t& timestampNs, Loading
cmds/statsd/src/anomaly/DurationAnomalyTracker.h +8 −12 Original line number Diff line number Diff line Loading @@ -32,21 +32,20 @@ public: virtual ~DurationAnomalyTracker(); // Starts the alarm at the given timestamp. // Sets an alarm for the given timestamp. // Replaces previous alarm if one already exists. void startAlarm(const MetricDimensionKey& dimensionKey, const uint64_t& eventTime); // Stops the alarm. void stopAlarm(const MetricDimensionKey& dimensionKey); // If it should have already fired, but hasn't yet (e.g. because the AlarmManager is delayed), // declare the anomaly now. void stopAlarm(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs); // Stop all the alarms owned by this tracker. void stopAllAlarms(); // Declares the anomaly when the alarm expired given the current timestamp. void declareAnomalyIfAlarmExpired(const MetricDimensionKey& dimensionKey, const uint64_t& timestampNs); // Stop all the alarms owned by this tracker. Does not declare any anomalies. void cancelAllAlarms(); // Declares an anomaly for each alarm in firedAlarms that belongs to this DurationAnomalyTracker // and removes it from firedAlarms. // and removes it from firedAlarms. The AlarmMonitor is not informed. // Note that this will generally be called from a different thread from the other functions; // the caller is responsible for thread safety. void informAlarmsFired(const uint64_t& timestampNs, Loading @@ -60,9 +59,6 @@ protected: // Anomaly alarm monitor. sp<AlarmMonitor> mAlarmMonitor; // Resets all bucket data. For use when all the data gets stale. void resetStorage() override; FRIEND_TEST(OringDurationTrackerTest, TestPredictAnomalyTimestamp); FRIEND_TEST(OringDurationTrackerTest, TestAnomalyDetectionExpiredAlarm); FRIEND_TEST(OringDurationTrackerTest, TestAnomalyDetectionFiredAlarm); Loading
cmds/statsd/src/atoms.proto +0 −1 Original line number Diff line number Diff line Loading @@ -115,7 +115,6 @@ message Atom { HardwareFailed hardware_failed = 72; PhysicalDropDetected physical_drop_detected = 73; ChargeCyclesReported charge_cycles_reported = 74; // TODO: Reorder the numbering so that the most frequent occur events occur in the first 15. } // Pulled events will start at field 10000. Loading