Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0f861867 authored by Yao Chen's avatar Yao Chen
Browse files

Make StatsLog drop less.

+ Create a thread-safe LogEventQueue to buffer log events.

+ The socket listener thread will read from socket and write to the buffer as quickly as possible
  to minimize the data loss in socket.

+ All pushed data is fetched from the buffer and processed in a dedicated thread. After an
  event is fetched from the queue, we no longer block the socket listener thread.

+ Report event queue stats via statsdstats, including the min and max queue event history span in
  the queue (to understand how slow statsd can be and how fast the events can be)

Bug: 119031518
Test: unit tests added in statsd_test

Change-Id: I6b65ed9a678935b2e24302ba4b36e69c157adde4
parent cd62e442
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -81,7 +81,7 @@ cc_defaults {
        "src/external/StatsPullerManager.cpp",
        "src/external/puller_util.cpp",
        "src/logd/LogEvent.cpp",
        "src/logd/LogListener.cpp",
        "src/logd/LogEventQueue.cpp",
        "src/matchers/CombinationLogMatchingTracker.cpp",
        "src/matchers/EventMatcherWizard.cpp",
        "src/matchers/matcher_util.cpp",
@@ -226,6 +226,7 @@ cc_test {
        "tests/indexed_priority_queue_test.cpp",
        "tests/LogEntryMatcher_test.cpp",
        "tests/LogEvent_test.cpp",
        "tests/log_event/LogEventQueue_test.cpp",
        "tests/MetricsManager_test.cpp",
        "tests/StatsLogProcessor_test.cpp",
        "tests/StatsService_test.cpp",
+53 −28
Original line number Diff line number Diff line
@@ -132,8 +132,9 @@ binder::Status checkDumpAndUsageStats(const String16& packageName) {
    }                                                             \
}

StatsService::StatsService(const sp<Looper>& handlerLooper)
    : mAnomalyAlarmMonitor(new AlarmMonitor(MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
StatsService::StatsService(const sp<Looper>& handlerLooper, shared_ptr<LogEventQueue> queue)
    : mAnomalyAlarmMonitor(new AlarmMonitor(
              MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
              [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
                  if (sc != nullptr) {
                      sc->setAnomalyAlarm(timeMillis);
@@ -146,7 +147,8 @@ StatsService::StatsService(const sp<Looper>& handlerLooper)
                      StatsdStats::getInstance().noteRegisteredAnomalyAlarmChanged();
                  }
              })),
   mPeriodicAlarmMonitor(new AlarmMonitor(MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
      mPeriodicAlarmMonitor(new AlarmMonitor(
              MIN_DIFF_TO_UPDATE_REGISTERED_ALARM_SECS,
              [](const sp<IStatsCompanionService>& sc, int64_t timeMillis) {
                  if (sc != nullptr) {
                      sc->setAlarmForSubscriberTriggering(timeMillis);
@@ -158,8 +160,8 @@ StatsService::StatsService(const sp<Looper>& handlerLooper)
                      sc->cancelAlarmForSubscriberTriggering();
                      StatsdStats::getInstance().noteRegisteredPeriodicAlarmChanged();
                  }

      }))  {
              })),
      mEventQueue(queue) {
    mUidMap = UidMap::getInstance();
    mPullerManager = new StatsPullerManager();
    StatsPuller::SetUidMap(mUidMap);
@@ -201,11 +203,33 @@ StatsService::StatsService(const sp<Looper>& handlerLooper)
    mConfigManager->AddListener(mProcessor);

    init_system_properties();

    if (mEventQueue != nullptr) {
        std::thread pushedEventThread([this] { readLogs(); });
        pushedEventThread.detach();
    }
}

// Nothing to release explicitly; members clean themselves up.
// NOTE(review): the constructor detaches a thread that captures `this` and loops forever in
// readLogs(). Nothing here stops that thread, so this presumably relies on the service living
// for the whole process lifetime -- confirm.
StatsService::~StatsService() = default;

/*
 * Consumer loop for pushed events. Runs on its own dedicated thread and never returns.
 */
void StatsService::readLogs() {
    for (;;) {
        // waitPop() blocks until the queue has an event to hand over.
        auto pushedEvent = mEventQueue->waitPop();

        // The event is now out of the queue, so the socket listener can keep reading from the
        // socket and filling the buffer while the event is dispatched to all configs/metrics.
        mProcessor->OnLogEvent(pushedEvent.get());

        // The ShellSubscriber only exists to support local debugging from the shell.
        if (mShellSubscriber == nullptr) {
            continue;
        }
        mShellSubscriber->onLogEvent(*pushedEvent);
    }
}

void StatsService::init_system_properties() {
    mEngBuild = false;
    const prop_info* buildType = __system_property_find("ro.build.type");
@@ -1009,6 +1033,7 @@ void StatsService::Terminate() {
    }
}

// Test only interface!!!
void StatsService::OnLogEvent(LogEvent* event) {
    mProcessor->OnLogEvent(event);
    if (mShellSubscriber != nullptr) {
+8 −4
Original line number Diff line number Diff line
@@ -22,7 +22,7 @@
#include "anomaly/AlarmMonitor.h"
#include "config/ConfigManager.h"
#include "external/StatsPullerManager.h"
#include "logd/LogListener.h"
#include "logd/LogEventQueue.h"
#include "packages/UidMap.h"
#include "shell/ShellSubscriber.h"
#include "statscompanion_util.h"
@@ -52,11 +52,10 @@ namespace statsd {
using android::hardware::Return;

class StatsService : public BnStatsManager,
                     public LogListener,
                     public IStats,
                     public IBinder::DeathRecipient {
public:
    StatsService(const sp<Looper>& handlerLooper);
    StatsService(const sp<Looper>& handlerLooper, std::shared_ptr<LogEventQueue> queue);
    virtual ~StatsService();

    /** The anomaly alarm registered with AlarmManager won't be updated by less than this. */
@@ -92,7 +91,7 @@ public:
    void Terminate();

    /**
     * Called by LogReader when there's a log event to process.
     * Test ONLY interface. In real world, StatsService reads from LogEventQueue.
     */
    virtual void OnLogEvent(LogEvent* event);

@@ -278,6 +277,9 @@ private:
     */
    void print_cmd_help(int out);

    /* Runs on its dedicated thread to process pushed stats event from socket. */
    void readLogs();

    /**
     * Trigger a broadcast.
     */
@@ -410,6 +412,8 @@ private:

    sp<ShellSubscriber> mShellSubscriber;

    std::shared_ptr<LogEventQueue> mEventQueue;

    FRIEND_TEST(StatsServiceTest, TestAddConfig_simple);
    FRIEND_TEST(StatsServiceTest, TestAddConfig_empty);
    FRIEND_TEST(StatsServiceTest, TestAddConfig_invalid);
+37 −0
Original line number Diff line number Diff line
@@ -50,6 +50,7 @@ const int FIELD_ID_ANOMALY_ALARM_STATS = 9;
const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
const int FIELD_ID_SYSTEM_SERVER_RESTART = 15;
const int FIELD_ID_LOGGER_ERROR_STATS = 16;
const int FIELD_ID_OVERFLOW = 18;

const int FIELD_ID_ATOM_STATS_TAG = 1;
const int FIELD_ID_ATOM_STATS_COUNT = 2;
@@ -64,6 +65,10 @@ const int FIELD_ID_LOG_LOSS_STATS_TAG = 4;
const int FIELD_ID_LOG_LOSS_STATS_UID = 5;
const int FIELD_ID_LOG_LOSS_STATS_PID = 6;

const int FIELD_ID_OVERFLOW_COUNT = 1;
const int FIELD_ID_OVERFLOW_MAX_HISTORY = 2;
const int FIELD_ID_OVERFLOW_MIN_HISTORY = 3;

const int FIELD_ID_CONFIG_STATS_UID = 1;
const int FIELD_ID_CONFIG_STATS_ID = 2;
const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -235,6 +240,22 @@ void StatsdStats::noteDataDropped(const ConfigKey& key, const size_t totalBytes)
    noteDataDropped(key, totalBytes, getWallClockSec());
}

// Records one queue-overflow drop and folds the age of the oldest queued event
// (now - oldestEventTimestampNs) into the running max/min history spans.
void StatsdStats::noteEventQueueOverflow(int64_t oldestEventTimestampNs) {
    lock_guard<std::mutex> lock(mLock);

    ++mOverflowCount;

    // How far back the queue reached at the moment the overflow happened.
    const int64_t queueSpanNs = getElapsedRealtimeNs() - oldestEventTimestampNs;

    mMaxQueueHistoryNs = (queueSpanNs > mMaxQueueHistoryNs) ? queueSpanNs : mMaxQueueHistoryNs;
    mMinQueueHistoryNs = (queueSpanNs < mMinQueueHistoryNs) ? queueSpanNs : mMinQueueHistoryNs;
}

void StatsdStats::noteDataDropped(const ConfigKey& key, const size_t totalBytes, int32_t timeSec) {
    lock_guard<std::mutex> lock(mLock);
    auto it = mConfigStats.find(key);
@@ -534,6 +555,9 @@ void StatsdStats::resetInternalLocked() {
    mPeriodicAlarmRegisteredStats = 0;
    mSystemServerRestartSec.clear();
    mLogLossStats.clear();
    mOverflowCount = 0;
    mMinQueueHistoryNs = kInt64Max;
    mMaxQueueHistoryNs = 0;
    for (auto& config : mConfigStats) {
        config.second->broadcast_sent_time_sec.clear();
        config.second->activation_time_sec.clear();
@@ -726,6 +750,9 @@ void StatsdStats::dumpStats(int out) const {
                (long long)loss.mWallClockSec, loss.mCount, loss.mLastError, loss.mLastTag,
                loss.mUid, loss.mPid);
    }

    dprintf(out, "Event queue overflow: %d; MaxHistoryNs: %lld; MinHistoryNs: %lld\n",
            mOverflowCount, (long long)mMaxQueueHistoryNs, (long long)mMinQueueHistoryNs);
}

void addConfigStatsToProto(const ConfigStats& configStats, ProtoOutputStream* proto) {
@@ -904,6 +931,16 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
        proto.end(token);
    }

    if (mOverflowCount > 0) {
        uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_OVERFLOW);
        proto.write(FIELD_TYPE_INT32 | FIELD_ID_OVERFLOW_COUNT, (int32_t)mOverflowCount);
        proto.write(FIELD_TYPE_INT64 | FIELD_ID_OVERFLOW_MAX_HISTORY,
                    (long long)mMaxQueueHistoryNs);
        proto.write(FIELD_TYPE_INT64 | FIELD_ID_OVERFLOW_MIN_HISTORY,
                    (long long)mMinQueueHistoryNs);
        proto.end(token);
    }

    for (const auto& restart : mSystemServerRestartSec) {
        proto.write(FIELD_TYPE_INT32 | FIELD_ID_SYSTEM_SERVER_RESTART | FIELD_COUNT_REPEATED,
                    restart);
+17 −0
Original line number Diff line number Diff line
@@ -160,6 +160,8 @@ public:
    // Max platform atom tag number.
    static const int32_t kMaxPlatformAtomTag = 100000;

    static const int64_t kInt64Max = 0x7fffffffffffffffLL;

    /**
     * Report a new config has been received and report the static stats about the config.
     *
@@ -419,6 +421,10 @@ public:
     */
    void noteBucketUnknownCondition(int64_t metricId);

    /* Reports that one event has been dropped due to queue overflow, along with the timestamp of
     * the oldest event in the queue. */
    void noteEventQueueOverflow(int64_t oldestEventTimestampNs);

    /**
     * Reset the historical stats. Including all stats in icebox, and the tracked stats about
     * metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
@@ -522,6 +528,17 @@ private:
        int32_t mPid;
    };

    // Max of {(now - oldestEventTimestamp) when overflow happens}.
    // This number is helpful to understand how SLOW statsd can be.
    int64_t mMaxQueueHistoryNs = 0;

    // Min of {(now - oldestEventTimestamp) when overflow happens}.
    // This number is helpful to understand how FAST events flood into statsd.
    int64_t mMinQueueHistoryNs = kInt64Max;

    // Total number of events that are lost due to queue overflow.
    int32_t mOverflowCount = 0;

    // Timestamps when we detect log loss, and the number of logs lost.
    std::list<LogLossStats> mLogLossStats;

Loading