Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6c75ecdc authored by Olivier Gaillard's avatar Olivier Gaillard
Browse files

Introduces an option to set a dump latency requirement.

We are currently dumping invalid data for pulled metrics. Pulled metrics
require a new pull when flushing a bucket. We should either do another
pull or invalidate the previous bucket.

There are cases where we cannot afford to do another pull, e.g. statsd
being killed. If we do not have enough time, we'll just invalidte the
bucket to make sure we have correct data.

Bug: 123866830
Test: atest statsd_test
Change-Id: I090127cace3b7265032ebb2c9bddae976c883771
parent fe71582e
Loading
Loading
Loading
Loading
+22 −13
Original line number Diff line number Diff line
@@ -298,7 +298,7 @@ void StatsLogProcessor::GetActiveConfigsLocked(const int uid, vector<int64_t>& o
void StatsLogProcessor::OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
                                        const StatsdConfig& config) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED);
    WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED, NO_TIME_CONSTRAINTS);
    OnConfigUpdatedLocked(timestampNs, key, config);
}

@@ -355,6 +355,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const int64_t dumpTim
                                     const bool include_current_partial_bucket,
                                     const bool erase_data,
                                     const DumpReportReason dumpReportReason,
                                     const DumpLatency dumpLatency,
                                     ProtoOutputStream* proto) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);

@@ -378,8 +379,10 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const int64_t dumpTim
        // Start of ConfigMetricsReport (reports).
        uint64_t reportsToken =
                proto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS);
        onConfigMetricsReportLocked(key, dumpTimeStampNs, include_current_partial_bucket,
                                    erase_data, dumpReportReason, proto);
        onConfigMetricsReportLocked(key, dumpTimeStampNs,
                                    include_current_partial_bucket,
                                    erase_data, dumpReportReason,
                                    dumpLatency, proto);
        proto->end(reportsToken);
        // End of ConfigMetricsReport (reports).
    } else {
@@ -394,10 +397,11 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const int64_t dumpTim
                                     const bool include_current_partial_bucket,
                                     const bool erase_data,
                                     const DumpReportReason dumpReportReason,
                                     const DumpLatency dumpLatency,
                                     vector<uint8_t>* outData) {
    ProtoOutputStream proto;
    onDumpReport(key, dumpTimeStampNs, include_current_partial_bucket, erase_data,
                 dumpReportReason, &proto);
                 dumpReportReason, dumpLatency, &proto);

    if (outData != nullptr) {
        outData->clear();
@@ -423,6 +427,7 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key,
                                                    const bool include_current_partial_bucket,
                                                    const bool erase_data,
                                                    const DumpReportReason dumpReportReason,
                                                    const DumpLatency dumpLatency,
                                                    ProtoOutputStream* proto) {
    // We already checked whether key exists in mMetricsManagers in
    // WriteDataToDisk.
@@ -438,7 +443,7 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key,
    // First, fill in ConfigMetricsReport using current data on memory, which
    // starts from filling in StatsLogReport's.
    it->second->onDumpReport(dumpTimeStampNs, include_current_partial_bucket,
                             erase_data, &str_set, proto);
                             erase_data, dumpLatency, &str_set, proto);

    // Fill in UidMap if there is at least one metric to report.
    // This skips the uid map if it's an empty config.
@@ -492,7 +497,7 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs)
        }
    }
    if (configKeysTtlExpired.size() > 0) {
        WriteDataToDiskLocked(CONFIG_RESET);
        WriteDataToDiskLocked(CONFIG_RESET, NO_TIME_CONSTRAINTS);
        resetConfigsLocked(timestampNs, configKeysTtlExpired);
    }
}
@@ -501,7 +506,8 @@ void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    auto it = mMetricsManagers.find(key);
    if (it != mMetricsManagers.end()) {
        WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED);
        WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED, 
                              NO_TIME_CONSTRAINTS);
        mMetricsManagers.erase(it);
        mUidMap->OnConfigRemoved(key);
    }
@@ -572,14 +578,15 @@ void StatsLogProcessor::flushIfNecessaryLocked(

void StatsLogProcessor::WriteDataToDiskLocked(const ConfigKey& key,
                                              const int64_t timestampNs,
                                              const DumpReportReason dumpReportReason) {
                                              const DumpReportReason dumpReportReason,
                                              const DumpLatency dumpLatency) {
    if (mMetricsManagers.find(key) == mMetricsManagers.end() ||
        !mMetricsManagers.find(key)->second->shouldWriteToDisk()) {
        return;
    }
    ProtoOutputStream proto;
    onConfigMetricsReportLocked(key, timestampNs, true /* include_current_partial_bucket*/,
                                true /* erase_data */, dumpReportReason, &proto);
                                true /* erase_data */, dumpReportReason, dumpLatency, &proto);
    string file_name = StringPrintf("%s/%ld_%d_%lld", STATS_DATA_DIR,
         (long)getWallClockSec(), key.GetUid(), (long long)key.GetId());
    android::base::unique_fd fd(open(file_name.c_str(),
@@ -658,7 +665,8 @@ void StatsLogProcessor::LoadMetricsActivationFromDisk() {
    StorageManager::deleteFile(file_name.c_str());
}

void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason) {
void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason,
                                              const DumpLatency dumpLatency) {
    const int64_t timeNs = getElapsedRealtimeNs();
    // Do not write to disk if we already have in the last few seconds.
    // This is to avoid overwriting files that would have the same name if we
@@ -671,13 +679,14 @@ void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportR
    }
    mLastWriteTimeNs = timeNs;
    for (auto& pair : mMetricsManagers) {
        WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason);
        WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason, dumpLatency);
    }
}

void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason) {
void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason, 
                                        const DumpLatency dumpLatency) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    WriteDataToDiskLocked(dumpReportReason);
    WriteDataToDiskLocked(dumpReportReason, dumpLatency);
}

void StatsLogProcessor::informPullAlarmFired(const int64_t timestampNs) {
+13 −5
Original line number Diff line number Diff line
@@ -66,10 +66,14 @@ public:

    void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs,
                      const bool include_current_partial_bucket, const bool erase_data,
                      const DumpReportReason dumpReportReason, vector<uint8_t>* outData);
                      const DumpReportReason dumpReportReason, 
                      const DumpLatency dumpLatency,
                      vector<uint8_t>* outData);
    void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs,
                      const bool include_current_partial_bucket, const bool erase_data,
                      const DumpReportReason dumpReportReason, ProtoOutputStream* proto);
                      const DumpReportReason dumpReportReason,
                      const DumpLatency dumpLatency,
                      ProtoOutputStream* proto);

    /* Tells MetricsManager that the alarms in alarmSet have fired. Modifies anomaly alarmSet. */
    void onAnomalyAlarmFired(
@@ -82,7 +86,8 @@ public:
            unordered_set<sp<const InternalAlarm>, SpHash<InternalAlarm>> alarmSet);

    /* Flushes data to disk. Data on memory will be gone after written to disk. */
    void WriteDataToDisk(const DumpReportReason dumpReportReason);
    void WriteDataToDisk(const DumpReportReason dumpReportReason,
                         const DumpLatency dumpLatency);

    /* Persist metric activation status onto disk. */
    void WriteMetricsActivationToDisk(int64_t currentTimeNs);
@@ -153,14 +158,17 @@ private:

    void GetActiveConfigsLocked(const int uid, vector<int64_t>& outActiveConfigs);

    void WriteDataToDiskLocked(const DumpReportReason dumpReportReason);
    void WriteDataToDiskLocked(const DumpReportReason dumpReportReason,
                               const DumpLatency dumpLatency);
    void WriteDataToDiskLocked(const ConfigKey& key, const int64_t timestampNs,
                               const DumpReportReason dumpReportReason);
                               const DumpReportReason dumpReportReason,
                               const DumpLatency dumpLatency);

    void onConfigMetricsReportLocked(const ConfigKey& key, const int64_t dumpTimeStampNs,
                                     const bool include_current_partial_bucket,
                                     const bool erase_data,
                                     const DumpReportReason dumpReportReason,
                                     const DumpLatency dumpLatency,
                                     util::ProtoOutputStream* proto);

    /* Check if we should send a broadcast if approaching memory limits and if we're over, we
+13 −7
Original line number Diff line number Diff line
@@ -313,7 +313,9 @@ void StatsService::dumpIncidentSection(int out) {
                proto.start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS_LIST);
        mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(),
                                 true /* includeCurrentBucket */, false /* erase_data */,
                                 ADB_DUMP, &proto);
                                 ADB_DUMP,
                                 FAST,
                                 &proto);
        proto.end(reportsListToken);
        proto.flush(out);
        proto.clear();
@@ -694,7 +696,9 @@ status_t StatsService::cmd_dump_report(int out, const Vector<String8>& args) {
        if (good) {
            vector<uint8_t> data;
            mProcessor->onDumpReport(ConfigKey(uid, StrToInt64(name)), getElapsedRealtimeNs(),
                                     includeCurrentBucket, eraseData, ADB_DUMP, &data);
                                     includeCurrentBucket, eraseData, ADB_DUMP,
                                     NO_TIME_CONSTRAINTS,
                                     &data);
            if (proto) {
                for (size_t i = 0; i < data.size(); i ++) {
                    dprintf(out, "%c", data[i]);
@@ -758,7 +762,7 @@ status_t StatsService::cmd_print_uid_map(int out, const Vector<String8>& args) {

status_t StatsService::cmd_write_data_to_disk(int out) {
    dprintf(out, "Writing data to disk\n");
    mProcessor->WriteDataToDisk(ADB_DUMP);
    mProcessor->WriteDataToDisk(ADB_DUMP, NO_TIME_CONSTRAINTS);
    return NO_ERROR;
}

@@ -958,7 +962,7 @@ Status StatsService::systemRunning() {
Status StatsService::informDeviceShutdown() {
    ENFORCE_UID(AID_SYSTEM);
    VLOG("StatsService::informDeviceShutdown");
    mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN);
    mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN, FAST);
    mProcessor->WriteMetricsActivationToDisk(getElapsedRealtimeNs());
    return Status::ok();
}
@@ -1000,7 +1004,7 @@ void StatsService::Startup() {
void StatsService::Terminate() {
    ALOGI("StatsService::Terminating");
    if (mProcessor != nullptr) {
        mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED);
        mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED, FAST);
    }
}

@@ -1017,8 +1021,10 @@ Status StatsService::getData(int64_t key, const String16& packageName, vector<ui
    IPCThreadState* ipc = IPCThreadState::self();
    VLOG("StatsService::getData with Pid %i, Uid %i", ipc->getCallingPid(), ipc->getCallingUid());
    ConfigKey configKey(ipc->getCallingUid(), key);
    // The dump latency does not matter here since we do not include the current bucket, we do not
    // need to pull any new data anyhow.
    mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(), false /* include_current_bucket*/,
                             true /* erase_data */, GET_DATA_CALLED, output);
                             true /* erase_data */, GET_DATA_CALLED, FAST, output);
    return Status::ok();
}

@@ -1312,7 +1318,7 @@ void StatsService::binderDied(const wp <IBinder>& who) {
    StatsdStats::getInstance().noteSystemServerRestart(getWallClockSec());
    if (mProcessor != nullptr) {
        ALOGW("Reset statsd upon system server restarts.");
        mProcessor->WriteDataToDisk(STATSCOMPANION_DIED);
        mProcessor->WriteDataToDisk(STATSCOMPANION_DIED, FAST);
        mProcessor->resetConfigs();
    }
    mAnomalyAlarmMonitor->setStatsCompanionService(nullptr);
+4 −3
Original line number Diff line number Diff line
@@ -139,13 +139,13 @@ void CountMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition


void CountMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    flushIfNeededLocked(dumpTimeNs);
    mPastBuckets.clear();
}

void CountMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data,
                                             const DumpLatency dumpLatency,
                                             std::set<string> *str_set,
                                             ProtoOutputStream* protoOutput) {
    if (include_current_partial_bucket) {
@@ -319,7 +319,7 @@ void CountMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
        return;
    }

    flushCurrentBucketLocked(eventTimeNs);
    flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
    // Setup the bucket start time and number.
    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
@@ -328,7 +328,8 @@ void CountMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
         (long long)mCurrentBucketStartTimeNs);
}

void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    CountBucket info;
    info.mBucketStartNs = mCurrentBucketStartTimeNs;
+3 −1
Original line number Diff line number Diff line
@@ -57,6 +57,7 @@ private:
    void onDumpReportLocked(const int64_t dumpTimeNs,
                            const bool include_current_partial_bucket,
                            const bool erase_data,
                            const DumpLatency dumpLatency,
                            std::set<string> *str_set,
                            android::util::ProtoOutputStream* protoOutput) override;

@@ -78,7 +79,8 @@ private:
    // Util function to flush the old packet.
    void flushIfNeededLocked(const int64_t& newEventTime) override;

    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                  const int64_t& nextBucketStartTimeNs) override;

    std::unordered_map<MetricDimensionKey, std::vector<CountBucket>> mPastBuckets;

Loading