Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 07f7adb4 authored by Ruchir Rastogi's avatar Ruchir Rastogi
Browse files

Handle errors within LogEvent

Errors are recorded within StatsdStats. The associated CL to change
stats_log.proto within google3 is at cr/306795778.

Test: adb shell dumpsys stats --metadata
Test: bit statsd_test:*
Bug: 144373276
Change-Id: Id80cace9350f77d64a2d401f7fac7b12501e82ee
parent 288c434f
Loading
Loading
Loading
Loading
+13 −9
Original line number Diff line number Diff line
@@ -404,15 +404,24 @@ void StatsLogProcessor::OnLogEvent(LogEvent* event) {
void StatsLogProcessor::OnLogEvent(LogEvent* event, int64_t elapsedRealtimeNs) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);

    // Tell StatsdStats about new event
    const int64_t eventElapsedTimeNs = event->GetElapsedTimestampNs();
    int atomId = event->GetTagId();
    StatsdStats::getInstance().noteAtomLogged(atomId, eventElapsedTimeNs / NS_PER_SEC);
    if (!event->isValid()) {
        StatsdStats::getInstance().noteAtomError(atomId);
        return;
    }

    // Hard-coded logic to update train info on disk and fill in any information
    // this log event may be missing.
    if (event->GetTagId() == android::os::statsd::util::BINARY_PUSH_STATE_CHANGED) {
    if (atomId == android::os::statsd::util::BINARY_PUSH_STATE_CHANGED) {
        onBinaryPushStateChangedEventLocked(event);
    }

    // Hard-coded logic to update experiment ids on disk for certain rollback
    // types and fill the rollback atom with experiment ids
    if (event->GetTagId() == android::os::statsd::util::WATCHDOG_ROLLBACK_OCCURRED) {
    if (atomId == android::os::statsd::util::WATCHDOG_ROLLBACK_OCCURRED) {
        onWatchdogRollbackOccurredLocked(event);
    }

@@ -421,16 +430,11 @@ void StatsLogProcessor::OnLogEvent(LogEvent* event, int64_t elapsedRealtimeNs) {
        ALOGI("%s", event->ToString().c_str());
    }
#endif
    const int64_t eventElapsedTimeNs = event->GetElapsedTimestampNs();

    resetIfConfigTtlExpiredLocked(eventElapsedTimeNs);

    StatsdStats::getInstance().noteAtomLogged(
        event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);

    // Hard-coded logic to update the isolated uid's in the uid-map.
    // The field numbers need to be currently updated by hand with atoms.proto
    if (event->GetTagId() == android::os::statsd::util::ISOLATED_UID_CHANGED) {
    if (atomId == android::os::statsd::util::ISOLATED_UID_CHANGED) {
        onIsolatedUidChangedEventLocked(*event);
    }

@@ -447,7 +451,7 @@ void StatsLogProcessor::OnLogEvent(LogEvent* event, int64_t elapsedRealtimeNs) {
    }


    if (event->GetTagId() != android::os::statsd::util::ISOLATED_UID_CHANGED) {
    if (atomId != android::os::statsd::util::ISOLATED_UID_CHANGED) {
        // Map the isolated uid to host uid if necessary.
        mapIsolatedUidToHostUidIfNecessaryLocked(event);
    }
+8 −2
Original line number Diff line number Diff line
@@ -67,8 +67,14 @@ bool StatsCallbackPuller::PullInternal(vector<shared_ptr<LogEvent>>* data) {
                    lock_guard<mutex> lk(*cv_mutex);
                    for (const StatsEventParcel& parcel: output) {
                        shared_ptr<LogEvent> event = make_shared<LogEvent>(/*uid=*/-1, /*pid=*/-1);
                        event->parseBuffer((uint8_t*)parcel.buffer.data(), parcel.buffer.size());
                        bool valid = event->parseBuffer((uint8_t*)parcel.buffer.data(),
                                                        parcel.buffer.size());
                        if (valid) {
                            sharedData->push_back(event);
                        } else {
                            StatsdStats::getInstance().noteAtomError(event->GetTagId(),
                                                                     /*pull=*/true);
                        }
                    }
                    *pullSuccess = success;
                    *pullFinish = true;
+42 −4
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ const int FIELD_ID_ACTIVATION_BROADCAST_GUARDRAIL = 19;

const int FIELD_ID_ATOM_STATS_TAG = 1;
const int FIELD_ID_ATOM_STATS_COUNT = 2;
const int FIELD_ID_ATOM_STATS_ERROR_COUNT = 3;

const int FIELD_ID_ANOMALY_ALARMS_REGISTERED = 1;
const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1;
@@ -549,6 +550,20 @@ void StatsdStats::noteBucketBoundaryDelayNs(int64_t metricId, int64_t timeDelayN
            std::min(pullStats.minBucketBoundaryDelayNs, timeDelayNs);
}

void StatsdStats::noteAtomError(int atomTag, bool pull) {
    lock_guard<std::mutex> lock(mLock);
    if (pull) {
        mPulledAtomStats[atomTag].atomErrorCount++;
        return;
    }

    bool present = (mPushedAtomErrorStats.find(atomTag) != mPushedAtomErrorStats.end());
    bool full = (mPushedAtomErrorStats.size() >= (size_t)kMaxPushedAtomErrorStatsSize);
    if (!full || present) {
        mPushedAtomErrorStats[atomTag]++;
    }
}

StatsdStats::AtomMetricStats& StatsdStats::getAtomMetricStats(int64_t metricId) {
    auto atomMetricStatsIter = mAtomMetricStats.find(metricId);
    if (atomMetricStatsIter != mAtomMetricStats.end()) {
@@ -604,9 +619,11 @@ void StatsdStats::resetInternalLocked() {
        pullStats.second.pullExceedMaxDelay = 0;
        pullStats.second.registeredCount = 0;
        pullStats.second.unregisteredCount = 0;
        pullStats.second.atomErrorCount = 0;
    }
    mAtomMetricStats.clear();
    mActivationBroadcastGuardrailStats.clear();
    mPushedAtomErrorStats.clear();
}

string buildTimeString(int64_t timeSec) {
@@ -617,6 +634,15 @@ string buildTimeString(int64_t timeSec) {
    return string(timeBuffer);
}

int StatsdStats::getPushedAtomErrors(int atomId) const {
    const auto& it = mPushedAtomErrorStats.find(atomId);
    if (it != mPushedAtomErrorStats.end()) {
        return it->second;
    } else {
        return 0;
    }
}

void StatsdStats::dumpStats(int out) const {
    lock_guard<std::mutex> lock(mLock);
    time_t t = mStartTimeSec;
@@ -721,11 +747,13 @@ void StatsdStats::dumpStats(int out) const {
    const size_t atomCounts = mPushedAtomStats.size();
    for (size_t i = 2; i < atomCounts; i++) {
        if (mPushedAtomStats[i] > 0) {
            dprintf(out, "Atom %lu->%d\n", (unsigned long)i, mPushedAtomStats[i]);
            dprintf(out, "Atom %zu->(total count)%d, (error count)%d\n", i, mPushedAtomStats[i],
                    getPushedAtomErrors((int)i));
        }
    }
    for (const auto& pair : mNonPlatformPushedAtomStats) {
        dprintf(out, "Atom %lu->%d\n", (unsigned long)pair.first, pair.second);
        dprintf(out, "Atom %d->(total count)%d, (error count)%d\n", pair.first, pair.second,
                getPushedAtomErrors(pair.first));
    }

    dprintf(out, "********Pulled Atom stats***********\n");
@@ -737,13 +765,15 @@ void StatsdStats::dumpStats(int out) const {
                "nanos)%lld, "
                "  (max pull delay nanos)%lld, (data error)%ld\n"
                "  (pull timeout)%ld, (pull exceed max delay)%ld\n"
                "  (registered count) %ld, (unregistered count) %ld\n",
                "  (registered count) %ld, (unregistered count) %ld\n"
                "  (atom error count) %d\n",
                (int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache,
                (long)pair.second.pullFailed, (long)pair.second.minPullIntervalSec,
                (long long)pair.second.avgPullTimeNs, (long long)pair.second.maxPullTimeNs,
                (long long)pair.second.avgPullDelayNs, (long long)pair.second.maxPullDelayNs,
                pair.second.dataError, pair.second.pullTimeout, pair.second.pullExceedMaxDelay,
                pair.second.registeredCount, pair.second.unregisteredCount);
                pair.second.registeredCount, pair.second.unregisteredCount,
                pair.second.atomErrorCount);
    }

    if (mAnomalyAlarmRegisteredStats > 0) {
@@ -919,6 +949,10 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
                    proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_STATS | FIELD_COUNT_REPEATED);
            proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_TAG, (int32_t)i);
            proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_COUNT, mPushedAtomStats[i]);
            int errors = getPushedAtomErrors(i);
            if (errors > 0) {
                proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_ERROR_COUNT, errors);
            }
            proto.end(token);
        }
    }
@@ -928,6 +962,10 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
                proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_STATS | FIELD_COUNT_REPEATED);
        proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_TAG, pair.first);
        proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_COUNT, pair.second);
        int errors = getPushedAtomErrors(pair.first);
        if (errors > 0) {
            proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_ERROR_COUNT, errors);
        }
        proto.end(token);
    }

+20 −0
Original line number Diff line number Diff line
@@ -457,6 +457,16 @@ public:
     */
     void noteActivationBroadcastGuardrailHit(const int uid);

     /**
      * Reports that an atom is erroneous or cannot be parsed successfully by
      * statsd. An atom tag of 0 indicates that the client did not supply the
      * atom id within the encoding.
      *
      * For pushed atoms only, this call should be preceded by a call to
      * noteAtomLogged.
      */
     void noteAtomError(int atomTag, bool pull=false);

    /**
     * Reset the historical stats. Including all stats in icebox, and the tracked stats about
     * metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
@@ -495,6 +505,7 @@ public:
        long emptyData = 0;
        long registeredCount = 0;
        long unregisteredCount = 0;
        int32_t atomErrorCount = 0;
    } PulledAtomStats;

    typedef struct {
@@ -542,6 +553,12 @@ private:
    // Maps PullAtomId to its stats. The size is capped by the puller atom counts.
    std::map<int, PulledAtomStats> mPulledAtomStats;

    // Stores the number of times a pushed atom was logged erroneously. The
    // corresponding counts for pulled atoms are stored in PulledAtomStats.
    // The max size of this map is kMaxAtomErrorsStatsSize.
    std::map<int, int> mPushedAtomErrorStats;
    int kMaxPushedAtomErrorStatsSize = 100;

    // Maps metric ID to its stats. The size is capped by the number of metrics.
    std::map<int64_t, AtomMetricStats> mAtomMetricStats;

@@ -609,6 +626,8 @@ private:

    void addToIceBoxLocked(std::shared_ptr<ConfigStats>& stats);

    int getPushedAtomErrors(int atomId) const;

    /**
     * Get a reference to AtomMetricStats for a metric. If none exists, create it. The reference
     * will live as long as `this`.
@@ -627,6 +646,7 @@ private:
    FRIEND_TEST(StatsdStatsTest, TestPullAtomStats);
    FRIEND_TEST(StatsdStatsTest, TestAtomMetricsStats);
    FRIEND_TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit);
    FRIEND_TEST(StatsdStatsTest, TestAtomErrorStats);
};

}  // namespace statsd
+5 −1
Original line number Diff line number Diff line
@@ -379,7 +379,6 @@ bool LogEvent::parseBuffer(uint8_t* buf, size_t len) {
        typeInfo = readNextValue<uint8_t>();
        uint8_t typeId = getTypeId(typeInfo);

        // TODO(b/144373276): handle errors passed to the socket
        switch (typeId) {
            case BOOL_TYPE:
                parseBool(pos, /*depth=*/0, last, getNumAnnotations(typeInfo));
@@ -406,8 +405,13 @@ bool LogEvent::parseBuffer(uint8_t* buf, size_t len) {
                parseAttributionChain(pos, /*depth=*/0, last, getNumAnnotations(typeInfo));
                if (mAttributionChainIndex == -1) mAttributionChainIndex = pos[0];
                break;
            case ERROR_TYPE:
                mErrorBitmask = readNextValue<int32_t>();
                mValid = false;
                break;
            default:
                mValid = false;
                break;
        }
    }

Loading