Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 163d2602 authored by Yao Chen's avatar Yao Chen Committed by Yangster
Browse files

Handle logd reconnect.

When statsd reconnects to logd, statsd will read all logs from buffer again. To prevent us from
reprocessing old events, we do the following:

1. At any given moment, record the largest timestamp(T_max) and last timestamp (check point) that
   we've seen before.
2. When reconnection happens, we look for the check point until we see a new log with a timestamp
   larger than T_max.
   -> If we found the CP, resume after the CP. Success
   -> If we can't find CP, there is definitely log loss. We reset all configs.

Note:
1. Logd has an API to read logs after a certain timestamp. But this api is vulnerable to
time changes from Settings. So we cannot rely on it.

2. If logd inserts a new log (with older timestamp) before CP, we cannot detect it. It's not
   possible to detect it without record all timestamps we have seen.

Test: statsd_test
Bug: 77813113

Change-Id: Ic3fdb47230807606ab11dc994cb162194adb8448
parent 5f91d5ee
Loading
Loading
Loading
Loading
+58 −15
Original line number Diff line number Diff line
@@ -79,7 +79,8 @@ StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
      mPeriodicAlarmMonitor(periodicAlarmMonitor),
      mSendBroadcast(sendBroadcast),
      mTimeBaseNs(timeBaseNs),
      mLastLogTimestamp(0) {
      mLargestTimestampSeen(0),
      mLastTimestampSeen(0) {
}

StatsLogProcessor::~StatsLogProcessor() {
@@ -156,18 +157,54 @@ void StatsLogProcessor::onIsolatedUidChangedEventLocked(const LogEvent& event) {
}

void StatsLogProcessor::OnLogEvent(LogEvent* event) {
    OnLogEvent(event, false);
}

void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnected) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    const int64_t currentTimestampNs = event->GetElapsedTimestampNs();

    if (currentTimestampNs < mLastLogTimestamp) {
        StatsdStats::getInstance().noteLogEventSkipped(
            event->GetTagId(), event->GetElapsedTimestampNs());
    if (reconnected && mLastTimestampSeen != 0) {
        // LogReader tells us the connection has just been reset. Now we need
        // to enter reconnection state to find the last CP.
        mInReconnection = true;
    }

    if (mInReconnection) {
        // We see the checkpoint
        if (currentTimestampNs == mLastTimestampSeen) {
            mInReconnection = false;
            // Found the CP. ignore this event, and we will start to read from next event.
            return;
        }
        if (currentTimestampNs > mLargestTimestampSeen) {
            // We see a new log but CP has not been found yet. Give up now.
            mLogLossCount++;
            mInReconnection = false;
            StatsdStats::getInstance().noteLogLost(currentTimestampNs);
            // Persist the data before we reset. Do we want this?
            WriteDataToDiskLocked();
            // We see fresher event before we see the checkpoint. We might have lost data.
            // The best we can do is to reset.
            std::vector<ConfigKey> configKeys;
            for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
                configKeys.push_back(it->first);
            }
            resetConfigsLocked(currentTimestampNs, configKeys);
        } else {
            // Still in search of the CP. Keep going.
            return;
        }
    }

    mLogCount++;
    mLastTimestampSeen = currentTimestampNs;
    if (mLargestTimestampSeen < currentTimestampNs) {
        mLargestTimestampSeen = currentTimestampNs;
    }

    resetIfConfigTtlExpiredLocked(currentTimestampNs);

    mLastLogTimestamp = currentTimestampNs;
    StatsdStats::getInstance().noteAtomLogged(
        event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);

@@ -339,15 +376,9 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key,
                (long long)getWallClockNs());
}

void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
    std::vector<ConfigKey> configKeysTtlExpired;
    for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
        if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
            configKeysTtlExpired.push_back(it->first);
        }
    }

    for (const auto& key : configKeysTtlExpired) {
void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs,
                                           const std::vector<ConfigKey>& configs) {
    for (const auto& key : configs) {
        StatsdConfig config;
        if (StorageManager::readConfigFromDisk(key, &config)) {
            OnConfigUpdatedLocked(timestampNs, key, config);
@@ -362,6 +393,18 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs)
    }
}

void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
    std::vector<ConfigKey> configKeysTtlExpired;
    for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
        if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
            configKeysTtlExpired.push_back(it->first);
        }
    }
    if (configKeysTtlExpired.size() > 0) {
        resetConfigsLocked(timestampNs, configKeysTtlExpired);
    }
}

void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    auto it = mMetricsManagers.find(key);
+18 −1
Original line number Diff line number Diff line
@@ -40,6 +40,9 @@ public:
                      const std::function<void(const ConfigKey&)>& sendBroadcast);
    virtual ~StatsLogProcessor();

    void OnLogEvent(LogEvent* event, bool reconnectionStarts);

    // for testing only.
    void OnLogEvent(LogEvent* event);

    void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
@@ -122,16 +125,30 @@ private:
    // Handler over the isolated uid change event.
    void onIsolatedUidChangedEventLocked(const LogEvent& event);

    void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs);

    // Function used to send a broadcast so that receiver for the config key can call getData
    // to retrieve the stored data.
    std::function<void(const ConfigKey& key)> mSendBroadcast;

    const int64_t mTimeBaseNs;

    int64_t mLastLogTimestamp;
    // Largest timestamp of the events that we have processed.
    int64_t mLargestTimestampSeen = 0;

    int64_t mLastTimestampSeen = 0;

    bool mInReconnection = false;

    // Processed log count
    uint64_t mLogCount = 0;

    // Log loss detected count
    int mLogLossCount = 0;

    long mLastPullerCacheClearTimeSec = 0;

    FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs);
    FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize);
    FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
    FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
+2 −2
Original line number Diff line number Diff line
@@ -818,8 +818,8 @@ void StatsService::Startup() {
    mConfigManager->Startup();
}

void StatsService::OnLogEvent(LogEvent* event) {
    mProcessor->OnLogEvent(event);
void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
    mProcessor->OnLogEvent(event, reconnectionStarts);
}

Status StatsService::getData(int64_t key, vector<uint8_t>* output) {
+1 −1
Original line number Diff line number Diff line
@@ -76,7 +76,7 @@ public:
    /**
     * Called by LogReader when there's a log event to process.
     */
    virtual void OnLogEvent(LogEvent* event);
    virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts);

    /**
     * Binder call for clients to request data for this configuration key.
+15 −23
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ const int FIELD_ID_ANOMALY_ALARM_STATS = 9;
// const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp
const int FIELD_ID_LOGGER_ERROR_STATS = 11;
const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13;
const int FIELD_ID_LOG_LOSS_STATS = 14;

const int FIELD_ID_ATOM_STATS_TAG = 1;
const int FIELD_ID_ATOM_STATS_COUNT = 2;
@@ -61,9 +61,6 @@ const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1;
const int FIELD_ID_LOGGER_STATS_TIME = 1;
const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2;

const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1;
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2;

const int FIELD_ID_CONFIG_STATS_UID = 1;
const int FIELD_ID_CONFIG_STATS_ID = 2;
const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -182,6 +179,14 @@ void StatsdStats::noteConfigReset(const ConfigKey& key) {
    noteConfigResetInternalLocked(key);
}

void StatsdStats::noteLogLost(int64_t timestampNs) {
    lock_guard<std::mutex> lock(mLock);
    if (mLogLossTimestampNs.size() == kMaxLoggerErrors) {
        mLogLossTimestampNs.pop_front();
    }
    mLogLossTimestampNs.push_back(timestampNs);
}

void StatsdStats::noteBroadcastSent(const ConfigKey& key) {
    noteBroadcastSent(key, getWallClockSec());
}
@@ -350,15 +355,6 @@ void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
    mPushedAtomStats[atomId]++;
}

void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) {
    lock_guard<std::mutex> lock(mLock);
    // grows strictly one at a time. so it won't > kMaxSkippedLogEvents
    if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) {
        mSkippedLogEvents.pop_front();
    }
    mSkippedLogEvents.push_back(std::make_pair(tag, timestamp));
}

void StatsdStats::noteLoggerError(int error) {
    lock_guard<std::mutex> lock(mLock);
    // grows strictly one at a time. so it won't > kMaxLoggerErrors
@@ -381,7 +377,7 @@ void StatsdStats::resetInternalLocked() {
    mAnomalyAlarmRegisteredStats = 0;
    mPeriodicAlarmRegisteredStats = 0;
    mLoggerErrors.clear();
    mSkippedLogEvents.clear();
    mLogLossTimestampNs.clear();
    for (auto& config : mConfigStats) {
        config.second->broadcast_sent_time_sec.clear();
        config.second->data_drop_time_sec.clear();
@@ -515,8 +511,8 @@ void StatsdStats::dumpStats(FILE* out) const {
        strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm);
        fprintf(out, "Logger error %d at %s\n", error.second, buffer);
    }
    for (const auto& skipped : mSkippedLogEvents) {
        fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second);
    for (const auto& loss : mLogLossTimestampNs) {
        fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss);
    }
}

@@ -672,13 +668,9 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
        proto.end(token);
    }

    for (const auto& skipped : mSkippedLogEvents) {
        uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS |
                                      FIELD_COUNT_REPEATED);
        proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first);
        proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP,
                    (long long)skipped.second);
        proto.end(token);
    for (const auto& loss : mLogLossTimestampNs) {
        proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED,
                    (long long)loss);
    }

    output->clear();
Loading