Loading cmds/statsd/src/metrics/ValueMetricProducer.cpp +8 −2 Original line number Diff line number Diff line Loading @@ -312,8 +312,13 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( if (mPullTagId != -1) { // for pulled events if (mCondition == true) { if (!interval.startUpdated) { interval.start = value; interval.startUpdated = true; } else { // skip it if there is already value recorded for the start VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); } } else { // Generally we expect value to be monotonically increasing. // If not, there was a reset event. We take the absolute value as Loading Loading @@ -382,6 +387,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) int tainted = 0; for (const auto& slice : mCurrentSlicedBucket) { tainted += slice.second.tainted; tainted += slice.second.startUpdated; info.mValue = slice.second.sum; // it will auto create new vector of ValuebucketInfo if the key is not found. auto& bucketList = mPastBuckets[slice.first]; Loading cmds/statsd/src/metrics/ValueMetricProducer.h +4 −0 Original line number Diff line number Diff line Loading @@ -159,6 +159,10 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3); }; } // namespace statsd Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +372 −0 Original line number Diff line number Diff line Loading @@ -45,6 +45,8 @@ const int64_t bucketSizeNs = TimeUnitToBucketSizeInMillis(ONE_MINUTE) * 1000000L const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs; const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs; const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs; const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs; const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs; const int64_t eventUpgradeTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC; /* Loading Loading @@ -431,6 +433,376 @@ TEST(ValueMetricProducerTest, TestAnomalyDetection) { std::ceil(1.0 * event6->GetElapsedTimestampNs() / NS_PER_SEC + refPeriodSec)); } // Test value metric no condition, the pull on bucket boundary come in time and too late TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); vector<shared_ptr<LogEvent>> allData; // pull 1 allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event->write(tagId); event->write(11); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; valueProducer.setBucketSize(60 * NS_PER_SEC); // startUpdated:true tainted:0 sum:0 start:11 EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(11, curInterval.start); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second.back().mValue); // pull 2 at correct time allData.clear(); event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1); event->write(tagId); event->write(23); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // tartUpdated:false tainted:0 sum:12 EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue); // pull 3 come late. // The previous bucket gets closed with error. (Has start value 23, no ending) // Another bucket gets closed with error. (No start, but ending with 36) // The new bucket is back to normal. allData.clear(); event = make_shared<LogEvent>(tagId, bucket6StartTimeNs + 1); event->write(tagId); event->write(36); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:12 EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(36, curInterval.start); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(4UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second[1].mValue); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[2].mValue); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[3].mValue); } /* * Test pulled event with non sliced condition. The pull on boundary come late because the alarm * was delivered late. */ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) // condition becomes false .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:0 start:100 EXPECT_EQ(100, curInterval.start); EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // Now the alarm is delivered. // since the condition turned to off before this pull finish, it has no effect vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30); event->write(1); event->write(110); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); } /* * Test pulled event with non sliced condition. The pull on boundary come late, after the condition * change to false, and then true again. This is due to alarm delivered late. */ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) // condition becomes false .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })) // condition becomes true again .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30); event->write(tagId); event->write(130); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:0 start:100 EXPECT_EQ(100, curInterval.start); EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // condition changed to true again, before the pull alarm is delivered valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(130, curInterval.start); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // Now the alarm is delivered, but it is considered late, it has no effect vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 50); event->write(1); event->write(110); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(130, curInterval.start); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); } /* * Test pulled event with non sliced condition. The pull on boundary come late because the puller is * very slow. */ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) // condition becomes false .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 20); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:0 start:100 EXPECT_EQ(100, curInterval.start); EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it. // But puller is very slow in this one, so the data come after bucket finish valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // Alarm is delivered in time, but the pull is very slow, and pullers are called in order, // so this one comes even later vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 30); event->write(1); event->write(110); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); } } // namespace statsd } // namespace os } // namespace android Loading Loading
cmds/statsd/src/metrics/ValueMetricProducer.cpp +8 −2 Original line number Diff line number Diff line Loading @@ -312,8 +312,13 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( if (mPullTagId != -1) { // for pulled events if (mCondition == true) { if (!interval.startUpdated) { interval.start = value; interval.startUpdated = true; } else { // skip it if there is already value recorded for the start VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); } } else { // Generally we expect value to be monotonically increasing. // If not, there was a reset event. We take the absolute value as Loading Loading @@ -382,6 +387,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) int tainted = 0; for (const auto& slice : mCurrentSlicedBucket) { tainted += slice.second.tainted; tainted += slice.second.startUpdated; info.mValue = slice.second.sum; // it will auto create new vector of ValuebucketInfo if the key is not found. auto& bucketList = mPastBuckets[slice.first]; Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +4 −0 Original line number Diff line number Diff line Loading @@ -159,6 +159,10 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3); }; } // namespace statsd Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +372 −0 Original line number Diff line number Diff line Loading @@ -45,6 +45,8 @@ const int64_t bucketSizeNs = TimeUnitToBucketSizeInMillis(ONE_MINUTE) * 1000000L const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs; const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs; const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs; const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs; const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs; const int64_t eventUpgradeTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC; /* Loading Loading @@ -431,6 +433,376 @@ TEST(ValueMetricProducerTest, TestAnomalyDetection) { std::ceil(1.0 * event6->GetElapsedTimestampNs() / NS_PER_SEC + refPeriodSec)); } // Test value metric no condition, the pull on bucket boundary come in time and too late TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); vector<shared_ptr<LogEvent>> allData; // pull 1 allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event->write(tagId); event->write(11); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; valueProducer.setBucketSize(60 * NS_PER_SEC); // startUpdated:true tainted:0 sum:0 start:11 EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(11, curInterval.start); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second.back().mValue); // pull 2 at correct time allData.clear(); event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1); event->write(tagId); event->write(23); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // tartUpdated:false tainted:0 sum:12 EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue); // pull 3 come late. // The previous bucket gets closed with error. (Has start value 23, no ending) // Another bucket gets closed with error. (No start, but ending with 36) // The new bucket is back to normal. allData.clear(); event = make_shared<LogEvent>(tagId, bucket6StartTimeNs + 1); event->write(tagId); event->write(36); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:12 EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(36, curInterval.start); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(4UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second[1].mValue); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[2].mValue); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[3].mValue); } /* * Test pulled event with non sliced condition. The pull on boundary come late because the alarm * was delivered late. */ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) // condition becomes false .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:0 start:100 EXPECT_EQ(100, curInterval.start); EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // Now the alarm is delivered. // since the condition turned to off before this pull finish, it has no effect vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30); event->write(1); event->write(110); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); } /* * Test pulled event with non sliced condition. The pull on boundary come late, after the condition * change to false, and then true again. This is due to alarm delivered late. */ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) // condition becomes false .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })) // condition becomes true again .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30); event->write(tagId); event->write(130); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:0 start:100 EXPECT_EQ(100, curInterval.start); EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // condition changed to true again, before the pull alarm is delivered valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(130, curInterval.start); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // Now the alarm is delivered, but it is considered late, it has no effect vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 50); event->write(1); event->write(110); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(130, curInterval.start); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); } /* * Test pulled event with non sliced condition. The pull on boundary come late because the puller is * very slow. */ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) // condition becomes false .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 20); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false tainted:0 sum:0 start:100 EXPECT_EQ(100, curInterval.start); EXPECT_EQ(true, curInterval.startUpdated); EXPECT_EQ(0, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it. // But puller is very slow in this one, so the data come after bucket finish valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); // Alarm is delivered in time, but the pull is very slow, and pullers are called in order, // so this one comes even later vector<shared_ptr<LogEvent>> allData; allData.clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 30); event->write(1); event->write(110); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(false, curInterval.startUpdated); EXPECT_EQ(1, curInterval.tainted); EXPECT_EQ(0, curInterval.sum); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue); } } // namespace statsd } // namespace os } // namespace android Loading