Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 835e75c6 authored by Ruchir Rastogi's avatar Ruchir Rastogi
Browse files

Combine startPull and sendHeartbeat threads

This ensures that only one helper thread is created per subscription.
Previously, there could be up to two.

+ fixes thread sleep duration if the pulled atoms have different pull
frequencies
+ rename attemptWriteToSocketLocked to attemptWriteToPipeLocked

Test: atest statsd_test
Test: atest CtsStatsdHostTestCases:ShellSubscriberTest
Test: manual testing on Android Studio
Bug: 156678125
Change-Id: I7074bbba5981a591a30e8b70a1ad1d83eadfcc30
parent eafbb325
Loading
Loading
Loading
Loading
+34 −58
Original line number Diff line number Diff line
@@ -41,13 +41,8 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {

    {
        std::unique_lock<std::mutex> lock(mMutex);
        if (myToken != mToken) {
            // Some other subscription has already come in. Stop.
            return;
        }
        mSubscriptionInfo = mySubscriptionInfo;

        spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
        spawnHelperThread(myToken);
        waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);

        if (mSubscriptionInfo == mySubscriptionInfo) {
@@ -57,14 +52,9 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
    }
}

void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
    if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
        std::thread puller([this, myToken] { startPull(myToken); });
        puller.detach();
    }

    std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
    heartbeatSender.detach();
void ShellSubscriber::spawnHelperThread(int myToken) {
    std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); });
    t.detach();
}

void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
@@ -114,13 +104,7 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
        subscriptionInfo->mPushedMatchers.push_back(pushed);
    }

    int minInterval = -1;
    for (const auto& pulled : config.pulled()) {
        // All intervals need to be multiples of the min interval.
        if (minInterval < 0 || pulled.freq_millis() < minInterval) {
            minInterval = pulled.freq_millis();
        }

        vector<string> packages;
        vector<int32_t> uids;
        for (const string& pkg : pulled.packages()) {
@@ -136,18 +120,18 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
                                                   uids);
        VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
    }
    subscriptionInfo->mPullIntervalMin = minInterval;

    return true;
}

void ShellSubscriber::startPull(int myToken) {
    VLOG("ShellSubscriber: pull thread %d starting", myToken);
void ShellSubscriber::pullAndSendHeartbeats(int myToken) {
    VLOG("ShellSubscriber: helper thread %d starting", myToken);
    while (true) {
        int64_t sleepTimeMs = INT_MAX;
        {
            std::lock_guard<std::mutex> lock(mMutex);
            if (!mSubscriptionInfo || mToken != myToken) {
                VLOG("ShellSubscriber: pulling thread %d done!", myToken);
                VLOG("ShellSubscriber: helper thread %d done!", myToken);
                return;
            }

@@ -168,11 +152,27 @@ void ShellSubscriber::startPull(int myToken) {

                pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
            }

            // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received
            // data from statsd. When it receives the data size of 0, perfd will not expect any
            // atoms and recheck whether the subscription should end.
            if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) {
                attemptWriteToPipeLocked(/*dataSize=*/0);
            }

            // Determine how long to sleep before doing more work.
            for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
                int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
                int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
                if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull;
            }
            int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
            if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat;
        }

        VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
             mSubscriptionInfo->mPullIntervalMin);
        std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
        VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken,
             (long long)sleepTimeMs);
        std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
    }
}

@@ -200,7 +200,7 @@ void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
        }
    }

    if (count > 0) attemptWriteToSocketLocked(mProto.size());
    if (count > 0) attemptWriteToPipeLocked(mProto.size());
}

void ShellSubscriber::onLogEvent(const LogEvent& event) {
@@ -214,26 +214,24 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) {
                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
            event.ToProto(mProto);
            mProto.end(atomToken);
            attemptWriteToSocketLocked(mProto.size());
            attemptWriteToPipeLocked(mProto.size());
        }
    }
}

// Tries to write the atom encoded in mProto to the socket. If the write fails
// Tries to write the atom encoded in mProto to the pipe. If the write fails
// because the read end of the pipe has closed, signals to other threads that
// the subscription should end.
void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
    // First write the payload size.
void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) {
    // First, write the payload size.
    if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionShouldEnd.notify_one();
        return;
    }

    if (dataSize == 0) return;

    // Then, write the payload.
    if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
    // Then, write the payload if this is not just a heartbeat.
    if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) {
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionShouldEnd.notify_one();
        return;
@@ -242,28 +240,6 @@ void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
    mLastWriteMs = getElapsedRealtimeMillis();
}

// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
// recently received any writes from statsd. When it receives the data size of
// 0, perfd will not expect any data and recheck whether the shell command is
// still running.
void ShellSubscriber::sendHeartbeats(int myToken) {
    while (true) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            if (!mSubscriptionInfo || myToken != mToken) {
                VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
                return;
            }

            if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
                VLOG("ShellSubscriber: sending a heartbeat to perfd");
                attemptWriteToSocketLocked(/*dataSize=*/0);
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
    }
}

}  // namespace statsd
}  // namespace os
}  // namespace android
+7 −10
Original line number Diff line number Diff line
@@ -92,7 +92,6 @@ private:
        int mOutputFd;
        std::vector<SimpleAtomMatcher> mPushedMatchers;
        std::vector<PullInfo> mPulledInfo;
        int mPullIntervalMin;
        bool mClientAlive;
    };

@@ -100,27 +99,25 @@ private:

    bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);

    void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);
    void spawnHelperThread(int myToken);

    void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
                                        int myToken,
                                        std::unique_lock<std::mutex>& lock,
                                        int timeoutSec);

    void startPull(int myToken);
    // Helper thread that pulls atoms at a regular frequency and sends
    // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must
    // send heartbeats for perfd to escape a blocking read call and recheck if
    // the user has terminated the subscription.
    void pullAndSendHeartbeats(int myToken);

    void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                const SimpleAtomMatcher& matcher);

    void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);

    void attemptWriteToSocketLocked(size_t dataSize);

    // Send ocassional heartbeats for two reasons: (a) for statsd to detect when
    // the read end of the pipe has closed and (b) for perfd to escape a
    // blocking read call and recheck if the user has terminated the
    // subscription.
    void sendHeartbeats(int myToken);
    void attemptWriteToPipeLocked(size_t dataSize);

    sp<UidMap> mUidMap;