Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b0d0628a authored by Yangster-mac's avatar Yangster-mac
Browse files

Thread-safety at log processor level.

Test: statsd unit test passed.

Change-Id: Ibe8c8d3cc8297875b16ee385c077b71c87353147
parent 20ac944e
Loading
Loading
Loading
Loading
+12 −11
Original line number Diff line number Diff line
@@ -82,9 +82,7 @@ StatsLogProcessor::~StatsLogProcessor() {
void StatsLogProcessor::onAnomalyAlarmFired(
        const uint64_t timestampNs,
        unordered_set<sp<const AnomalyAlarm>, SpHash<AnomalyAlarm>> anomalySet) {
    // TODO: This is a thread-safety issue. mMetricsManagers could change under our feet.
    // TODO: Solution? Lock everything! :(
    // TODO: Question: Can we replace the other lock (broadcast), or do we need to supplement it?
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    for (const auto& itr : mMetricsManagers) {
        itr.second->onAnomalyAlarmFired(timestampNs, anomalySet);
    }
@@ -92,11 +90,13 @@ void StatsLogProcessor::onAnomalyAlarmFired(

// TODO: what if statsd service restarts? How do we know what logs are already processed before?
void StatsLogProcessor::OnLogEvent(const LogEvent& msg) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);

    StatsdStats::getInstance().noteAtomLogged(msg.GetTagId(), msg.GetTimestampNs() / NS_PER_SEC);
    // pass the event to metrics managers.
    for (auto& pair : mMetricsManagers) {
        pair.second->onLogEvent(msg);
        flushIfNecessary(msg.GetTimestampNs(), pair.first, *(pair.second));
        flushIfNecessaryLocked(msg.GetTimestampNs(), pair.first, *(pair.second));
    }
    // Hard-coded logic to update the isolated uid's in the uid-map.
    // The field numbers need to be currently updated by hand with atoms.proto
@@ -116,6 +116,7 @@ void StatsLogProcessor::OnLogEvent(const LogEvent& msg) {
}

void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    ALOGD("Updated configuration for key %s", key.ToString().c_str());
    sp<MetricsManager> newMetricsManager = new MetricsManager(key, config, mTimeBaseSec, mUidMap);
    auto it = mMetricsManagers.find(key);
@@ -142,6 +143,7 @@ void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig
}

size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) const {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    auto it = mMetricsManagers.find(key);
    if (it == mMetricsManagers.end()) {
        ALOGW("Config source %s does not exist", key.ToString().c_str());
@@ -152,6 +154,7 @@ size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) const {

void StatsLogProcessor::onDumpReport(const ConfigKey& key, const uint64_t& dumpTimeStampNs,
                                     ConfigMetricsReportList* report) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    auto it = mMetricsManagers.find(key);
    if (it == mMetricsManagers.end()) {
        ALOGW("Config source %s does not exist", key.ToString().c_str());
@@ -165,6 +168,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const uint64_t& dumpT
}

void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outData) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    auto it = mMetricsManagers.find(key);
    if (it == mMetricsManagers.end()) {
        ALOGW("Config source %s does not exist", key.ToString().c_str());
@@ -173,9 +177,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD

    // This allows another broadcast to be sent within the rate-limit period if we get close to
    // filling the buffer again soon.
    mBroadcastTimesMutex.lock();
    mLastBroadcastTimes.erase(key);
    mBroadcastTimesMutex.unlock();

    ProtoOutputStream proto;

@@ -224,6 +226,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD
}

void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    auto it = mMetricsManagers.find(key);
    if (it != mMetricsManagers.end()) {
        mMetricsManagers.erase(it);
@@ -231,14 +234,11 @@ void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
    }
    StatsdStats::getInstance().noteConfigRemoved(key);

    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
    mLastBroadcastTimes.erase(key);
}

void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs, const ConfigKey& key,
                                         MetricsManager& metricsManager) {
    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);

void StatsLogProcessor::flushIfNecessaryLocked(
    uint64_t timestampNs, const ConfigKey& key, MetricsManager& metricsManager) {
    auto lastCheckTime = mLastByteSizeTimes.find(key);
    if (lastCheckTime != mLastByteSizeTimes.end()) {
        if (timestampNs - lastCheckTime->second < StatsdStats::kMinByteSizeCheckPeriodNs) {
@@ -274,6 +274,7 @@ void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs, const ConfigKey&

void StatsLogProcessor::WriteDataToDisk() {
    mkdir(STATS_DATA_DIR, S_IRWXU);
    std::lock_guard<std::mutex> lock(mMetricsMutex);
    for (auto& pair : mMetricsManagers) {
        const ConfigKey& key = pair.first;
        vector<uint8_t> data;
+3 −3
Original line number Diff line number Diff line
@@ -57,7 +57,7 @@ public:
    void WriteDataToDisk();

private:
    mutable mutex mBroadcastTimesMutex;
    mutable mutex mMetricsMutex;

    std::unordered_map<ConfigKey, sp<MetricsManager>> mMetricsManagers;

@@ -72,7 +72,7 @@ private:

    /* Check if we should send a broadcast if approaching memory limits and if we're over, we
     * actually delete the data. */
    void flushIfNecessary(uint64_t timestampNs, const ConfigKey& key,
    void flushIfNecessaryLocked(uint64_t timestampNs, const ConfigKey& key,
                                MetricsManager& metricsManager);

    // Function used to send a broadcast so that receiver for the config key can call getData
+6 −6
Original line number Diff line number Diff line
@@ -60,9 +60,9 @@ TEST(StatsLogProcessorTest, TestRateLimitByteSize) {
    // Expect only the first flush to trigger a check for byte size since the last two are
    // rate-limited.
    EXPECT_CALL(mockMetricsManager, byteSize()).Times(1);
    p.flushIfNecessary(99, key, mockMetricsManager);
    p.flushIfNecessary(100, key, mockMetricsManager);
    p.flushIfNecessary(101, key, mockMetricsManager);
    p.flushIfNecessaryLocked(99, key, mockMetricsManager);
    p.flushIfNecessaryLocked(100, key, mockMetricsManager);
    p.flushIfNecessaryLocked(101, key, mockMetricsManager);
}

TEST(StatsLogProcessorTest, TestRateLimitBroadcast) {
@@ -80,12 +80,12 @@ TEST(StatsLogProcessorTest, TestRateLimitBroadcast) {
            .WillRepeatedly(Return(int(StatsdStats::kMaxMetricsBytesPerConfig * .95)));

    // Expect only one broadcast despite always returning a size that should trigger broadcast.
    p.flushIfNecessary(1, key, mockMetricsManager);
    p.flushIfNecessaryLocked(1, key, mockMetricsManager);
    EXPECT_EQ(1, broadcastCount);

    // This next call to flush should not trigger a broadcast.
    p.mLastByteSizeTimes.clear();  // Force another check for byte size.
    p.flushIfNecessary(2, key, mockMetricsManager);
    p.flushIfNecessaryLocked(2, key, mockMetricsManager);
    EXPECT_EQ(1, broadcastCount);
}

@@ -106,7 +106,7 @@ TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge) {
    EXPECT_CALL(mockMetricsManager, onDumpReport(_)).Times(1);

    // Expect to call the onDumpReport and skip the broadcast.
    p.flushIfNecessary(1, key, mockMetricsManager);
    p.flushIfNecessaryLocked(1, key, mockMetricsManager);
    EXPECT_EQ(0, broadcastCount);
}