Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 80858223 authored by Yao Chen's avatar Yao Chen Committed by android-build-merger
Browse files

Merge "Handle logd reconnect." into pi-dev

am: 2e0f45f0

Change-Id: I15cdb063331da9b1e6479d598314fb1cf05ab9f1
parents 0c4b2b7f 2e0f45f0
Loading
Loading
Loading
Loading
+58 −15
Original line number Diff line number Diff line
@@ -79,7 +79,8 @@ StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
      mPeriodicAlarmMonitor(periodicAlarmMonitor),
      mSendBroadcast(sendBroadcast),
      mTimeBaseNs(timeBaseNs),
      mLastLogTimestamp(0) {
      mLargestTimestampSeen(0),
      mLastTimestampSeen(0) {
}

StatsLogProcessor::~StatsLogProcessor() {
@@ -156,18 +157,54 @@ void StatsLogProcessor::onIsolatedUidChangedEventLocked(const LogEvent& event) {
}

void StatsLogProcessor::OnLogEvent(LogEvent* event) {
    OnLogEvent(event, false);
}

void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnected) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    const int64_t currentTimestampNs = event->GetElapsedTimestampNs();

    if (currentTimestampNs < mLastLogTimestamp) {
        StatsdStats::getInstance().noteLogEventSkipped(
            event->GetTagId(), event->GetElapsedTimestampNs());
    if (reconnected && mLastTimestampSeen != 0) {
        // LogReader tells us the connection has just been reset. Now we need
        // to enter reconnection state to find the last CP.
        mInReconnection = true;
    }

    if (mInReconnection) {
        // We see the checkpoint
        if (currentTimestampNs == mLastTimestampSeen) {
            mInReconnection = false;
            // Found the CP. ignore this event, and we will start to read from next event.
            return;
        }
        if (currentTimestampNs > mLargestTimestampSeen) {
            // We see a new log but CP has not been found yet. Give up now.
            mLogLossCount++;
            mInReconnection = false;
            StatsdStats::getInstance().noteLogLost(currentTimestampNs);
            // Persist the data before we reset. Do we want this?
            WriteDataToDiskLocked();
            // We see fresher event before we see the checkpoint. We might have lost data.
            // The best we can do is to reset.
            std::vector<ConfigKey> configKeys;
            for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
                configKeys.push_back(it->first);
            }
            resetConfigsLocked(currentTimestampNs, configKeys);
        } else {
            // Still in search of the CP. Keep going.
            return;
        }
    }

    mLogCount++;
    mLastTimestampSeen = currentTimestampNs;
    if (mLargestTimestampSeen < currentTimestampNs) {
        mLargestTimestampSeen = currentTimestampNs;
    }

    resetIfConfigTtlExpiredLocked(currentTimestampNs);

    mLastLogTimestamp = currentTimestampNs;
    StatsdStats::getInstance().noteAtomLogged(
        event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);

@@ -339,15 +376,9 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key,
                (long long)getWallClockNs());
}

void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
    std::vector<ConfigKey> configKeysTtlExpired;
    for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
        if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
            configKeysTtlExpired.push_back(it->first);
        }
    }

    for (const auto& key : configKeysTtlExpired) {
void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs,
                                           const std::vector<ConfigKey>& configs) {
    for (const auto& key : configs) {
        StatsdConfig config;
        if (StorageManager::readConfigFromDisk(key, &config)) {
            OnConfigUpdatedLocked(timestampNs, key, config);
@@ -362,6 +393,18 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs)
    }
}

void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
    std::vector<ConfigKey> configKeysTtlExpired;
    for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
        if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
            configKeysTtlExpired.push_back(it->first);
        }
    }
    if (configKeysTtlExpired.size() > 0) {
        resetConfigsLocked(timestampNs, configKeysTtlExpired);
    }
}

void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    auto it = mMetricsManagers.find(key);
+18 −1
Original line number Diff line number Diff line
@@ -40,6 +40,9 @@ public:
                      const std::function<void(const ConfigKey&)>& sendBroadcast);
    virtual ~StatsLogProcessor();

    void OnLogEvent(LogEvent* event, bool reconnectionStarts);

    // for testing only.
    void OnLogEvent(LogEvent* event);

    void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
@@ -122,16 +125,30 @@ private:
    // Handler over the isolated uid change event.
    void onIsolatedUidChangedEventLocked(const LogEvent& event);

    void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs);

    // Function used to send a broadcast so that receiver for the config key can call getData
    // to retrieve the stored data.
    std::function<void(const ConfigKey& key)> mSendBroadcast;

    const int64_t mTimeBaseNs;

    int64_t mLastLogTimestamp;
    // Largest timestamp of the events that we have processed.
    int64_t mLargestTimestampSeen = 0;

    int64_t mLastTimestampSeen = 0;

    bool mInReconnection = false;

    // Processed log count
    uint64_t mLogCount = 0;

    // Log loss detected count
    int mLogLossCount = 0;

    long mLastPullerCacheClearTimeSec = 0;

    FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs);
    FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize);
    FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
    FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
+2 −2
Original line number Diff line number Diff line
@@ -818,8 +818,8 @@ void StatsService::Startup() {
    mConfigManager->Startup();
}

void StatsService::OnLogEvent(LogEvent* event) {
    mProcessor->OnLogEvent(event);
void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
    mProcessor->OnLogEvent(event, reconnectionStarts);
}

Status StatsService::getData(int64_t key, vector<uint8_t>* output) {
+1 −1
Original line number Diff line number Diff line
@@ -76,7 +76,7 @@ public:
    /**
     * Called by LogReader when there's a log event to process.
     */
    virtual void OnLogEvent(LogEvent* event);
    virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts);

    /**
     * Binder call for clients to request data for this configuration key.
+15 −23
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ const int FIELD_ID_ANOMALY_ALARM_STATS = 9;
// const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp
const int FIELD_ID_LOGGER_ERROR_STATS = 11;
const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13;
const int FIELD_ID_LOG_LOSS_STATS = 14;

const int FIELD_ID_ATOM_STATS_TAG = 1;
const int FIELD_ID_ATOM_STATS_COUNT = 2;
@@ -61,9 +61,6 @@ const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1;
const int FIELD_ID_LOGGER_STATS_TIME = 1;
const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2;

const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1;
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2;

const int FIELD_ID_CONFIG_STATS_UID = 1;
const int FIELD_ID_CONFIG_STATS_ID = 2;
const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -182,6 +179,14 @@ void StatsdStats::noteConfigReset(const ConfigKey& key) {
    noteConfigResetInternalLocked(key);
}

void StatsdStats::noteLogLost(int64_t timestampNs) {
    lock_guard<std::mutex> lock(mLock);
    if (mLogLossTimestampNs.size() == kMaxLoggerErrors) {
        mLogLossTimestampNs.pop_front();
    }
    mLogLossTimestampNs.push_back(timestampNs);
}

void StatsdStats::noteBroadcastSent(const ConfigKey& key) {
    noteBroadcastSent(key, getWallClockSec());
}
@@ -350,15 +355,6 @@ void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
    mPushedAtomStats[atomId]++;
}

void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) {
    lock_guard<std::mutex> lock(mLock);
    // grows strictly one at a time. so it won't > kMaxSkippedLogEvents
    if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) {
        mSkippedLogEvents.pop_front();
    }
    mSkippedLogEvents.push_back(std::make_pair(tag, timestamp));
}

void StatsdStats::noteLoggerError(int error) {
    lock_guard<std::mutex> lock(mLock);
    // grows strictly one at a time. so it won't > kMaxLoggerErrors
@@ -381,7 +377,7 @@ void StatsdStats::resetInternalLocked() {
    mAnomalyAlarmRegisteredStats = 0;
    mPeriodicAlarmRegisteredStats = 0;
    mLoggerErrors.clear();
    mSkippedLogEvents.clear();
    mLogLossTimestampNs.clear();
    for (auto& config : mConfigStats) {
        config.second->broadcast_sent_time_sec.clear();
        config.second->data_drop_time_sec.clear();
@@ -515,8 +511,8 @@ void StatsdStats::dumpStats(FILE* out) const {
        strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm);
        fprintf(out, "Logger error %d at %s\n", error.second, buffer);
    }
    for (const auto& skipped : mSkippedLogEvents) {
        fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second);
    for (const auto& loss : mLogLossTimestampNs) {
        fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss);
    }
}

@@ -672,13 +668,9 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
        proto.end(token);
    }

    for (const auto& skipped : mSkippedLogEvents) {
        uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS |
                                      FIELD_COUNT_REPEATED);
        proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first);
        proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP,
                    (long long)skipped.second);
        proto.end(token);
    for (const auto& loss : mLogLossTimestampNs) {
        proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED,
                    (long long)loss);
    }

    output->clear();
Loading