Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6bf9825b authored by Bookatz
Browse files

Statsd AnomalyDetection improvements

Various fixes and improvements to statsd's anomaly detection.

Bug: 74607818
Test: make statsd_test && adb sync data && adb shell data/nativetest64/statsd_test/statsd_test

Change-Id: Ia67a8eb6da0ea9293f698949e1565f7f024a7cb9
parent d357342a
Loading
Loading
Loading
Loading
+84 −67
Original line number Diff line number Diff line
@@ -31,7 +31,6 @@ namespace android {
namespace os {
namespace statsd {

// TODO: Get rid of bucketNumbers, and return to the original circular array method.
AnomalyTracker::AnomalyTracker(const Alert& alert, const ConfigKey& configKey)
        : mAlert(alert), mConfigKey(configKey), mNumOfPastBuckets(mAlert.num_buckets() - 1) {
    VLOG("AnomalyTracker() called");
@@ -60,87 +59,104 @@ void AnomalyTracker::resetStorage() {

// Maps a bucket number onto its slot in the circular array of past buckets.
//
// Callers are expected to pass bucketNum >= 0. A negative value is still treated as a bug and
// logged, but it is wrapped into [0, mNumOfPastBuckets) rather than being returned as a negative
// remainder, which would convert to an enormous size_t and index far out of bounds.
size_t AnomalyTracker::index(int64_t bucketNum) const {
    if (bucketNum < 0) {
        // AnomalyTracker should never need negative bucket numbers, so if it happens, it's a bug
        // we should log.
        // TODO: Audit this.
        ALOGE("index() was passed a negative bucket number (%lld)!", (long long)bucketNum);
    }
    if (mNumOfPastBuckets <= 0) {
        // No past buckets are stored, so there is no valid slot; avoid the undefined behavior of
        // a modulo by zero. Callers are expected to have bailed out before reaching this point.
        return 0;
    }
    // In C++ the sign of the remainder follows the dividend, so shift a negative remainder back
    // into range.
    const int64_t remainder = bucketNum % mNumOfPastBuckets;
    return static_cast<size_t>(remainder < 0 ? remainder + mNumOfPastBuckets : remainder);
}

void AnomalyTracker::flushPastBuckets(const int64_t& latestPastBucketNum) {
    VLOG("addPastBucket() called.");
    if (latestPastBucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) {
        ALOGE("Cannot add a past bucket %lld units in past", (long long)latestPastBucketNum);
void AnomalyTracker::advanceMostRecentBucketTo(const int64_t& bucketNum) {
    VLOG("advanceMostRecentBucketTo() called.");
    if (bucketNum <= mMostRecentBucketNum) {
        ALOGW("Cannot advance buckets backwards (bucketNum=%lld but mMostRecentBucketNum=%lld)",
              (long long)bucketNum, (long long)mMostRecentBucketNum);
        return;
    }

    // The past packets are ancient. Empty out old mPastBuckets[i] values and reset
    // mSumOverPastBuckets.
    if (latestPastBucketNum - mMostRecentBucketNum >= mNumOfPastBuckets) {
    // If in the future (i.e. buckets are ancient), just empty out all past info.
    if (bucketNum >= mMostRecentBucketNum + mNumOfPastBuckets) {
        resetStorage();
    } else {
        for (int64_t i = std::max(0LL, (long long)(mMostRecentBucketNum - mNumOfPastBuckets + 1));
             i <= latestPastBucketNum - mNumOfPastBuckets; i++) {
        mMostRecentBucketNum = bucketNum;
        return;
    }

    // Clear out space by emptying out old mPastBuckets[i] values and update mSumOverPastBuckets.
    for (int64_t i = mMostRecentBucketNum + 1; i <= bucketNum; i++) {
        const int idx = index(i);
        subtractBucketFromSum(mPastBuckets[idx]);
        mPastBuckets[idx] = nullptr;  // release (but not clear) the old bucket.
    }
    mMostRecentBucketNum = bucketNum;
}

    // It is an update operation.
    if (latestPastBucketNum <= mMostRecentBucketNum &&
        latestPastBucketNum > mMostRecentBucketNum - mNumOfPastBuckets) {
        subtractBucketFromSum(mPastBuckets[index(latestPastBucketNum)]);
    }
}

void AnomalyTracker::addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue,
void AnomalyTracker::addPastBucket(const MetricDimensionKey& key,
                                   const int64_t& bucketValue,
                                   const int64_t& bucketNum) {
    if (mNumOfPastBuckets == 0) {
    VLOG("addPastBucket(bucketValue) called.");
    if (mNumOfPastBuckets == 0 ||
        bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) {
        return;
    }
    flushPastBuckets(bucketNum);

    auto& bucket = mPastBuckets[index(bucketNum)];
    if (bucket == nullptr) {
        bucket = std::make_shared<DimToValMap>();
    const int bucketIndex = index(bucketNum);
    if (bucketNum <= mMostRecentBucketNum && (mPastBuckets[bucketIndex] != nullptr)) {
        // We need to insert into an already existing past bucket.
        std::shared_ptr<DimToValMap>& bucket = mPastBuckets[bucketIndex];
        auto itr = bucket->find(key);
        if (itr != bucket->end()) {
            // Old entry already exists; update it.
            subtractValueFromSum(key, itr->second);
            itr->second = bucketValue;
        } else {
            bucket->insert({key, bucketValue});
        }
        mSumOverPastBuckets[key] += bucketValue;
    } else {
        // Bucket does not exist yet (in future or was never made), so we must make it.
        std::shared_ptr<DimToValMap> bucket = std::make_shared<DimToValMap>();
        bucket->insert({key, bucketValue});
    addBucketToSum(bucket);
    mMostRecentBucketNum = std::max(mMostRecentBucketNum, bucketNum);
        addPastBucket(bucket, bucketNum);
    }
}

void AnomalyTracker::addPastBucket(std::shared_ptr<DimToValMap> bucketValues,
void AnomalyTracker::addPastBucket(std::shared_ptr<DimToValMap> bucket,
                                   const int64_t& bucketNum) {
    VLOG("addPastBucket() called.");
    if (mNumOfPastBuckets == 0) {
    VLOG("addPastBucket(bucket) called.");
    if (mNumOfPastBuckets == 0 ||
            bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets) {
        return;
    }
    flushPastBuckets(bucketNum);
    // Replace the oldest bucket with the new bucket we are adding.
    mPastBuckets[index(bucketNum)] = bucketValues;
    addBucketToSum(bucketValues);
    mMostRecentBucketNum = std::max(mMostRecentBucketNum, bucketNum);

    if (bucketNum <= mMostRecentBucketNum) {
        // We are updating an old bucket, not adding a new one.
        subtractBucketFromSum(mPastBuckets[index(bucketNum)]);
    } else {
        // Clear space for the new bucket to be at bucketNum.
        advanceMostRecentBucketTo(bucketNum);
    }
    mPastBuckets[index(bucketNum)] = bucket;
    addBucketToSum(bucket);
}

void AnomalyTracker::subtractBucketFromSum(const shared_ptr<DimToValMap>& bucket) {
    if (bucket == nullptr) {
        return;
    }
    // For each dimension present in the bucket, subtract its value from its corresponding sum.
    for (const auto& keyValuePair : *bucket) {
        auto itr = mSumOverPastBuckets.find(keyValuePair.first);
        subtractValueFromSum(keyValuePair.first, keyValuePair.second);
    }
}


void AnomalyTracker::subtractValueFromSum(const MetricDimensionKey& key,
                                          const int64_t& bucketValue) {
    auto itr = mSumOverPastBuckets.find(key);
    if (itr == mSumOverPastBuckets.end()) {
            continue;
        return;
    }
        itr->second -= keyValuePair.second;
        // TODO: No need to look up the object twice like this. Use a var.
    itr->second -= bucketValue;
    if (itr->second == 0) {
        mSumOverPastBuckets.erase(itr);
    }
}
}

void AnomalyTracker::addBucketToSum(const shared_ptr<DimToValMap>& bucket) {
    if (bucket == nullptr) {
@@ -154,7 +170,8 @@ void AnomalyTracker::addBucketToSum(const shared_ptr<DimToValMap>& bucket) {

int64_t AnomalyTracker::getPastBucketValue(const MetricDimensionKey& key,
                                           const int64_t& bucketNum) const {
    if (mNumOfPastBuckets == 0 || bucketNum < 0) {
    if (bucketNum < 0 || bucketNum <= mMostRecentBucketNum - mNumOfPastBuckets
            || bucketNum > mMostRecentBucketNum) {
        return 0;
    }

@@ -174,11 +191,13 @@ int64_t AnomalyTracker::getSumOverPastBuckets(const MetricDimensionKey& key) con
    return 0;
}

bool AnomalyTracker::detectAnomaly(const int64_t& currentBucketNum, const MetricDimensionKey& key,
bool AnomalyTracker::detectAnomaly(const int64_t& currentBucketNum,
                                   const MetricDimensionKey& key,
                                   const int64_t& currentBucketValue) {

    // currentBucketNum should be the next bucket after pastBuckets. If not, advance so that it is.
    if (currentBucketNum > mMostRecentBucketNum + 1) {
        // TODO: This creates a needless 0 entry in mSumOverPastBuckets. Fix this.
        addPastBucket(key, 0, currentBucketNum - 1);
        advanceMostRecentBucketTo(currentBucketNum - 1);
    }
    return mAlert.has_trigger_if_sum_gt() &&
           getSumOverPastBuckets(key) + currentBucketValue > mAlert.trigger_if_sum_gt();
@@ -190,19 +209,17 @@ void AnomalyTracker::declareAnomaly(const uint64_t& timestampNs, const MetricDim
        VLOG("Skipping anomaly declaration since within refractory period");
        return;
    }
    mRefractoryPeriodEndsSec[key] = (timestampNs / NS_PER_SEC) + mAlert.refractory_period_secs();

    if (mAlert.has_refractory_period_secs()) {
        mRefractoryPeriodEndsSec[key] = ((timestampNs + NS_PER_SEC - 1) / NS_PER_SEC) // round up
                                        + mAlert.refractory_period_secs();
        // TODO: If we had access to the bucket_size_millis, consider calling resetStorage()
        // if (mAlert.refractory_period_secs() > mNumOfPastBuckets * bucketSizeNs) {resetStorage();}
    }

    if (!mSubscriptions.empty()) {
        if (mAlert.has_id()) {
            ALOGI("An anomaly (%lld) %s has occurred! Informing subscribers.", mAlert.id(),
                  key.toString().c_str());
        ALOGI("An anomaly (%lld) %s has occurred! Informing subscribers.",
                mAlert.id(), key.toString().c_str());
        informSubscribers(key);
        } else {
            ALOGI("An anomaly (with no id) has occurred! Not informing any subscribers.");
        }
    } else {
        ALOGI("An anomaly has occurred! (But no subscriber for that alert.)");
    }
@@ -227,7 +244,7 @@ bool AnomalyTracker::isInRefractoryPeriod(const uint64_t& timestampNs,
                                          const MetricDimensionKey& key) {
    const auto& it = mRefractoryPeriodEndsSec.find(key);
    if (it != mRefractoryPeriodEndsSec.end()) {
        if ((timestampNs / NS_PER_SEC) <= it->second) {
        if (timestampNs < it->second * NS_PER_SEC) {
            return true;
        } else {
            mRefractoryPeriodEndsSec.erase(key);
+40 −15
Original line number Diff line number Diff line
@@ -47,20 +47,32 @@ public:
        mSubscriptions.push_back(subscription);
    }

    // Adds a bucket.
    // Bucket index starts from 0.
    void addPastBucket(std::shared_ptr<DimToValMap> bucketValues, const int64_t& bucketNum);
    // Adds a bucket for the given bucketNum (index starting at 0).
    // If a bucket for bucketNum already exists, it will be replaced.
    // Also, advances to bucketNum (if not in the past), effectively filling any intervening
    // buckets with 0s.
    void addPastBucket(std::shared_ptr<DimToValMap> bucket, const int64_t& bucketNum);

    // Inserts (or replaces) the bucket entry for the given bucketNum at the given key to be the
    // given bucketValue. If the bucket does not exist, it will be created.
    // Also, advances to bucketNum (if not in the past), effectively filling any intervening
    // buckets with 0s.
    void addPastBucket(const MetricDimensionKey& key, const int64_t& bucketValue,
                       const int64_t& bucketNum);

    // Returns true if detected anomaly for the existing buckets on one or more dimension keys.
    // Returns true if, based on past buckets plus the new currentBucketValue (which generally
    // represents the partially-filled current bucket), an anomaly has happened.
    // Also advances to currBucketNum - 1.
    bool detectAnomaly(const int64_t& currBucketNum, const MetricDimensionKey& key,
                       const int64_t& currentBucketValue);

    // Informs incidentd about the detected alert.
    void declareAnomaly(const uint64_t& timestampNs, const MetricDimensionKey& key);

    // Detects the alert and informs the incidentd when applicable.
    // Detects if, based on past buckets plus the new currentBucketValue (which generally
    // represents the partially-filled current bucket), an anomaly has happened, and if so,
    // declares an anomaly and informs relevant subscribers.
    // Also advances to currBucketNum - 1.
    void detectAndDeclareAnomaly(const uint64_t& timestampNs, const int64_t& currBucketNum,
                                 const MetricDimensionKey& key, const int64_t& currentBucketValue);

@@ -69,24 +81,26 @@ public:
        return; // Base AnomalyTracker class has no need for the AlarmMonitor.
    }

    // Helper function to return the sum value of past buckets at given dimension.
    // Returns the sum of all past bucket values for the given dimension key.
    int64_t getSumOverPastBuckets(const MetricDimensionKey& key) const;

    // Helper function to return the value for a past bucket.
    // Returns the value for a past bucket, or 0 if that bucket doesn't exist.
    int64_t getPastBucketValue(const MetricDimensionKey& key, const int64_t& bucketNum) const;

    // Returns the anomaly threshold.
    // Returns the anomaly threshold set in the configuration.
    inline int64_t getAnomalyThreshold() const {
        // The threshold is whatever the alert holds as trigger_if_sum_gt.
        const int64_t threshold = mAlert.trigger_if_sum_gt();
        return threshold;
    }

    // Returns the refractory period timestamp (in seconds) for the given key.
    // Returns the refractory period ending timestamp (in seconds) for the given key.
    // Before this moment, any detected anomaly will be ignored.
    // If there is no stored refractory period ending timestamp, returns 0.
    uint32_t getRefractoryPeriodEndsSec(const MetricDimensionKey& key) const {
        const auto entry = mRefractoryPeriodEndsSec.find(key);
        if (entry == mRefractoryPeriodEndsSec.end()) {
            // Nothing stored for this key.
            return 0;
        }
        return entry->second;
    }

    // Returns the (constant) number of past buckets this anomaly tracker can store.
    inline int getNumOfPastBuckets() const {
        // mNumOfPastBuckets is const, so this value never changes for a given tracker.
        const int capacity = mNumOfPastBuckets;
        return capacity;
    }
@@ -112,22 +126,27 @@ protected:
    // for the anomaly detection (since the current bucket is not in the past).
    const int mNumOfPastBuckets;

    // The existing bucket list.
    // Values for each of the past mNumOfPastBuckets buckets. Always of size mNumOfPastBuckets.
    // mPastBuckets[i] can be null, meaning that no data is present in that bucket.
    std::vector<shared_ptr<DimToValMap>> mPastBuckets;

    // Sum over all existing buckets cached in mPastBuckets.
    // Cached sum over all existing buckets in mPastBuckets.
    // Its buckets never contain entries of 0.
    DimToValMap mSumOverPastBuckets;

    // The bucket number of the last added bucket.
    int64_t mMostRecentBucketNum = -1;

    // Map from each dimension to the timestamp that its refractory period (if this anomaly was
    // declared for that dimension) ends, in seconds. Only anomalies that occur after this period
    // ends will be declared.
    // declared for that dimension) ends, in seconds. From this moment and onwards, anomalies
    // can be declared again.
    // Entries may be, but are not guaranteed to be, removed after the period is finished.
    unordered_map<MetricDimensionKey, uint32_t> mRefractoryPeriodEndsSec;

    void flushPastBuckets(const int64_t& currBucketNum);
    // Advances mMostRecentBucketNum to bucketNum, deleting any data that is now too old.
    // Specifically, since it is now too old, removes the data for
    //   [mMostRecentBucketNum - mNumOfPastBuckets + 1, bucketNum - mNumOfPastBuckets].
    void advanceMostRecentBucketTo(const int64_t& bucketNum);

    // Add the information in the given bucket to mSumOverPastBuckets.
    void addBucketToSum(const shared_ptr<DimToValMap>& bucket);
@@ -136,15 +155,21 @@ protected:
    // and remove any items with value 0.
    void subtractBucketFromSum(const shared_ptr<DimToValMap>& bucket);

    // From mSumOverPastBuckets[key], subtracts bucketValue, removing it if it is now 0.
    void subtractValueFromSum(const MetricDimensionKey& key, const int64_t& bucketValue);

    // Returns true if in the refractory period, else false.
    // If there is a stored refractory period but it ended prior to timestampNs, it is removed.
    bool isInRefractoryPeriod(const uint64_t& timestampNs, const MetricDimensionKey& key);

    // Calculates the corresponding bucket index within the circular array.
    // Requires bucketNum >= 0.
    size_t index(int64_t bucketNum) const;

    // Resets all bucket data. For use when all the data gets stale.
    virtual void resetStorage();

    // Informs the subscribers that an anomaly has occurred.
    // Informs the subscribers (incidentd, perfetto, broadcasts, etc) that an anomaly has occurred.
    void informSubscribers(const MetricDimensionKey& key);

    FRIEND_TEST(AnomalyTrackerTest, TestConsecutiveBuckets);
+9 −2
Original line number Diff line number Diff line
@@ -35,7 +35,7 @@ DurationAnomalyTracker::~DurationAnomalyTracker() {

void DurationAnomalyTracker::resetStorage() {
    AnomalyTracker::resetStorage();
    if (!mAlarms.empty()) VLOG("AnomalyTracker.resetStorage() called but mAlarms is NOT empty!");
    if (!mAlarms.empty()) ALOGW("AnomalyTracker.resetStorage() called but mAlarms is NOT empty!");
}

void DurationAnomalyTracker::declareAnomalyIfAlarmExpired(const MetricDimensionKey& dimensionKey,
@@ -57,11 +57,18 @@ void DurationAnomalyTracker::startAlarm(const MetricDimensionKey& dimensionKey,
    // Alarms are stored in secs. Must round up, since if it fires early, it is ignored completely.
    uint32_t timestampSec = static_cast<uint32_t>((timestampNs -1)/ NS_PER_SEC) + 1; // round up
    if (isInRefractoryPeriod(timestampNs, dimensionKey)) {
        // TODO: Bug! By the refractory's end, the data might be erased and the alarm inapplicable.
        VLOG("Setting a delayed anomaly alarm lest it fall in the refractory period");
        timestampSec = getRefractoryPeriodEndsSec(dimensionKey) + 1;
    }

    auto itr = mAlarms.find(dimensionKey);
    if (itr != mAlarms.end() && mAlarmMonitor != nullptr) {
        mAlarmMonitor->remove(itr->second);
    }

    sp<const InternalAlarm> alarm = new InternalAlarm{timestampSec};
    mAlarms.insert({dimensionKey, alarm});
    mAlarms[dimensionKey] = alarm;
    if (mAlarmMonitor != nullptr) {
        mAlarmMonitor->add(alarm);
    }
+3 −2
Original line number Diff line number Diff line
@@ -32,7 +32,8 @@ public:

    virtual ~DurationAnomalyTracker();

    // Starts the alarm at the given timestamp.
    // Sets an alarm for the given timestamp.
    // Replaces previous alarm if one already exists.
    void startAlarm(const MetricDimensionKey& dimensionKey, const uint64_t& eventTime);

    // Stops the alarm.
@@ -46,7 +47,7 @@ public:
                                      const uint64_t& timestampNs);

    // Declares an anomaly for each alarm in firedAlarms that belongs to this DurationAnomalyTracker
    // and removes it from firedAlarms.
    // and removes it from firedAlarms. The AlarmMonitor is not informed.
    // Note that this will generally be called from a different thread from the other functions;
    // the caller is responsible for thread safety.
    void informAlarmsFired(const uint64_t& timestampNs,
+7 −3
Original line number Diff line number Diff line
@@ -328,12 +328,16 @@ int64_t OringDurationTracker::predictAnomalyTimestampNs(
    // TODO: Unit-test this and see if it can be done more efficiently (e.g. use int32).
    // All variables below represent durations (not timestamps).

    const int64_t thresholdNs = anomalyTracker.getAnomalyThreshold();

    // The time until the current bucket ends. This is how much more 'space' it can hold.
    const int64_t currRemainingBucketSizeNs =
            mBucketSizeNs - (eventTimestampNs - mCurrentBucketStartTimeNs);
    // TODO: This should never be < 0. Document/guard against possible failures if it is.

    const int64_t thresholdNs = anomalyTracker.getAnomalyThreshold();
    if (currRemainingBucketSizeNs < 0) {
        ALOGE("OringDurationTracker currRemainingBucketSizeNs < 0");
        // This should never happen. Return the safest thing possible given that data is corrupt.
        return eventTimestampNs + thresholdNs;
    }

    // As we move into the future, old buckets get overwritten (so their old data is erased).

Loading