Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1d7b0cd6 authored by David Chen's avatar David Chen
Browse files

Support StatsD sending broadcasts.

StatsD will send a broadcast when we're 90% of the way to our
allocated memory limit for the configuration. If the memory usage
goes over the limit, we just lose all the data for this config.

Also modifies the adb shell commands to facilitate debugging of the
broadcasts.

Test: Manually tested on marlin-eng with custom gmscore code.

Change-Id: I517a15bd4c959aa221802f84a51f13141a725102
parent 24c99247
Loading
Loading
Loading
Loading
+48 −40
Original line number Diff line number Diff line
@@ -50,8 +50,8 @@ const int FIELD_ID_UID = 1;
const int FIELD_ID_NAME = 2;

StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
                                     const std::function<void(const vector<uint8_t>&)>& pushLog)
    : mUidMap(uidMap), mPushLog(pushLog) {
                                     const std::function<void(const ConfigKey&)>& sendBroadcast)
    : mUidMap(uidMap), mSendBroadcast(sendBroadcast) {
}

StatsLogProcessor::~StatsLogProcessor() {
@@ -102,12 +102,27 @@ void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig
    }
}

vector<uint8_t> StatsLogProcessor::onDumpReport(const ConfigKey& key) {
size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) {
    auto it = mMetricsManagers.find(key);
    if (it == mMetricsManagers.end()) {
        ALOGW("Config source %s does not exist", key.ToString().c_str());
        return vector<uint8_t>();
        return 0;
    }
    return it->second->byteSize();
}

void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outData) {
    auto it = mMetricsManagers.find(key);
    if (it == mMetricsManagers.end()) {
        ALOGW("Config source %s does not exist", key.ToString().c_str());
        return;
    }

    // This allows another broadcast to be sent within the rate-limit period if we get close to
    // filling the buffer again soon.
    mBroadcastTimesMutex.lock();
    mLastBroadcastTimes.erase(key);
    mBroadcastTimesMutex.unlock();

    ProtoOutputStream proto;

@@ -131,17 +146,18 @@ vector<uint8_t> StatsLogProcessor::onDumpReport(const ConfigKey& key) {
    uidMap.SerializeToArray(&uidMapBuffer[0], uidMapSize);
    proto.write(FIELD_TYPE_MESSAGE | FIELD_ID_UID_MAP, uidMapBuffer, uidMapSize);

    vector<uint8_t> buffer(proto.size());
    if (outData != nullptr) {
        outData->clear();
        outData->resize(proto.size());
        size_t pos = 0;
        auto iter = proto.data();
        while (iter.readBuffer() != NULL) {
            size_t toRead = iter.currentToRead();
        std::memcpy(&buffer[pos], iter.readBuffer(), toRead);
            std::memcpy(&((*outData)[pos]), iter.readBuffer(), toRead);
            pos += toRead;
            iter.rp()->move(toRead);
        }

    return buffer;
    }
}

void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
@@ -151,40 +167,32 @@ void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
        mMetricsManagers.erase(it);
        mUidMap->OnConfigRemoved(key);
    }
    auto flushTime = mLastFlushTimes.find(key);
    if (flushTime != mLastFlushTimes.end()) {
        mLastFlushTimes.erase(flushTime);
    }

    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
    mLastBroadcastTimes.erase(key);
}

void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs,
                                         const ConfigKey& key,
                                         const unique_ptr<MetricsManager>& metricsManager) {
    auto lastFlushNs = mLastFlushTimes.find(key);
    if (lastFlushNs != mLastFlushTimes.end()) {
        if (timestampNs - lastFlushNs->second < kMinFlushPeriod) {
            return;
        }
    }
    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);

    size_t totalBytes = metricsManager->byteSize();
    if (totalBytes > kMaxSerializedBytes) {
        flush();
        mLastFlushTimes[key] = std::move(timestampNs);
    if (totalBytes > .9 * kMaxSerializedBytes) { // Send broadcast so that receivers can pull data.
        auto lastFlushNs = mLastBroadcastTimes.find(key);
        if (lastFlushNs != mLastBroadcastTimes.end()) {
            if (timestampNs - lastFlushNs->second < kMinBroadcastPeriod) {
                return;
            }
        }

void StatsLogProcessor::flush() {
    // TODO: Take ConfigKey as an argument and flush metrics related to the
    // ConfigKey. Also, create a wrapper that holds a repeated field of
    // StatsLogReport's.
    /*
    StatsLogReport logReport;
    const int numBytes = logReport.ByteSize();
    vector<uint8_t> logReportBuffer(numBytes);
    logReport.SerializeToArray(&logReportBuffer[0], numBytes);
    mPushLog(logReportBuffer);
    */
        mLastBroadcastTimes[key] = timestampNs;
        ALOGD("StatsD requesting broadcast for %s", key.ToString().c_str());
        mSendBroadcast(key);
    } else if (totalBytes > kMaxSerializedBytes) { // Too late. We need to start clearing data.
        // We ignore the return value so we force each metric producer to clear its contents.
        metricsManager->onDumpReport();
        ALOGD("StatsD had to toss out metrics for %s", key.ToString().c_str());
    }
}

}  // namespace statsd
+13 −11
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ namespace statsd {
class StatsLogProcessor : public ConfigListener {
public:
    StatsLogProcessor(const sp<UidMap>& uidMap,
                      const std::function<void(const vector<uint8_t>&)>& pushLog);
                      const std::function<void(const ConfigKey&)>& sendBroadcast);
    virtual ~StatsLogProcessor();

    virtual void OnLogEvent(const LogEvent& event);
@@ -41,15 +41,16 @@ public:
    void OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config);
    void OnConfigRemoved(const ConfigKey& key);

    vector<uint8_t> onDumpReport(const ConfigKey& key);
    size_t GetMetricsSize(const ConfigKey& key);

    /* Request a flush through a binder call. */
    void flush();
    void onDumpReport(const ConfigKey& key, vector<uint8_t>* outData);

private:
    mutable mutex mBroadcastTimesMutex;

    std::unordered_map<ConfigKey, std::unique_ptr<MetricsManager>> mMetricsManagers;

    std::unordered_map<ConfigKey, long> mLastFlushTimes;
    std::unordered_map<ConfigKey, long> mLastBroadcastTimes;

    sp<UidMap> mUidMap;  // Reference to the UidMap to lookup app name and version for each uid.

@@ -60,17 +61,18 @@ private:
     */
    static const size_t kMaxSerializedBytes = 16 * 1024;

    /* Check if the buffer size exceeds the max buffer size when the new entry is added, and flush
       the logs to callback clients if true. */
    /* Check if we should send a broadcast if approaching memory limits and if we're over, we
     * actually delete the data. */
    void flushIfNecessary(uint64_t timestampNs,
                          const ConfigKey& key,
                          const unique_ptr<MetricsManager>& metricsManager);

    std::function<void(const vector<uint8_t>&)> mPushLog;
    // Function used to send a broadcast so that receiver for the config key can call getData
    // to retrieve the stored data.
    std::function<void(const ConfigKey& key)> mSendBroadcast;

    /* Minimum period between two flushes in nanoseconds. Currently set to 10
     * minutes. */
    static const unsigned long long kMinFlushPeriod = 600 * NS_PER_SEC;
    /* Minimum period between two broadcasts in nanoseconds. Currently set to 60 seconds. */
    static const unsigned long long kMinBroadcastPeriod = 60 * NS_PER_SEC;
};

}  // namespace statsd
+85 −17
Original line number Diff line number Diff line
@@ -73,8 +73,18 @@ StatsService::StatsService(const sp<Looper>& handlerLooper)
{
    mUidMap = new UidMap();
    mConfigManager = new ConfigManager();
    mProcessor = new StatsLogProcessor(mUidMap, [](const vector<uint8_t>& log) {
        // TODO: Update how we send data out of StatsD.
    mProcessor = new StatsLogProcessor(mUidMap, [this](const ConfigKey& key) {
        auto sc = getStatsCompanionService();
        auto receiver = mConfigManager->GetConfigReceiver(key);
        if (sc == nullptr) {
            ALOGD("Could not find StatsCompanionService");
        } else if (receiver.first.size() == 0) {
            ALOGD("Statscompanion could not find a broadcast receiver for %s",
                  key.ToString().c_str());
        } else {
            sc->sendBroadcast(String16(receiver.first.c_str()),
                              String16(receiver.second.c_str()));
        }
    });

    mConfigManager->AddListener(mProcessor);
@@ -206,7 +216,11 @@ status_t StatsService::command(FILE* in, FILE* out, FILE* err, Vector<String8>&
        }

        if (!args[0].compare(String8("send-broadcast"))) {
            return cmd_trigger_broadcast(args);
            return cmd_trigger_broadcast(out, args);
        }

        if (!args[0].compare(String8("print-stats"))) {
            return cmd_print_stats(out);
        }

        if (!args[0].compare(String8("clear-config"))) {
@@ -259,16 +273,56 @@ void StatsService::print_cmd_help(FILE* out) {
    fprintf(out, "  NAME          The name of the configuration\n");
    fprintf(out, "\n");
    fprintf(out, "\n");
    fprintf(out, "usage: adb shell cmd stats send-broadcast PACKAGE CLASS\n");
    fprintf(out, "  Send a broadcast that triggers one subscriber to fetch metrics.\n");
    fprintf(out, "  PACKAGE        The name of the package to receive the broadcast.\n");
    fprintf(out, "  CLASS          The name of the class to receive the broadcast.\n");
    fprintf(out, "usage: adb shell cmd stats send-broadcast [UID] NAME\n");
    fprintf(out, "  Send a broadcast that triggers the subscriber to fetch metrics.\n");
    fprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
    fprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
    fprintf(out, "                calling uid is used.\n");
    fprintf(out, "  NAME          The name of the configuration\n");
    fprintf(out, "\n");
    fprintf(out, "\n");
    fprintf(out, "usage: adb shell cmd stats print-stats\n");
    fprintf(out, "  Prints some basic stats.\n");
}

status_t StatsService::cmd_trigger_broadcast(Vector<String8>& args) {
status_t StatsService::cmd_trigger_broadcast(FILE* out, Vector<String8>& args) {
    string name;
    bool good = false;
    int uid;
    const int argCount = args.size();
    if (argCount == 2) {
        // Automatically pick the UID
        uid = IPCThreadState::self()->getCallingUid();
        // TODO: What if this isn't a binder call? Should we fail?
        name.assign(args[1].c_str(), args[1].size());
        good = true;
    } else if (argCount == 3) {
        // If it's a userdebug or eng build, then the shell user can
        // impersonate other uids.
        if (mEngBuild) {
            const char* s = args[1].c_str();
            if (*s != '\0') {
                char* end = NULL;
                uid = strtol(s, &end, 0);
                if (*end == '\0') {
                    name.assign(args[2].c_str(), args[2].size());
                    good = true;
                }
            }
        } else {
            fprintf(out,
                    "The metrics can only be dumped for other UIDs on eng or userdebug "
                            "builds.\n");
        }
    }
    if (!good) {
        print_cmd_help(out);
        return UNKNOWN_ERROR;
    }
    auto receiver = mConfigManager->GetConfigReceiver(ConfigKey(uid, name));
    auto sc = getStatsCompanionService();
    sc->sendBroadcast(String16(args[1]), String16(args[2]));
    ALOGD("StatsService::trigger broadcast succeeded");
    sc->sendBroadcast(String16(receiver.first.c_str()), String16(receiver.second.c_str()));
    ALOGD("StatsService::trigger broadcast succeeded to %s, %s", args[1].c_str(), args[2].c_str());
    return NO_ERROR;
}

@@ -373,7 +427,8 @@ status_t StatsService::cmd_dump_report(FILE* out, FILE* err, const Vector<String
            }
        }
        if (good) {
            mProcessor->onDumpReport(ConfigKey(uid, name));
            vector<uint8_t> data;
            mProcessor->onDumpReport(ConfigKey(uid, name), &data);
            // TODO: print the returned StatsLogReport to file instead of printing to logcat.
            fprintf(out, "Dump report for Config [%d,%s]\n", uid, name.c_str());
            fprintf(out, "See the StatsLogReport in logcat...\n");
@@ -389,6 +444,15 @@ status_t StatsService::cmd_dump_report(FILE* out, FILE* err, const Vector<String
    }
}

status_t StatsService::cmd_print_stats(FILE* out) {
    vector<ConfigKey> configs = mConfigManager->GetAllConfigKeys();
    for (const ConfigKey& key : configs) {
        fprintf(out, "Config %s uses %zu bytes\n", key.ToString().c_str(),
                mProcessor->GetMetricsSize(key));
    }
    return NO_ERROR;
}

status_t StatsService::cmd_print_stats_log(FILE* out, const Vector<String8>& args) {
    long msec = 0;

@@ -575,8 +639,12 @@ void StatsService::OnLogEvent(const LogEvent& event) {

Status StatsService::getData(const String16& key, vector <uint8_t>* output) {
    IPCThreadState* ipc = IPCThreadState::self();
    ALOGD("StatsService::getData with Pid %i, Uid %i", ipc->getCallingPid(),
          ipc->getCallingUid());
    if (checkCallingPermission(String16(kPermissionDump))) {
        // TODO: Implement this.
        string keyStr = string(String8(key).string());
        ConfigKey configKey(ipc->getCallingUid(), keyStr);
        mProcessor->onDumpReport(configKey, output);
        return Status::ok();
    } else {
        return Status::fromExceptionCode(binder::Status::EX_SECURITY);
@@ -588,10 +656,9 @@ Status StatsService::addConfiguration(const String16& key,
                                      const String16& package, const String16& cls,
                                      bool* success) {
    IPCThreadState* ipc = IPCThreadState::self();
    int32_t* uid = reinterpret_cast<int32_t*>(ipc->getCallingUid());
    if (checkCallingPermission(String16(kPermissionDump))) {
        string keyString = string(String8(key).string());
        ConfigKey configKey(*uid, keyString);
        ConfigKey configKey(ipc->getCallingUid(), keyString);
        StatsdConfig cfg;
        cfg.ParseFromArray(&config[0], config.size());
        mConfigManager->UpdateConfig(configKey, cfg);
@@ -607,7 +674,8 @@ Status StatsService::addConfiguration(const String16& key,
Status StatsService::removeConfiguration(const String16& key, bool* success) {
    IPCThreadState* ipc = IPCThreadState::self();
    if (checkCallingPermission(String16(kPermissionDump))) {
        // TODO: Implement this.
        string keyStr = string(String8(key).string());
        mConfigManager->RemoveConfig(ConfigKey(ipc->getCallingUid(), keyStr));
        return Status::ok();
    } else {
        *success = false;
+6 −1
Original line number Diff line number Diff line
@@ -124,13 +124,18 @@ private:
    /**
     * Trigger a broadcast.
     */
    status_t cmd_trigger_broadcast(Vector<String8>& args);
    status_t cmd_trigger_broadcast(FILE* out, Vector<String8>& args);

    /**
     * Handle the config sub-command.
     */
    status_t cmd_config(FILE* in, FILE* out, FILE* err, Vector<String8>& args);

    /**
     * Prints some basic stats to std out.
     */
    status_t cmd_print_stats(FILE* out);

    /**
     * Print the event log.
     */
+22 −0
Original line number Diff line number Diff line
@@ -134,12 +134,34 @@ void ConfigManager::RemoveConfigs(int uid) {
    }
}

vector<ConfigKey> ConfigManager::GetAllConfigKeys() {
    vector<ConfigKey> ret;
    for (auto it = mConfigs.cbegin(); it != mConfigs.cend(); ++it) {
        ret.push_back(it->first);
    }
    return ret;
}

const pair<string, string> ConfigManager::GetConfigReceiver(const ConfigKey& key) {
    auto it = mConfigReceivers.find(key);
    if (it == mConfigReceivers.end()) {
        return pair<string,string>();
    } else {
        return it->second;
    }
}

void ConfigManager::Dump(FILE* out) {
    fprintf(out, "CONFIGURATIONS (%d)\n", (int)mConfigs.size());
    fprintf(out, "     uid name\n");
    for (unordered_map<ConfigKey, StatsdConfig>::const_iterator it = mConfigs.begin();
         it != mConfigs.end(); it++) {
        fprintf(out, "  %6d %s\n", it->first.GetUid(), it->first.GetName().c_str());
        auto receiverIt = mConfigReceivers.find(it->first);
        if (receiverIt != mConfigReceivers.end()) {
            fprintf(out, "    -> received by %s, %s\n", receiverIt->second.first.c_str(),
                    receiverIt->second.second.c_str());
        }
        // TODO: Print the contents of the config too.
    }
}
Loading