Loading cmds/statsd/src/metrics/ValueMetricProducer.h +5 −2 Original line number Original line Diff line number Diff line Loading @@ -51,7 +51,7 @@ public: const int64_t version) override { const int64_t version) override { std::lock_guard<std::mutex> lock(mMutex); std::lock_guard<std::mutex> lock(mMutex); if (mPullTagId != -1) { if (mPullTagId != -1 && (mCondition == true || mConditionTrackerIndex < 0) ) { vector<shared_ptr<LogEvent>> allData; vector<shared_ptr<LogEvent>> allData; mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData); mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData); if (allData.size() == 0) { if (allData.size() == 0) { Loading @@ -73,7 +73,9 @@ public: data->setElapsedTimestampNs(eventTimeNs); data->setElapsedTimestampNs(eventTimeNs); onMatchedLogEventLocked(0, *data); onMatchedLogEventLocked(0, *data); } } } else { // For pushed value metric, we simply flush and reset the current bucket start. } else { // For pushed value metric or pulled metric where condition is not true, // we simply flush and reset the current bucket start. flushCurrentBucketLocked(eventTimeNs); flushCurrentBucketLocked(eventTimeNs); mCurrentBucketStartTimeNs = eventTimeNs; mCurrentBucketStartTimeNs = eventTimeNs; } } Loading Loading @@ -170,6 +172,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); Loading cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +51 −0 Original line number Original line Diff line number Diff line Loading @@ -308,6 +308,57 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValue); EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValue); } } TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 1); valueProducer.onConditionChanged(false, bucket2StartTimeNs-100); EXPECT_FALSE(valueProducer.mCondition); valueProducer.notifyAppUpgrade(bucket2StartTimeNs-50, "ANY.APP", 1, 1); // Expect one full buckets already done and starting a partial bucket. EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs); EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValue); EXPECT_FALSE(valueProducer.mCondition); } TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) { TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) { ValueMetric metric; ValueMetric metric; metric.set_id(metricId); metric.set_id(metricId); Loading Loading
cmds/statsd/src/metrics/ValueMetricProducer.h +5 −2 Original line number Original line Diff line number Diff line Loading @@ -51,7 +51,7 @@ public: const int64_t version) override { const int64_t version) override { std::lock_guard<std::mutex> lock(mMutex); std::lock_guard<std::mutex> lock(mMutex); if (mPullTagId != -1) { if (mPullTagId != -1 && (mCondition == true || mConditionTrackerIndex < 0) ) { vector<shared_ptr<LogEvent>> allData; vector<shared_ptr<LogEvent>> allData; mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData); mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData); if (allData.size() == 0) { if (allData.size() == 0) { Loading @@ -73,7 +73,9 @@ public: data->setElapsedTimestampNs(eventTimeNs); data->setElapsedTimestampNs(eventTimeNs); onMatchedLogEventLocked(0, *data); onMatchedLogEventLocked(0, *data); } } } else { // For pushed value metric, we simply flush and reset the current bucket start. } else { // For pushed value metric or pulled metric where condition is not true, // we simply flush and reset the current bucket start. flushCurrentBucketLocked(eventTimeNs); flushCurrentBucketLocked(eventTimeNs); mCurrentBucketStartTimeNs = eventTimeNs; mCurrentBucketStartTimeNs = eventTimeNs; } } Loading Loading @@ -170,6 +172,7 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); Loading
cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +51 −0 Original line number Original line Diff line number Diff line Loading @@ -308,6 +308,57 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValue); EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValue); } } TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); metric.mutable_value_field()->add_child()->set_field(2); metric.set_condition(StringToId("SCREEN_ON")); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); shared_ptr<MockStatsPullerManager> pullerManager = make_shared<StrictMock<MockStatsPullerManager>>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(100); event->init(); data->push_back(event); return true; })) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); event->write(tagId); event->write(120); event->init(); data->push_back(event); return true; })); ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager); valueProducer.setBucketSize(60 * NS_PER_SEC); valueProducer.onConditionChanged(true, bucketStartTimeNs + 1); valueProducer.onConditionChanged(false, bucket2StartTimeNs-100); EXPECT_FALSE(valueProducer.mCondition); valueProducer.notifyAppUpgrade(bucket2StartTimeNs-50, "ANY.APP", 1, 1); // Expect one full buckets already done and starting a partial bucket. EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs); EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValue); EXPECT_FALSE(valueProducer.mCondition); } TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) { TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) { ValueMetric metric; ValueMetric metric; metric.set_id(metricId); metric.set_id(metricId); Loading