Loading cmds/statsd/Android.mk +1 −0 Original line number Diff line number Diff line Loading @@ -189,6 +189,7 @@ LOCAL_SRC_FILES := \ src/atom_field_options.proto \ src/atoms.proto \ src/stats_log.proto \ src/shell/shell_data.proto \ tests/AlarmMonitor_test.cpp \ tests/anomaly/AlarmTracker_test.cpp \ tests/anomaly/AnomalyTracker_test.cpp \ Loading cmds/statsd/src/StatsService.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -319,7 +319,7 @@ status_t StatsService::command(int in, int out, int err, Vector<String8>& args, } if (!args[0].compare(String8("data-subscribe"))) { if (mShellSubscriber == nullptr) { mShellSubscriber = new ShellSubscriber(mUidMap); mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager); } mShellSubscriber->startNewSubscription(in, out, resultReceiver); return NO_ERROR; Loading cmds/statsd/src/shell/ShellSubscriber.cpp +98 −10 Original line number Diff line number Diff line Loading @@ -18,9 +18,9 @@ #include "ShellSubscriber.h" #include "matchers/matcher_util.h" #include <android-base/file.h> #include "matchers/matcher_util.h" #include "stats_log_util.h" using android::util::ProtoOutputStream; Loading @@ -28,6 +28,8 @@ namespace android { namespace os { namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) { VLOG("start new shell subscription"); { Loading @@ -42,25 +44,106 @@ void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> IInterface::asBinder(mResultReceiver)->linkToDeath(this); } // Spawn another thread to read the config updates from the input file descriptor std::thread reader([in, this] { readConfig(in); }); reader.detach(); // Note that the following is blocking, and it's intended as we cannot return until the shell // cmd exits, otherwise all resources & FDs will be automatically closed. std::unique_lock<std::mutex> lk(mMutex); // Read config forever until EOF is reached. Clients may send multiple configs -- each new // config replace the previous one. readConfig(in); // Now we have read an EOF we now wait for the semaphore until the client exits. VLOG("Now wait for client to exit"); std::unique_lock<std::mutex> lk(mMutex); mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; }); if (reader.joinable()) { reader.join(); } } void ShellSubscriber::updateConfig(const ShellSubscription& config) { std::lock_guard<std::mutex> lock(mMutex); mPushedMatchers.clear(); mPulledInfo.clear(); for (const auto& pushed : config.pushed()) { mPushedMatchers.push_back(pushed); VLOG("adding matcher for atom %d", pushed.atom_id()); } int64_t token = getElapsedRealtimeNs(); mPullToken = token; int64_t minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } if (mPulledInfo.size() > 0 && minInterval > 0) { // This thread is guaranteed to terminate after it detects the token is different or // cleaned up. std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); puller.detach(); } } void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher) { if (mOutput == 0) return; int count = 0; mProto.clear(); for (const auto& event : data) { VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { VLOG("matched"); count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event->ToProto(mProto); mProto.end(atomToken); } } if (count > 0) { // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); VLOG("%d atoms, proto size: %zu", count, bufferSize); // Then write the payload. mProto.flush(mOutput); } mProto.clear(); } void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { while (1) { int64_t nowMillis = getElapsedRealtimeMillis(); { std::lock_guard<std::mutex> lock(mMutex); if (mPulledInfo.size() == 0 || mPullToken != token) { VLOG("Pulling thread %lld done!", (long long)token); return; } for (auto& pullInfo : mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); vector<std::shared_ptr<LogEvent>> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L, &data); VLOG("pulled %zu atoms", data.size()); if (data.size() > 0) { writeToOutputLocked(data, pullInfo.mPullerMatcher); } pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } } VLOG("Pulling thread %lld sleep....", (long long)token); std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); } } void ShellSubscriber::readConfig(int in) { Loading Loading @@ -101,6 +184,8 @@ void ShellSubscriber::cleanUpLocked() { mOutput = 0; mResultReceiver = nullptr; mPushedMatchers.clear(); mPulledInfo.clear(); mPullToken = 0; VLOG("done clean up"); } Loading @@ -110,10 +195,13 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { if (mOutput <= 0) { return; } for (const auto& matcher : mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); Loading cmds/statsd/src/shell/ShellSubscriber.h +22 −3 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ #include <mutex> #include <string> #include <thread> #include "external/StatsPullerManager.h" #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" #include "packages/UidMap.h" Loading Loading @@ -51,14 +52,15 @@ namespace statsd { * with sizeof(size_t) bytes indicating the size of the proto message payload. * * The stream would be in the following format: * |size_t|atom1 proto|size_t|atom2 proto|.... * |size_t|shellData proto|size_t|shellData proto|.... * * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread * until it exits. */ class ShellSubscriber : public virtual IBinder::DeathRecipient { public: ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){}; ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr) : mUidMap(uidMap), mPullerMgr(pullerMgr){}; /** * Start a new subscription. Loading @@ -70,15 +72,28 @@ public: void onLogEvent(const LogEvent& event); private: struct PullInfo { PullInfo(const SimpleAtomMatcher& matcher, int64_t interval) : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) { } SimpleAtomMatcher mPullerMatcher; int64_t mInterval; int64_t mPrevPullElapsedRealtimeMs; }; void readConfig(int in); void updateConfig(const ShellSubscription& config); void startPull(int64_t token, int64_t intervalMillis); void cleanUpLocked(); void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher); sp<UidMap> mUidMap; // bool mWritten = false; sp<StatsPullerManager> mPullerMgr; android::util::ProtoOutputStream mProto; Loading @@ -93,6 +108,10 @@ private: sp<IResultReceiver> mResultReceiver; std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; int64_t mPullToken = 0; // A unique token to identify a puller thread. }; } // namespace statsd Loading cmds/statsd/src/shell/shell_config.proto +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ option java_outer_classname = "ShellConfig"; import "frameworks/base/cmds/statsd/src/statsd_config.proto"; message PulledAtomSubscription { optional int32 atom_id = 1; optional SimpleAtomMatcher matcher = 1; /* gap between two pulls in milliseconds */ optional int32 freq_millis = 2; Loading Loading
cmds/statsd/Android.mk +1 −0 Original line number Diff line number Diff line Loading @@ -189,6 +189,7 @@ LOCAL_SRC_FILES := \ src/atom_field_options.proto \ src/atoms.proto \ src/stats_log.proto \ src/shell/shell_data.proto \ tests/AlarmMonitor_test.cpp \ tests/anomaly/AlarmTracker_test.cpp \ tests/anomaly/AnomalyTracker_test.cpp \ Loading
cmds/statsd/src/StatsService.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -319,7 +319,7 @@ status_t StatsService::command(int in, int out, int err, Vector<String8>& args, } if (!args[0].compare(String8("data-subscribe"))) { if (mShellSubscriber == nullptr) { mShellSubscriber = new ShellSubscriber(mUidMap); mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager); } mShellSubscriber->startNewSubscription(in, out, resultReceiver); return NO_ERROR; Loading
cmds/statsd/src/shell/ShellSubscriber.cpp +98 −10 Original line number Diff line number Diff line Loading @@ -18,9 +18,9 @@ #include "ShellSubscriber.h" #include "matchers/matcher_util.h" #include <android-base/file.h> #include "matchers/matcher_util.h" #include "stats_log_util.h" using android::util::ProtoOutputStream; Loading @@ -28,6 +28,8 @@ namespace android { namespace os { namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) { VLOG("start new shell subscription"); { Loading @@ -42,25 +44,106 @@ void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> IInterface::asBinder(mResultReceiver)->linkToDeath(this); } // Spawn another thread to read the config updates from the input file descriptor std::thread reader([in, this] { readConfig(in); }); reader.detach(); // Note that the following is blocking, and it's intended as we cannot return until the shell // cmd exits, otherwise all resources & FDs will be automatically closed. std::unique_lock<std::mutex> lk(mMutex); // Read config forever until EOF is reached. Clients may send multiple configs -- each new // config replace the previous one. readConfig(in); // Now we have read an EOF we now wait for the semaphore until the client exits. VLOG("Now wait for client to exit"); std::unique_lock<std::mutex> lk(mMutex); mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; }); if (reader.joinable()) { reader.join(); } } void ShellSubscriber::updateConfig(const ShellSubscription& config) { std::lock_guard<std::mutex> lock(mMutex); mPushedMatchers.clear(); mPulledInfo.clear(); for (const auto& pushed : config.pushed()) { mPushedMatchers.push_back(pushed); VLOG("adding matcher for atom %d", pushed.atom_id()); } int64_t token = getElapsedRealtimeNs(); mPullToken = token; int64_t minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } if (mPulledInfo.size() > 0 && minInterval > 0) { // This thread is guaranteed to terminate after it detects the token is different or // cleaned up. std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); puller.detach(); } } void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher) { if (mOutput == 0) return; int count = 0; mProto.clear(); for (const auto& event : data) { VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { VLOG("matched"); count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event->ToProto(mProto); mProto.end(atomToken); } } if (count > 0) { // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); VLOG("%d atoms, proto size: %zu", count, bufferSize); // Then write the payload. mProto.flush(mOutput); } mProto.clear(); } void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { while (1) { int64_t nowMillis = getElapsedRealtimeMillis(); { std::lock_guard<std::mutex> lock(mMutex); if (mPulledInfo.size() == 0 || mPullToken != token) { VLOG("Pulling thread %lld done!", (long long)token); return; } for (auto& pullInfo : mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); vector<std::shared_ptr<LogEvent>> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L, &data); VLOG("pulled %zu atoms", data.size()); if (data.size() > 0) { writeToOutputLocked(data, pullInfo.mPullerMatcher); } pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } } VLOG("Pulling thread %lld sleep....", (long long)token); std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); } } void ShellSubscriber::readConfig(int in) { Loading Loading @@ -101,6 +184,8 @@ void ShellSubscriber::cleanUpLocked() { mOutput = 0; mResultReceiver = nullptr; mPushedMatchers.clear(); mPulledInfo.clear(); mPullToken = 0; VLOG("done clean up"); } Loading @@ -110,10 +195,13 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { if (mOutput <= 0) { return; } for (const auto& matcher : mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); Loading
cmds/statsd/src/shell/ShellSubscriber.h +22 −3 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ #include <mutex> #include <string> #include <thread> #include "external/StatsPullerManager.h" #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" #include "packages/UidMap.h" Loading Loading @@ -51,14 +52,15 @@ namespace statsd { * with sizeof(size_t) bytes indicating the size of the proto message payload. * * The stream would be in the following format: * |size_t|atom1 proto|size_t|atom2 proto|.... * |size_t|shellData proto|size_t|shellData proto|.... * * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread * until it exits. */ class ShellSubscriber : public virtual IBinder::DeathRecipient { public: ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){}; ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr) : mUidMap(uidMap), mPullerMgr(pullerMgr){}; /** * Start a new subscription. Loading @@ -70,15 +72,28 @@ public: void onLogEvent(const LogEvent& event); private: struct PullInfo { PullInfo(const SimpleAtomMatcher& matcher, int64_t interval) : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) { } SimpleAtomMatcher mPullerMatcher; int64_t mInterval; int64_t mPrevPullElapsedRealtimeMs; }; void readConfig(int in); void updateConfig(const ShellSubscription& config); void startPull(int64_t token, int64_t intervalMillis); void cleanUpLocked(); void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher); sp<UidMap> mUidMap; // bool mWritten = false; sp<StatsPullerManager> mPullerMgr; android::util::ProtoOutputStream mProto; Loading @@ -93,6 +108,10 @@ private: sp<IResultReceiver> mResultReceiver; std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; int64_t mPullToken = 0; // A unique token to identify a puller thread. }; } // namespace statsd Loading
cmds/statsd/src/shell/shell_config.proto +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ option java_outer_classname = "ShellConfig"; import "frameworks/base/cmds/statsd/src/statsd_config.proto"; message PulledAtomSubscription { optional int32 atom_id = 1; optional SimpleAtomMatcher matcher = 1; /* gap between two pulls in milliseconds */ optional int32 freq_millis = 2; Loading