Loading cmds/statsd/src/shell/ShellSubscriber.cpp +34 −58 Original line number Diff line number Diff line Loading @@ -41,13 +41,8 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { { std::unique_lock<std::mutex> lock(mMutex); if (myToken != mToken) { // Some other subscription has already come in. Stop. return; } mSubscriptionInfo = mySubscriptionInfo; spawnHelperThreadsLocked(mySubscriptionInfo, myToken); spawnHelperThread(myToken); waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); if (mSubscriptionInfo == mySubscriptionInfo) { Loading @@ -57,14 +52,9 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { } } void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) { if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) { std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); }); heartbeatSender.detach(); void ShellSubscriber::spawnHelperThread(int myToken) { std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); }); t.detach(); } void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo, Loading Loading @@ -114,13 +104,7 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) subscriptionInfo->mPushedMatchers.push_back(pushed); } int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } vector<string> packages; vector<int32_t> uids; for (const string& pkg : pulled.packages()) { Loading @@ -136,18 +120,18 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) uids); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } subscriptionInfo->mPullIntervalMin = minInterval; return true; } void ShellSubscriber::startPull(int myToken) { VLOG("ShellSubscriber: pull thread %d starting", myToken); void ShellSubscriber::pullAndSendHeartbeats(int myToken) { VLOG("ShellSubscriber: helper thread %d starting", myToken); while (true) { int64_t sleepTimeMs = INT_MAX; { std::lock_guard<std::mutex> lock(mMutex); if (!mSubscriptionInfo || mToken != myToken) { VLOG("ShellSubscriber: pulling thread %d done!", myToken); VLOG("ShellSubscriber: helper thread %d done!", myToken); return; } Loading @@ -168,11 +152,27 @@ void ShellSubscriber::startPull(int myToken) { pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received // data from statsd. When it receives the data size of 0, perfd will not expect any // atoms and recheck whether the subscription should end. if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) { attemptWriteToPipeLocked(/*dataSize=*/0); } // Determine how long to sleep before doing more work. for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval; int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull; } int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis; if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat; } VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken, mSubscriptionInfo->mPullIntervalMin); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken, (long long)sleepTimeMs); std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs)); } } Loading Loading @@ -200,7 +200,7 @@ void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve } } if (count > 0) attemptWriteToSocketLocked(mProto.size()); if (count > 0) attemptWriteToPipeLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { Loading @@ -214,26 +214,24 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); attemptWriteToSocketLocked(mProto.size()); attemptWriteToPipeLocked(mProto.size()); } } } // Tries to write the atom encoded in mProto to the socket. If the write fails // Tries to write the atom encoded in mProto to the pipe. If the write fails // because the read end of the pipe has closed, signals to other threads that // the subscription should end. void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { // First write the payload size. void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) { // First, write the payload size. if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } if (dataSize == 0) return; // Then, write the payload. if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { // Then, write the payload if this is not just a heartbeat. if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; Loading @@ -242,28 +240,6 @@ void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { mLastWriteMs = getElapsedRealtimeMillis(); } // Send a heartbeat, consisting solely of a data size of 0, if perfd has not // recently received any writes from statsd. When it receives the data size of // 0, perfd will not expect any data and recheck whether the shell command is // still running. void ShellSubscriber::sendHeartbeats(int myToken) { while (true) { { std::lock_guard<std::mutex> lock(mMutex); if (!mSubscriptionInfo || myToken != mToken) { VLOG("ShellSubscriber: heartbeat thread %d done!", myToken); return; } if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) { VLOG("ShellSubscriber: sending a heartbeat to perfd"); attemptWriteToSocketLocked(/*dataSize=*/0); } } std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats)); } } } // namespace statsd } // namespace os } // namespace android cmds/statsd/src/shell/ShellSubscriber.h +7 −10 Original line number Diff line number Diff line Loading @@ -92,7 +92,6 @@ private: int mOutputFd; std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; int mPullIntervalMin; bool mClientAlive; }; Loading @@ -100,27 +99,25 @@ private: bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo); void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken); void spawnHelperThread(int myToken); void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken, std::unique_lock<std::mutex>& lock, int timeoutSec); void startPull(int myToken); // Helper thread that pulls atoms at a regular frequency and sends // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must // send heartbeats for perfd to escape a blocking read call and recheck if // the user has terminated the subscription. void pullAndSendHeartbeats(int myToken); void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher); void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo); void attemptWriteToSocketLocked(size_t dataSize); // Send ocassional heartbeats for two reasons: (a) for statsd to detect when // the read end of the pipe has closed and (b) for perfd to escape a // blocking read call and recheck if the user has terminated the // subscription. void sendHeartbeats(int myToken); void attemptWriteToPipeLocked(size_t dataSize); sp<UidMap> mUidMap; Loading Loading
cmds/statsd/src/shell/ShellSubscriber.cpp +34 −58 Original line number Diff line number Diff line Loading @@ -41,13 +41,8 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { { std::unique_lock<std::mutex> lock(mMutex); if (myToken != mToken) { // Some other subscription has already come in. Stop. return; } mSubscriptionInfo = mySubscriptionInfo; spawnHelperThreadsLocked(mySubscriptionInfo, myToken); spawnHelperThread(myToken); waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); if (mSubscriptionInfo == mySubscriptionInfo) { Loading @@ -57,14 +52,9 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { } } void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) { if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) { std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); }); heartbeatSender.detach(); void ShellSubscriber::spawnHelperThread(int myToken) { std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); }); t.detach(); } void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo, Loading Loading @@ -114,13 +104,7 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) subscriptionInfo->mPushedMatchers.push_back(pushed); } int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } vector<string> packages; vector<int32_t> uids; for (const string& pkg : pulled.packages()) { Loading @@ -136,18 +120,18 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) uids); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } subscriptionInfo->mPullIntervalMin = minInterval; return true; } void ShellSubscriber::startPull(int myToken) { VLOG("ShellSubscriber: pull thread %d starting", myToken); void ShellSubscriber::pullAndSendHeartbeats(int myToken) { VLOG("ShellSubscriber: helper thread %d starting", myToken); while (true) { int64_t sleepTimeMs = INT_MAX; { std::lock_guard<std::mutex> lock(mMutex); if (!mSubscriptionInfo || mToken != myToken) { VLOG("ShellSubscriber: pulling thread %d done!", myToken); VLOG("ShellSubscriber: helper thread %d done!", myToken); return; } Loading @@ -168,11 +152,27 @@ void ShellSubscriber::startPull(int myToken) { pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received // data from statsd. When it receives the data size of 0, perfd will not expect any // atoms and recheck whether the subscription should end. if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) { attemptWriteToPipeLocked(/*dataSize=*/0); } // Determine how long to sleep before doing more work. for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval; int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull; } int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis; if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat; } VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken, mSubscriptionInfo->mPullIntervalMin); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken, (long long)sleepTimeMs); std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs)); } } Loading Loading @@ -200,7 +200,7 @@ void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve } } if (count > 0) attemptWriteToSocketLocked(mProto.size()); if (count > 0) attemptWriteToPipeLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { Loading @@ -214,26 +214,24 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); attemptWriteToSocketLocked(mProto.size()); attemptWriteToPipeLocked(mProto.size()); } } } // Tries to write the atom encoded in mProto to the socket. If the write fails // Tries to write the atom encoded in mProto to the pipe. If the write fails // because the read end of the pipe has closed, signals to other threads that // the subscription should end. void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { // First write the payload size. void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) { // First, write the payload size. if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } if (dataSize == 0) return; // Then, write the payload. if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { // Then, write the payload if this is not just a heartbeat. if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; Loading @@ -242,28 +240,6 @@ void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { mLastWriteMs = getElapsedRealtimeMillis(); } // Send a heartbeat, consisting solely of a data size of 0, if perfd has not // recently received any writes from statsd. When it receives the data size of // 0, perfd will not expect any data and recheck whether the shell command is // still running. void ShellSubscriber::sendHeartbeats(int myToken) { while (true) { { std::lock_guard<std::mutex> lock(mMutex); if (!mSubscriptionInfo || myToken != mToken) { VLOG("ShellSubscriber: heartbeat thread %d done!", myToken); return; } if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) { VLOG("ShellSubscriber: sending a heartbeat to perfd"); attemptWriteToSocketLocked(/*dataSize=*/0); } } std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats)); } } } // namespace statsd } // namespace os } // namespace android
cmds/statsd/src/shell/ShellSubscriber.h +7 −10 Original line number Diff line number Diff line Loading @@ -92,7 +92,6 @@ private: int mOutputFd; std::vector<SimpleAtomMatcher> mPushedMatchers; std::vector<PullInfo> mPulledInfo; int mPullIntervalMin; bool mClientAlive; }; Loading @@ -100,27 +99,25 @@ private: bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo); void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken); void spawnHelperThread(int myToken); void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken, std::unique_lock<std::mutex>& lock, int timeoutSec); void startPull(int myToken); // Helper thread that pulls atoms at a regular frequency and sends // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must // send heartbeats for perfd to escape a blocking read call and recheck if // the user has terminated the subscription. void pullAndSendHeartbeats(int myToken); void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, const SimpleAtomMatcher& matcher); void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo); void attemptWriteToSocketLocked(size_t dataSize); // Send ocassional heartbeats for two reasons: (a) for statsd to detect when // the read end of the pipe has closed and (b) for perfd to escape a // blocking read call and recheck if the user has terminated the // subscription. void sendHeartbeats(int myToken); void attemptWriteToPipeLocked(size_t dataSize); sp<UidMap> mUidMap; Loading