Loading cmds/statsd/src/StatsLogProcessor.cpp +5 −11 Original line number Diff line number Diff line Loading @@ -111,9 +111,7 @@ void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig unique_ptr<MetricsManager> newMetricsManager = std::make_unique<MetricsManager>(key, config); auto it = mMetricsManagers.find(key); if (it != mMetricsManagers.end()) { it->second->finish(); } else if (mMetricsManagers.size() > StatsdStats::kMaxConfigCount) { if (it == mMetricsManagers.end() && mMetricsManagers.size() > StatsdStats::kMaxConfigCount) { ALOGE("Can't accept more configs!"); return; } Loading Loading @@ -167,11 +165,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD // First, fill in ConfigMetricsReport using current data on memory, which // starts from filling in StatsLogReport's. for (auto& m : it->second->onDumpReport()) { // Add each vector of StatsLogReport into a repeated field. proto.write(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS, reinterpret_cast<char*>(m.get()->data()), m.get()->size()); } it->second->onDumpReport(&proto); // Fill in UidMap. auto uidMap = mUidMap->getOutput(key); Loading Loading @@ -205,7 +199,6 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) { auto it = mMetricsManagers.find(key); if (it != mMetricsManagers.end()) { it->second->finish(); mMetricsManagers.erase(it); mUidMap->OnConfigRemoved(key); } Loading @@ -231,8 +224,9 @@ void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs, const ConfigKey& mLastByteSizeTimes[key] = timestampNs; if (totalBytes > StatsdStats::kMaxMetricsBytesPerConfig) { // Too late. We need to start clearing data. // We ignore the return value so we force each metric producer to clear its contents. metricsManager.onDumpReport(); // TODO(b/70571383): By 12/15/2017 add API to drop data directly ProtoOutputStream proto; metricsManager.onDumpReport(&proto); StatsdStats::getInstance().noteDataDropped(key); VLOG("StatsD had to toss out metrics for %s", key.ToString().c_str()); } else if (totalBytes > .9 * StatsdStats::kMaxMetricsBytesPerConfig) { Loading cmds/statsd/src/metrics/CountMetricProducer.cpp +28 −45 Original line number Diff line number Diff line Loading @@ -83,8 +83,6 @@ CountMetricProducer::CountMetricProducer(const ConfigKey& key, const CountMetric mConditionSliced = true; } startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); } Loading @@ -93,27 +91,18 @@ CountMetricProducer::~CountMetricProducer() { VLOG("~CountMetricProducer() called"); } void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS); } void CountMetricProducer::finish() { } void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; void CountMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, ProtoOutputStream* protoOutput) { flushIfNeededLocked(dumpTimeNs); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs); long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { Loading @@ -125,52 +114,46 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() continue; } long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); // First fill dimension (KeyValuePairs). for (const auto& kv : it->second) { long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); long long dimensionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); if (kv.has_value_str()) { mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); } else if (kv.has_value_int()) { mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); } else if (kv.has_value_bool()) { mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); } else if (kv.has_value_float()) { mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); } mProto->end(dimensionToken); protoOutput->end(dimensionToken); } // Then fill bucket_info (CountBucketInfo). for (const auto& bucket : counter.second) { long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, long long bucketInfoToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, (long long)bucket.mBucketStartNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, (long long)bucket.mBucketEndNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount); mProto->end(bucketInfoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount); protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mCount); } mProto->end(wrapperToken); protoOutput->end(wrapperToken); } mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); protoOutput->end(protoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs); VLOG("metric %s dump report now...", mMetric.name().c_str()); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); mPastBuckets.clear(); return buffer; mStartTimeNs = mCurrentBucketStartTimeNs; // TODO: Clear mDimensionKeyMap once the report is dumped. } Loading cmds/statsd/src/metrics/CountMetricProducer.h +2 −7 Original line number Diff line number Diff line Loading @@ -48,8 +48,6 @@ public: virtual ~CountMetricProducer(); void finish() override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version) override{}; Loading @@ -63,8 +61,8 @@ protected: const LogEvent& event, bool scheduledPull) override; private: // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override; void onDumpReportLocked(const uint64_t dumpTimeNs, android::util::ProtoOutputStream* protoOutput) override; // Internal interface to handle condition change. void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; Loading @@ -78,9 +76,6 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const uint64_t& newEventTime); // Util function to init/reset the proto output stream. void startNewProtoOutputStreamLocked(long long timestamp); const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. Loading cmds/statsd/src/metrics/DurationMetricProducer.cpp +28 −44 Original line number Diff line number Diff line Loading @@ -93,8 +93,6 @@ DurationMetricProducer::DurationMetricProducer(const ConfigKey& key, const Durat mConditionSliced = true; } startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); } Loading @@ -114,13 +112,6 @@ sp<AnomalyTracker> DurationMetricProducer::createAnomalyTracker(const Alert &ale return new AnomalyTracker(alert, mConfigKey); } void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS); } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey) const { switch (mMetric.aggregation_type()) { Loading @@ -135,11 +126,6 @@ unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( } } void DurationMetricProducer::finish() { // TODO: write the StatsLogReport to dropbox using // DropboxWriter. } void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeededLocked(eventTime); Loading @@ -161,13 +147,14 @@ void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet, } } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, ProtoOutputStream* protoOutput) { flushIfNeededLocked(dumpTimeNs); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs); long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { Loading @@ -180,49 +167,46 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked } long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); // First fill dimension (KeyValuePairs). for (const auto& kv : it->second) { long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); long long dimensionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); if (kv.has_value_str()) { mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); } else if (kv.has_value_int()) { mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); } else if (kv.has_value_bool()) { mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); } else if (kv.has_value_float()) { mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); } mProto->end(dimensionToken); protoOutput->end(dimensionToken); } // Then fill bucket_info (DurationBucketInfo). for (const auto& bucket : pair.second) { long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, long long bucketInfoToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, (long long)bucket.mBucketStartNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, (long long)bucket.mBucketEndNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration); mProto->end(bucketInfoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration); protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mDuration); } mProto->end(wrapperToken); protoOutput->end(wrapperToken); } mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); protoOutput->end(protoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs); mPastBuckets.clear(); return buffer; mStartTimeNs = mCurrentBucketStartTimeNs; } void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) { Loading cmds/statsd/src/metrics/DurationMetricProducer.h +2 −7 Original line number Diff line number Diff line Loading @@ -47,8 +47,6 @@ public: virtual sp<AnomalyTracker> createAnomalyTracker(const Alert &alert) override; void finish() override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version) override{}; Loading @@ -62,8 +60,8 @@ protected: const LogEvent& event, bool scheduledPull) override; private: // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override; void onDumpReportLocked(const uint64_t dumpTimeNs, android::util::ProtoOutputStream* protoOutput) override; // Internal interface to handle condition change. void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; Loading @@ -77,9 +75,6 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const uint64_t& eventTime); // Util function to init/reset the proto output stream. void startNewProtoOutputStreamLocked(long long timestamp); const DurationMetric mMetric; // Index of the SimpleAtomMatcher which defines the start. Loading Loading
cmds/statsd/src/StatsLogProcessor.cpp +5 −11 Original line number Diff line number Diff line Loading @@ -111,9 +111,7 @@ void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig unique_ptr<MetricsManager> newMetricsManager = std::make_unique<MetricsManager>(key, config); auto it = mMetricsManagers.find(key); if (it != mMetricsManagers.end()) { it->second->finish(); } else if (mMetricsManagers.size() > StatsdStats::kMaxConfigCount) { if (it == mMetricsManagers.end() && mMetricsManagers.size() > StatsdStats::kMaxConfigCount) { ALOGE("Can't accept more configs!"); return; } Loading Loading @@ -167,11 +165,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD // First, fill in ConfigMetricsReport using current data on memory, which // starts from filling in StatsLogReport's. for (auto& m : it->second->onDumpReport()) { // Add each vector of StatsLogReport into a repeated field. proto.write(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS, reinterpret_cast<char*>(m.get()->data()), m.get()->size()); } it->second->onDumpReport(&proto); // Fill in UidMap. auto uidMap = mUidMap->getOutput(key); Loading Loading @@ -205,7 +199,6 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) { auto it = mMetricsManagers.find(key); if (it != mMetricsManagers.end()) { it->second->finish(); mMetricsManagers.erase(it); mUidMap->OnConfigRemoved(key); } Loading @@ -231,8 +224,9 @@ void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs, const ConfigKey& mLastByteSizeTimes[key] = timestampNs; if (totalBytes > StatsdStats::kMaxMetricsBytesPerConfig) { // Too late. We need to start clearing data. // We ignore the return value so we force each metric producer to clear its contents. metricsManager.onDumpReport(); // TODO(b/70571383): By 12/15/2017 add API to drop data directly ProtoOutputStream proto; metricsManager.onDumpReport(&proto); StatsdStats::getInstance().noteDataDropped(key); VLOG("StatsD had to toss out metrics for %s", key.ToString().c_str()); } else if (totalBytes > .9 * StatsdStats::kMaxMetricsBytesPerConfig) { Loading
cmds/statsd/src/metrics/CountMetricProducer.cpp +28 −45 Original line number Diff line number Diff line Loading @@ -83,8 +83,6 @@ CountMetricProducer::CountMetricProducer(const ConfigKey& key, const CountMetric mConditionSliced = true; } startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); } Loading @@ -93,27 +91,18 @@ CountMetricProducer::~CountMetricProducer() { VLOG("~CountMetricProducer() called"); } void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS); } void CountMetricProducer::finish() { } void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; void CountMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, ProtoOutputStream* protoOutput) { flushIfNeededLocked(dumpTimeNs); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs); long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { Loading @@ -125,52 +114,46 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() continue; } long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); // First fill dimension (KeyValuePairs). for (const auto& kv : it->second) { long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); long long dimensionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); if (kv.has_value_str()) { mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); } else if (kv.has_value_int()) { mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); } else if (kv.has_value_bool()) { mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); } else if (kv.has_value_float()) { mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); } mProto->end(dimensionToken); protoOutput->end(dimensionToken); } // Then fill bucket_info (CountBucketInfo). for (const auto& bucket : counter.second) { long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, long long bucketInfoToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, (long long)bucket.mBucketStartNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, (long long)bucket.mBucketEndNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount); mProto->end(bucketInfoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount); protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mCount); } mProto->end(wrapperToken); protoOutput->end(wrapperToken); } mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); protoOutput->end(protoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs); VLOG("metric %s dump report now...", mMetric.name().c_str()); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); mPastBuckets.clear(); return buffer; mStartTimeNs = mCurrentBucketStartTimeNs; // TODO: Clear mDimensionKeyMap once the report is dumped. } Loading
cmds/statsd/src/metrics/CountMetricProducer.h +2 −7 Original line number Diff line number Diff line Loading @@ -48,8 +48,6 @@ public: virtual ~CountMetricProducer(); void finish() override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version) override{}; Loading @@ -63,8 +61,8 @@ protected: const LogEvent& event, bool scheduledPull) override; private: // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override; void onDumpReportLocked(const uint64_t dumpTimeNs, android::util::ProtoOutputStream* protoOutput) override; // Internal interface to handle condition change. void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; Loading @@ -78,9 +76,6 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const uint64_t& newEventTime); // Util function to init/reset the proto output stream. void startNewProtoOutputStreamLocked(long long timestamp); const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. Loading
cmds/statsd/src/metrics/DurationMetricProducer.cpp +28 −44 Original line number Diff line number Diff line Loading @@ -93,8 +93,6 @@ DurationMetricProducer::DurationMetricProducer(const ConfigKey& key, const Durat mConditionSliced = true; } startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); } Loading @@ -114,13 +112,6 @@ sp<AnomalyTracker> DurationMetricProducer::createAnomalyTracker(const Alert &ale return new AnomalyTracker(alert, mConfigKey); } void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique<ProtoOutputStream>(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS); } unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( const HashableDimensionKey& eventKey) const { switch (mMetric.aggregation_type()) { Loading @@ -135,11 +126,6 @@ unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker( } } void DurationMetricProducer::finish() { // TODO: write the StatsLogReport to dropbox using // DropboxWriter. } void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); flushIfNeededLocked(eventTime); Loading @@ -161,13 +147,14 @@ void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet, } } std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, ProtoOutputStream* protoOutput) { flushIfNeededLocked(dumpTimeNs); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs); long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { Loading @@ -180,49 +167,46 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked } long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); // First fill dimension (KeyValuePairs). for (const auto& kv : it->second) { long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); long long dimensionToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); if (kv.has_value_str()) { mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); } else if (kv.has_value_int()) { mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); } else if (kv.has_value_bool()) { mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); } else if (kv.has_value_float()) { mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); } mProto->end(dimensionToken); protoOutput->end(dimensionToken); } // Then fill bucket_info (DurationBucketInfo). for (const auto& bucket : pair.second) { long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, long long bucketInfoToken = protoOutput->start( FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, (long long)bucket.mBucketStartNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, (long long)bucket.mBucketEndNs); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration); mProto->end(bucketInfoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration); protoOutput->end(bucketInfoToken); VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, (long long)bucket.mDuration); } mProto->end(wrapperToken); protoOutput->end(wrapperToken); } mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(); startNewProtoOutputStreamLocked(endTime); protoOutput->end(protoToken); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs); mPastBuckets.clear(); return buffer; mStartTimeNs = mCurrentBucketStartTimeNs; } void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) { Loading
cmds/statsd/src/metrics/DurationMetricProducer.h +2 −7 Original line number Diff line number Diff line Loading @@ -47,8 +47,6 @@ public: virtual sp<AnomalyTracker> createAnomalyTracker(const Alert &alert) override; void finish() override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version) override{}; Loading @@ -62,8 +60,8 @@ protected: const LogEvent& event, bool scheduledPull) override; private: // TODO: Pass a timestamp as a parameter in onDumpReport. std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override; void onDumpReportLocked(const uint64_t dumpTimeNs, android::util::ProtoOutputStream* protoOutput) override; // Internal interface to handle condition change. void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; Loading @@ -77,9 +75,6 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const uint64_t& eventTime); // Util function to init/reset the proto output stream. void startNewProtoOutputStreamLocked(long long timestamp); const DurationMetric mMetric; // Index of the SimpleAtomMatcher which defines the start. Loading