Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2b0f8867 authored by yro's avatar yro
Browse files

Migrate all remaining MetricProducers to use ProtoOutputStream

Test: statsd, statsd_test
Change-Id: I1087e1c1ffb372ca288dfc575cb7a372b11ce8c5
parent 6c5c1d73
Loading
Loading
Loading
Loading
+7 −12
Original line number Diff line number Diff line
@@ -24,7 +24,11 @@
#include <limits.h>
#include <stdlib.h>

using namespace android::util;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_FLOAT;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::ProtoOutputStream;
using std::map;
using std::string;
@@ -166,17 +170,8 @@ StatsLogReport CountMetricProducer::onDumpReport() {
    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                  (long long)mCurrentBucketStartTimeNs);

    size_t bufferSize = mProto->size();
    std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);

    size_t pos = 0;
    auto it = mProto->data();
    while (it.readBuffer() != NULL) {
        size_t toRead = it.currentToRead();
        std::memcpy(&buffer[pos], it.readBuffer(), toRead);
        pos += toRead;
        it.rp()->move(toRead);
    }
    VLOG("metric %lld dump report now...", mMetric.metric_id());
    std::unique_ptr<uint8_t[]> buffer = serializeProto();

    startNewProtoOutputStream(endTime);
    mPastBuckets.clear();
+4 −6
Original line number Diff line number Diff line
@@ -53,6 +53,7 @@ public:

    void finish() override;

    // TODO: Pass a timestamp as a parameter in onDumpReport.
    StatsLogReport onDumpReport() override;

    void onSlicedConditionMayChange(const uint64_t eventTime) override;
@@ -70,9 +71,12 @@ protected:
                                   bool condition, const LogEvent& event,
                                   bool scheduledPull) override;

    void startNewProtoOutputStream(long long timestamp) override;

private:
    const CountMetric mMetric;

    // TODO: Add a lock to mPastBuckets.
    std::unordered_map<HashableDimensionKey, std::vector<CountBucket>> mPastBuckets;

    size_t mByteSize;
@@ -84,12 +88,6 @@ private:

    void flushCounterIfNeeded(const uint64_t newEventTime);

    std::unique_ptr<android::util::ProtoOutputStream> mProto;

    long long mProtoToken;

    void startNewProtoOutputStream(long long timestamp);

    FRIEND_TEST(CountMetricProducerTest, TestNonDimensionalEvents);
    FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition);
    FRIEND_TEST(CountMetricProducerTest, TestEventsWithSlicedCondition);
+93 −17
Original line number Diff line number Diff line
@@ -23,6 +23,12 @@
#include <limits.h>
#include <stdlib.h>

using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_FLOAT;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::ProtoOutputStream;
using std::string;
using std::unordered_map;
using std::vector;
@@ -31,6 +37,27 @@ namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_METRIC_ID = 1;
const int FIELD_ID_START_REPORT_NANOS = 2;
const int FIELD_ID_END_REPORT_NANOS = 3;
const int FIELD_ID_DURATION_METRICS = 6;
// for DurationMetricDataWrapper
const int FIELD_ID_DATA = 1;
// for DurationMetricData
const int FIELD_ID_DIMENSION = 1;
const int FIELD_ID_BUCKET_INFO = 2;
// for KeyValuePair
const int FIELD_ID_KEY = 1;
const int FIELD_ID_VALUE_STR = 2;
const int FIELD_ID_VALUE_INT = 3;
const int FIELD_ID_VALUE_BOOL = 4;
const int FIELD_ID_VALUE_FLOAT = 5;
// for DurationBucketInfo
const int FIELD_ID_START_BUCKET_NANOS = 1;
const int FIELD_ID_END_BUCKET_NANOS = 2;
const int FIELD_ID_DURATION = 3;

DurationMetricProducer::DurationMetricProducer(const DurationMetric& metric,
                                               const int conditionIndex, const size_t startIndex,
                                               const size_t stopIndex, const size_t stopAllIndex,
@@ -61,6 +88,8 @@ DurationMetricProducer::DurationMetricProducer(const DurationMetric& metric,
        mConditionSliced = true;
    }

    startNewProtoOutputStream(mStartTimeNs);

    VLOG("metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
         (long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -69,8 +98,15 @@ DurationMetricProducer::~DurationMetricProducer() {
    VLOG("~DurationMetric() called");
}

void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
    mProto = std::make_unique<ProtoOutputStream>();
    mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());
    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
}

unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
        vector<DurationBucketInfo>& bucket) {
        vector<DurationBucket>& bucket) {
    switch (mMetric.type()) {
        case DurationMetric_AggregationType_DURATION_SUM:
            return make_unique<OringDurationTracker>(mWizard, mConditionTrackerIndex,
@@ -124,29 +160,69 @@ static void addDurationBucketsToReport(StatsLogReport_DurationMetricDataWrapper&
}

StatsLogReport DurationMetricProducer::onDumpReport() {
    VLOG("metric %lld dump report now...", mMetric.metric_id());
    StatsLogReport report;
    report.set_metric_id(mMetric.metric_id());
    report.set_start_report_nanos(mStartTimeNs);
    long long endTime = time(nullptr) * NS_PER_SEC;

    // Dump current bucket if it's stale.
    // If current bucket is still on-going, don't force dump current bucket.
    // In finish(), We can force dump current bucket.
    flushIfNeeded(time(nullptr) * NS_PER_SEC);
    report.set_end_report_nanos(mCurrentBucketStartTimeNs);
    flushIfNeeded(endTime);
    VLOG("metric %lld dump report now...", mMetric.metric_id());

    StatsLogReport_DurationMetricDataWrapper* wrapper = report.mutable_duration_metrics();
    for (const auto& pair : mPastBuckets) {
        const HashableDimensionKey& hashableKey = pair.first;
        VLOG("  dimension key %s", hashableKey.c_str());
        auto it = mDimensionKeyMap.find(hashableKey);
        if (it == mDimensionKeyMap.end()) {
            ALOGW("Dimension key %s not found?!?! skip...", hashableKey.c_str());
            continue;
        }
        VLOG("  dimension key %s", hashableKey.c_str());
        addDurationBucketsToReport(*wrapper, it->second, pair.second);
        long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);

        // First fill dimension (KeyValuePairs).
        for (const auto& kv : it->second) {
            long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION);
            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
            if (kv.has_value_str()) {
                mProto->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_STR, kv.value_str());
            } else if (kv.has_value_int()) {
                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
            } else if (kv.has_value_bool()) {
                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
            } else if (kv.has_value_float()) {
                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
            }
            mProto->end(dimensionToken);
        }

        // Then fill bucket_info (DurationBucketInfo).
        for (const auto& bucket : pair.second) {
            long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_BUCKET_INFO);
            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
                          (long long)bucket.mBucketStartNs);
            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
                          (long long)bucket.mBucketEndNs);
            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
            mProto->end(bucketInfoToken);
            VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs,
                 (long long)bucket.mBucketEndNs, (long long)bucket.mDuration);
        }

        mProto->end(wrapperToken);
    }

    mProto->end(mProtoToken);
    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                  (long long)mCurrentBucketStartTimeNs);

    std::unique_ptr<uint8_t[]> buffer = serializeProto();

    startNewProtoOutputStream(endTime);
    mPastBuckets.clear();

    // TODO: Once we migrate all MetricProducers to use ProtoOutputStream, we should return this:
    // return std::move(buffer);
    return StatsLogReport();
}
    return report;
};

void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
    if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
@@ -188,17 +264,17 @@ void DurationMetricProducer::onMatchedLogEventInternal(

    if (matcherIndex == mStartIndex) {
        it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);

    } else if (matcherIndex == mStopIndex) {
        it->second->noteStop(atomKey, event.GetTimestampNs());
    }
}

size_t DurationMetricProducer::byteSize() {
    // TODO: return actual proto size when ProtoOutputStream is ready for use for
    // DurationMetricsProducer.
    //    return mProto->size();
    return 0;
  size_t totalSize = 0;
  for (const auto& pair : mPastBuckets) {
      totalSize += pair.second.size() * kBucketSize;
  }
  return totalSize;
}

}  // namespace statsd
+9 −2
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@

#include <unordered_map>

#include <android/util/ProtoOutputStream.h>
#include "../condition/ConditionTracker.h"
#include "../matchers/matcher_util.h"
#include "MetricProducer.h"
@@ -48,6 +49,7 @@ public:

    void finish() override;

    // TODO: Pass a timestamp as a parameter in onDumpReport.
    StatsLogReport onDumpReport() override;

    void onSlicedConditionMayChange(const uint64_t eventTime) override;
@@ -65,6 +67,8 @@ protected:
                                   bool condition, const LogEvent& event,
                                   bool scheduledPull) override;

    void startNewProtoOutputStream(long long timestamp) override;

private:
    const DurationMetric mMetric;

@@ -81,7 +85,8 @@ private:
    const vector<KeyMatcher> mInternalDimension;

    // Save the past buckets and we can clear when the StatsLogReport is dumped.
    std::unordered_map<HashableDimensionKey, std::vector<DurationBucketInfo>> mPastBuckets;
    // TODO: Add a lock to mPastBuckets.
    std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>> mPastBuckets;

    // The current bucket.
    std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
@@ -89,9 +94,11 @@ private:

    void flushDurationIfNeeded(const uint64_t newEventTime);

    std::unique_ptr<DurationTracker> createDurationTracker(std::vector<DurationBucketInfo>& bucket);
    std::unique_ptr<DurationTracker> createDurationTracker(std::vector<DurationBucket>& bucket);

    void flushIfNeeded(uint64_t timestamp);

    static const size_t kBucketSize = sizeof(DurationBucket{});
};

}  // namespace statsd
+6 −10
Original line number Diff line number Diff line
@@ -23,7 +23,11 @@
#include <limits.h>
#include <stdlib.h>

using namespace android::util;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_FLOAT;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::ProtoOutputStream;
using std::map;
using std::string;
@@ -87,15 +91,7 @@ StatsLogReport EventMetricProducer::onDumpReport() {

    size_t bufferSize = mProto->size();
    VLOG("metric %lld dump report now... proto size: %zu ", mMetric.metric_id(), bufferSize);
    std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
    size_t pos = 0;
    auto it = mProto->data();
    while (it.readBuffer() != NULL) {
        size_t toRead = it.currentToRead();
        std::memcpy(&buffer[pos], it.readBuffer(), toRead);
        pos += toRead;
        it.rp()->move(toRead);
    }
    std::unique_ptr<uint8_t[]> buffer = serializeProto();

    startNewProtoOutputStream(endTime);

Loading