Loading cmds/statsd/src/shell/ShellSubscriber.cpp +104 −126 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include "ShellSubscriber.h" #include <android-base/file.h> #include "matchers/matcher_util.h" #include "stats_log_util.h" Loading @@ -30,154 +31,129 @@ namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { VLOG("start new shell subscription"); int64_t subscriberId = getElapsedRealtimeNs(); int myToken = claimToken(); mSubscriptionShouldEnd.notify_one(); { std::lock_guard<std::mutex> lock(mMutex); if (mSubscriberId> 0) { VLOG("Only one shell subscriber is allowed."); shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); if (!readConfig(mySubscriptionInfo)) { return; } mSubscriberId = subscriberId; mInput = in; mOutput = out; } bool success = readConfig(); if (!success) { std::lock_guard<std::mutex> lock(mMutex); cleanUpLocked(); // critical-section std::unique_lock<std::mutex> lock(mMutex); if (myToken < mToken) { // Some other subscription has already come in. Stop. return; } mSubscriptionInfo = mySubscriptionInfo; VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec); std::unique_lock<std::mutex> lk(mMutex); if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { // This thread terminates after it detects that mToken has changed. std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } // Note that the following is blocking, and it's intended as we cannot return until the shell // cmd exits or we time out. // Block until subscription has ended. if (timeoutSec > 0) { mShellDied.wait_for(lk, timeoutSec * 1s, [this, subscriberId] { return mSubscriberId != subscriberId; }); mSubscriptionShouldEnd.wait_for( lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] { return mToken != myToken || !mySubscriptionInfo->mClientAlive; }); } else { mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; }); } mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] { return mToken != myToken || !mySubscriptionInfo->mClientAlive; }); } if (mSubscriptionInfo == mySubscriptionInfo) { mSubscriptionInfo = nullptr; } } // Read configs until EOF is reached. There may be multiple configs in the input // -- each new config should replace the previous one. // // Returns a boolean indicating whether the input was read successfully. bool ShellSubscriber::readConfig() { if (mInput < 0) { return false; // Atomically claim the next token. Token numbers denote subscriber ordering. int ShellSubscriber::claimToken() { std::unique_lock<std::mutex> lock(mMutex); int myToken = ++mToken; return myToken; } while (true) { // Read and parse single config. There should only one config per input. bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) { // Read the size of the config. size_t bufferSize = 0; ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize)); if (bytesRead == 0) { VLOG("We have reached the end of the input."); return true; } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) { ALOGE("Error reading config size"); size_t bufferSize; if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { return false; } // Read and parse the config. // Read the config. vector<uint8_t> buffer(bufferSize); bytesRead = read(mInput, buffer.data(), bufferSize); if (bytesRead > 0 && (size_t)bytesRead == bufferSize) { ShellSubscription config; if (config.ParseFromArray(buffer.data(), bufferSize)) { updateConfig(config); } else { ALOGE("Error parsing the config"); if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { return false; } } else { VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize, bytesRead); // Parse the config. ShellSubscription config; if (!config.ParseFromArray(buffer.data(), bufferSize)) { return false; } } } void ShellSubscriber::updateConfig(const ShellSubscription& config) { mPushedMatchers.clear(); mPulledInfo.clear(); // Update SubscriptionInfo with state from config for (const auto& pushed : config.pushed()) { mPushedMatchers.push_back(pushed); VLOG("adding matcher for pushed atom %d", pushed.atom_id()); subscriptionInfo->mPushedMatchers.push_back(pushed); } int64_t token = getElapsedRealtimeNs(); mPullToken = token; int64_t minInterval = -1; int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); } subscriptionInfo->mPullIntervalMin = minInterval; if (mPulledInfo.size() > 0 && minInterval > 0) { // This thread is guaranteed to terminate after it detects the token is // different. std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); puller.detach(); } return true; } void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { void ShellSubscriber::startPull(int64_t myToken) { while (true) { int64_t nowMillis = getElapsedRealtimeMillis(); { std::lock_guard<std::mutex> lock(mMutex); // If the token has changed, the config has changed, so this // puller can now stop. if (mPulledInfo.size() == 0 || mPullToken != token) { VLOG("Pulling thread %lld done!", (long long)token); if (!mSubscriptionInfo || mToken != myToken) { VLOG("Pulling thread %lld done!", (long long)myToken); return; } for (auto& pullInfo : mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); int64_t nowMillis = getElapsedRealtimeMillis(); for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { vector<std::shared_ptr<LogEvent>> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); VLOG("pulled %zu atoms", data.size()); if (data.size() > 0) { writeToOutputLocked(data, pullInfo.mPullerMatcher); VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); // TODO(b/150969574): Don't write to a pipe while holding a lock. if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } } VLOG("Pulling thread %lld sleep....", (long long)token); std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); VLOG("Pulling thread %lld sleep....", (long long)myToken); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } // Must be called with the lock acquired, so that mProto isn't being written to // at the same time by multiple threads. void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, // \return boolean indicating if writes were successful (will return false if // client dies) bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher) { if (mOutput < 0) { return; } int count = 0; mProto.clear(); int count = 0; for (const auto& event : data) { VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { VLOG("matched"); count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); Loading @@ -189,24 +165,29 @@ void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent> if (count > 0) { // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, sizeof(bufferSize))) { return false; } VLOG("%d atoms, proto size: %zu", count, bufferSize); // Then write the payload. mProto.flush(mOutput); if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { return false; } } return true; } void ShellSubscriber::onLogEvent(const LogEvent& event) { // Acquire a lock to prevent corruption from multiple threads writing to // mProto. std::lock_guard<std::mutex> lock(mMutex); if (mOutput < 0) { if (!mSubscriptionInfo) { return; } mProto.clear(); for (const auto& matcher : mPushedMatchers) { for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | Loading @@ -216,24 +197,21 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, sizeof(bufferSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } // Then write the payload. mProto.flush(mOutput); if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } } } void ShellSubscriber::cleanUpLocked() { // The file descriptors will be closed by binder. mInput = -1; mOutput = -1; mSubscriberId = 0; mPushedMatchers.clear(); mPulledInfo.clear(); // Setting mPullToken == 0 tells pull thread that its work is done. mPullToken = 0; VLOG("done clean up"); } } // namespace statsd Loading cmds/statsd/src/shell/ShellSubscriber.h +21 −20 Original line number Diff line number Diff line Loading @@ -60,9 +60,6 @@ public: ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr) : mUidMap(uidMap), mPullerMgr(pullerMgr){}; /** * Start a new subscription. */ void startNewSubscription(int inFd, int outFd, int timeoutSec); void onLogEvent(const LogEvent& event); Loading @@ -76,15 +73,27 @@ private: int64_t mInterval; int64_t mPrevPullElapsedRealtimeMs; }; bool readConfig(); void updateConfig(const ShellSubscription& config); struct SubscriptionInfo { SubscriptionInfo(const int& inputFd, const int& outputFd) : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) { } int mInputFd; int mOutputFd; std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; int mPullIntervalMin; bool mClientAlive; }; int claimToken(); void startPull(int64_t token, int64_t intervalMillis); bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo); void cleanUpLocked(); void startPull(int64_t myToken); void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher); sp<UidMap> mUidMap; Loading @@ -95,19 +104,11 @@ private: mutable std::mutex mMutex; std::condition_variable mShellDied; // semaphore for waiting until shell exits. int mInput = -1; // The input file descriptor int mOutput = -1; // The output file descriptor std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; std::condition_variable mSubscriptionShouldEnd; int64_t mSubscriberId = 0; // A unique id to identify a subscriber. std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr; int64_t mPullToken = 0; // A unique token to identify a puller thread. int mToken; }; } // namespace statsd Loading cmds/statsd/tests/shell/ShellSubscriber_test.cpp +67 −49 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" #include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h" #include "src/shell/ShellSubscriber.h" #include "stats_event.h" #include "tests/metrics/metrics_test_helper.h" #include <stdio.h> Loading Loading @@ -88,6 +89,7 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap, // now read from the pipe. firstly read the atom size. size_t dataSize = 0; EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize))); EXPECT_EQ(expected_data_size, (int)dataSize); // then read that much data which is the atom in proto binary format Loading @@ -103,32 +105,43 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap, expectedData.SerializeToArray(&atomBuffer[0], expected_data_size); EXPECT_EQ(atomBuffer, dataBuffer); close(fds_data[0]); if (reader.joinable()) { reader.join(); } } // TODO(b/149590301): Update this test to use new socket schema. //TEST(ShellSubscriberTest, testPushedSubscription) { // sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); // // sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); // vector<std::shared_ptr<LogEvent>> pushedList; // // std::shared_ptr<LogEvent> event1 = // std::make_shared<LogEvent>(29 /*screen_state_atom_id*/, 1000 /*timestamp*/); // event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON); // event1->init(); // pushedList.push_back(event1); // // // create a simple config to get screen events // ShellSubscription config; // config.add_pushed()->set_atom_id(29); // // // this is the expected screen event atom. // ShellData shellData; // shellData.add_atom()->mutable_screen_state_changed()->set_state( // ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); // // runShellTest(config, uidMap, pullerManager, pushedList, shellData); //} TEST(ShellSubscriberTest, testPushedSubscription) { sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); vector<std::shared_ptr<LogEvent>> pushedList; // Create the LogEvent from an AStatsEvent AStatsEvent* statsEvent = AStatsEvent_obtain(); AStatsEvent_setAtomId(statsEvent, 29 /*screen_state_atom_id*/); AStatsEvent_overwriteTimestamp(statsEvent, 1000); AStatsEvent_writeInt32(statsEvent, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); AStatsEvent_build(statsEvent); size_t size; uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &size); std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0); logEvent->parseBuffer(buffer, size); AStatsEvent_release(statsEvent); pushedList.push_back(logEvent); // create a simple config to get screen events ShellSubscription config; config.add_pushed()->set_atom_id(29); // this is the expected screen event atom. ShellData shellData; shellData.add_atom()->mutable_screen_state_changed()->set_state( ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); runShellTest(config, uidMap, pullerManager, pushedList, shellData); } namespace { Loading Loading @@ -159,33 +172,38 @@ ShellSubscription getPulledConfig() { return config; } shared_ptr<LogEvent> makeCpuActiveTimeAtom(int32_t uid, int64_t timeMillis) { AStatsEvent* statsEvent = AStatsEvent_obtain(); AStatsEvent_setAtomId(statsEvent, 10016); AStatsEvent_overwriteTimestamp(statsEvent, 1111L); AStatsEvent_writeInt32(statsEvent, uid); AStatsEvent_writeInt64(statsEvent, timeMillis); AStatsEvent_build(statsEvent); size_t size; uint8_t* buf = AStatsEvent_getBuffer(statsEvent, &size); std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0); logEvent->parseBuffer(buf, size); return logEvent; } } // namespace // TODO(b/149590301): Update this test to use new socket schema. //TEST(ShellSubscriberTest, testPulledSubscription) { // sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); // // sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); // EXPECT_CALL(*pullerManager, Pull(10016, _)) // .WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { // data->clear(); // shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, 1111L); // event->write(kUid1); // event->write(kCpuTime1); // event->init(); // data->push_back(event); // // another event // event = make_shared<LogEvent>(tagId, 1111L); // event->write(kUid2); // event->write(kCpuTime2); // event->init(); // data->push_back(event); // return true; // })); // // runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(), // getExpectedShellData()); //} TEST(ShellSubscriberTest, testPulledSubscription) { sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(10016, _)) .WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid1, /*timeMillis=*/kCpuTime1)); data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid2, /*timeMillis=*/kCpuTime2)); return true; })); runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(), getExpectedShellData()); } #else GTEST_LOG_(INFO) << "This test does nothing.\n"; Loading Loading
cmds/statsd/src/shell/ShellSubscriber.cpp +104 −126 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include "ShellSubscriber.h" #include <android-base/file.h> #include "matchers/matcher_util.h" #include "stats_log_util.h" Loading @@ -30,154 +31,129 @@ namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { VLOG("start new shell subscription"); int64_t subscriberId = getElapsedRealtimeNs(); int myToken = claimToken(); mSubscriptionShouldEnd.notify_one(); { std::lock_guard<std::mutex> lock(mMutex); if (mSubscriberId> 0) { VLOG("Only one shell subscriber is allowed."); shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); if (!readConfig(mySubscriptionInfo)) { return; } mSubscriberId = subscriberId; mInput = in; mOutput = out; } bool success = readConfig(); if (!success) { std::lock_guard<std::mutex> lock(mMutex); cleanUpLocked(); // critical-section std::unique_lock<std::mutex> lock(mMutex); if (myToken < mToken) { // Some other subscription has already come in. Stop. return; } mSubscriptionInfo = mySubscriptionInfo; VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec); std::unique_lock<std::mutex> lk(mMutex); if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { // This thread terminates after it detects that mToken has changed. std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } // Note that the following is blocking, and it's intended as we cannot return until the shell // cmd exits or we time out. // Block until subscription has ended. if (timeoutSec > 0) { mShellDied.wait_for(lk, timeoutSec * 1s, [this, subscriberId] { return mSubscriberId != subscriberId; }); mSubscriptionShouldEnd.wait_for( lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] { return mToken != myToken || !mySubscriptionInfo->mClientAlive; }); } else { mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; }); } mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] { return mToken != myToken || !mySubscriptionInfo->mClientAlive; }); } if (mSubscriptionInfo == mySubscriptionInfo) { mSubscriptionInfo = nullptr; } } // Read configs until EOF is reached. There may be multiple configs in the input // -- each new config should replace the previous one. // // Returns a boolean indicating whether the input was read successfully. bool ShellSubscriber::readConfig() { if (mInput < 0) { return false; // Atomically claim the next token. Token numbers denote subscriber ordering. int ShellSubscriber::claimToken() { std::unique_lock<std::mutex> lock(mMutex); int myToken = ++mToken; return myToken; } while (true) { // Read and parse single config. There should only one config per input. bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) { // Read the size of the config. size_t bufferSize = 0; ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize)); if (bytesRead == 0) { VLOG("We have reached the end of the input."); return true; } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) { ALOGE("Error reading config size"); size_t bufferSize; if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { return false; } // Read and parse the config. // Read the config. vector<uint8_t> buffer(bufferSize); bytesRead = read(mInput, buffer.data(), bufferSize); if (bytesRead > 0 && (size_t)bytesRead == bufferSize) { ShellSubscription config; if (config.ParseFromArray(buffer.data(), bufferSize)) { updateConfig(config); } else { ALOGE("Error parsing the config"); if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { return false; } } else { VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize, bytesRead); // Parse the config. ShellSubscription config; if (!config.ParseFromArray(buffer.data(), bufferSize)) { return false; } } } void ShellSubscriber::updateConfig(const ShellSubscription& config) { mPushedMatchers.clear(); mPulledInfo.clear(); // Update SubscriptionInfo with state from config for (const auto& pushed : config.pushed()) { mPushedMatchers.push_back(pushed); VLOG("adding matcher for pushed atom %d", pushed.atom_id()); subscriptionInfo->mPushedMatchers.push_back(pushed); } int64_t token = getElapsedRealtimeNs(); mPullToken = token; int64_t minInterval = -1; int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); } subscriptionInfo->mPullIntervalMin = minInterval; if (mPulledInfo.size() > 0 && minInterval > 0) { // This thread is guaranteed to terminate after it detects the token is // different. std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); puller.detach(); } return true; } void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { void ShellSubscriber::startPull(int64_t myToken) { while (true) { int64_t nowMillis = getElapsedRealtimeMillis(); { std::lock_guard<std::mutex> lock(mMutex); // If the token has changed, the config has changed, so this // puller can now stop. if (mPulledInfo.size() == 0 || mPullToken != token) { VLOG("Pulling thread %lld done!", (long long)token); if (!mSubscriptionInfo || mToken != myToken) { VLOG("Pulling thread %lld done!", (long long)myToken); return; } for (auto& pullInfo : mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); int64_t nowMillis = getElapsedRealtimeMillis(); for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { vector<std::shared_ptr<LogEvent>> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); VLOG("pulled %zu atoms", data.size()); if (data.size() > 0) { writeToOutputLocked(data, pullInfo.mPullerMatcher); VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); // TODO(b/150969574): Don't write to a pipe while holding a lock. if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } } VLOG("Pulling thread %lld sleep....", (long long)token); std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); VLOG("Pulling thread %lld sleep....", (long long)myToken); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } // Must be called with the lock acquired, so that mProto isn't being written to // at the same time by multiple threads. void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, // \return boolean indicating if writes were successful (will return false if // client dies) bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher) { if (mOutput < 0) { return; } int count = 0; mProto.clear(); int count = 0; for (const auto& event : data) { VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { VLOG("matched"); count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); Loading @@ -189,24 +165,29 @@ void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent> if (count > 0) { // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, sizeof(bufferSize))) { return false; } VLOG("%d atoms, proto size: %zu", count, bufferSize); // Then write the payload. mProto.flush(mOutput); if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { return false; } } return true; } void ShellSubscriber::onLogEvent(const LogEvent& event) { // Acquire a lock to prevent corruption from multiple threads writing to // mProto. std::lock_guard<std::mutex> lock(mMutex); if (mOutput < 0) { if (!mSubscriptionInfo) { return; } mProto.clear(); for (const auto& matcher : mPushedMatchers) { for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | Loading @@ -216,24 +197,21 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, sizeof(bufferSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } // Then write the payload. mProto.flush(mOutput); if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } } } void ShellSubscriber::cleanUpLocked() { // The file descriptors will be closed by binder. mInput = -1; mOutput = -1; mSubscriberId = 0; mPushedMatchers.clear(); mPulledInfo.clear(); // Setting mPullToken == 0 tells pull thread that its work is done. mPullToken = 0; VLOG("done clean up"); } } // namespace statsd Loading
cmds/statsd/src/shell/ShellSubscriber.h +21 −20 Original line number Diff line number Diff line Loading @@ -60,9 +60,6 @@ public: ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr) : mUidMap(uidMap), mPullerMgr(pullerMgr){}; /** * Start a new subscription. */ void startNewSubscription(int inFd, int outFd, int timeoutSec); void onLogEvent(const LogEvent& event); Loading @@ -76,15 +73,27 @@ private: int64_t mInterval; int64_t mPrevPullElapsedRealtimeMs; }; bool readConfig(); void updateConfig(const ShellSubscription& config); struct SubscriptionInfo { SubscriptionInfo(const int& inputFd, const int& outputFd) : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) { } int mInputFd; int mOutputFd; std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; int mPullIntervalMin; bool mClientAlive; }; int claimToken(); void startPull(int64_t token, int64_t intervalMillis); bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo); void cleanUpLocked(); void startPull(int64_t myToken); void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher); sp<UidMap> mUidMap; Loading @@ -95,19 +104,11 @@ private: mutable std::mutex mMutex; std::condition_variable mShellDied; // semaphore for waiting until shell exits. int mInput = -1; // The input file descriptor int mOutput = -1; // The output file descriptor std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; std::condition_variable mSubscriptionShouldEnd; int64_t mSubscriberId = 0; // A unique id to identify a subscriber. std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr; int64_t mPullToken = 0; // A unique token to identify a puller thread. int mToken; }; } // namespace statsd Loading
cmds/statsd/tests/shell/ShellSubscriber_test.cpp +67 −49 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" #include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h" #include "src/shell/ShellSubscriber.h" #include "stats_event.h" #include "tests/metrics/metrics_test_helper.h" #include <stdio.h> Loading Loading @@ -88,6 +89,7 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap, // now read from the pipe. firstly read the atom size. size_t dataSize = 0; EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize))); EXPECT_EQ(expected_data_size, (int)dataSize); // then read that much data which is the atom in proto binary format Loading @@ -103,32 +105,43 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap, expectedData.SerializeToArray(&atomBuffer[0], expected_data_size); EXPECT_EQ(atomBuffer, dataBuffer); close(fds_data[0]); if (reader.joinable()) { reader.join(); } } // TODO(b/149590301): Update this test to use new socket schema. //TEST(ShellSubscriberTest, testPushedSubscription) { // sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); // // sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); // vector<std::shared_ptr<LogEvent>> pushedList; // // std::shared_ptr<LogEvent> event1 = // std::make_shared<LogEvent>(29 /*screen_state_atom_id*/, 1000 /*timestamp*/); // event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON); // event1->init(); // pushedList.push_back(event1); // // // create a simple config to get screen events // ShellSubscription config; // config.add_pushed()->set_atom_id(29); // // // this is the expected screen event atom. // ShellData shellData; // shellData.add_atom()->mutable_screen_state_changed()->set_state( // ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); // // runShellTest(config, uidMap, pullerManager, pushedList, shellData); //} TEST(ShellSubscriberTest, testPushedSubscription) { sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); vector<std::shared_ptr<LogEvent>> pushedList; // Create the LogEvent from an AStatsEvent AStatsEvent* statsEvent = AStatsEvent_obtain(); AStatsEvent_setAtomId(statsEvent, 29 /*screen_state_atom_id*/); AStatsEvent_overwriteTimestamp(statsEvent, 1000); AStatsEvent_writeInt32(statsEvent, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); AStatsEvent_build(statsEvent); size_t size; uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &size); std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0); logEvent->parseBuffer(buffer, size); AStatsEvent_release(statsEvent); pushedList.push_back(logEvent); // create a simple config to get screen events ShellSubscription config; config.add_pushed()->set_atom_id(29); // this is the expected screen event atom. ShellData shellData; shellData.add_atom()->mutable_screen_state_changed()->set_state( ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); runShellTest(config, uidMap, pullerManager, pushedList, shellData); } namespace { Loading Loading @@ -159,33 +172,38 @@ ShellSubscription getPulledConfig() { return config; } shared_ptr<LogEvent> makeCpuActiveTimeAtom(int32_t uid, int64_t timeMillis) { AStatsEvent* statsEvent = AStatsEvent_obtain(); AStatsEvent_setAtomId(statsEvent, 10016); AStatsEvent_overwriteTimestamp(statsEvent, 1111L); AStatsEvent_writeInt32(statsEvent, uid); AStatsEvent_writeInt64(statsEvent, timeMillis); AStatsEvent_build(statsEvent); size_t size; uint8_t* buf = AStatsEvent_getBuffer(statsEvent, &size); std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0); logEvent->parseBuffer(buf, size); return logEvent; } } // namespace // TODO(b/149590301): Update this test to use new socket schema. //TEST(ShellSubscriberTest, testPulledSubscription) { // sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); // // sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); // EXPECT_CALL(*pullerManager, Pull(10016, _)) // .WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { // data->clear(); // shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, 1111L); // event->write(kUid1); // event->write(kCpuTime1); // event->init(); // data->push_back(event); // // another event // event = make_shared<LogEvent>(tagId, 1111L); // event->write(kUid2); // event->write(kCpuTime2); // event->init(); // data->push_back(event); // return true; // })); // // runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(), // getExpectedShellData()); //} TEST(ShellSubscriberTest, testPulledSubscription) { sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, Pull(10016, _)) .WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid1, /*timeMillis=*/kCpuTime1)); data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid2, /*timeMillis=*/kCpuTime2)); return true; })); runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(), getExpectedShellData()); } #else GTEST_LOG_(INFO) << "This test does nothing.\n"; Loading