Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ca7fc3be authored by Ruchir Rastogi's avatar Ruchir Rastogi Committed by Android (Google) Code Review
Browse files

Merge "Fix ShellSubscriber concurrency issues" into rvc-dev

parents f67a780c 1e240516
Loading
Loading
Loading
Loading
+109 −81
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include "ShellSubscriber.h"

#include <android-base/file.h>

#include "matchers/matcher_util.h"
#include "stats_log_util.h"

@@ -32,14 +33,13 @@ const static int FIELD_ID_ATOM = 1;

void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
    int myToken = claimToken();
    VLOG("ShellSubscriber: new subscription %d has come in", myToken);
    mSubscriptionShouldEnd.notify_one();

    shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
    if (!readConfig(mySubscriptionInfo)) {
        return;
    }
    if (!readConfig(mySubscriptionInfo)) return;

    // critical-section
    {
        std::unique_lock<std::mutex> lock(mMutex);
        if (myToken != mToken) {
            // Some other subscription has already come in. Stop.
@@ -47,27 +47,39 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
        }
        mSubscriptionInfo = mySubscriptionInfo;

    if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
        // This thread terminates after it detects that mToken has changed.
        spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
        waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);

        if (mSubscriptionInfo == mySubscriptionInfo) {
            mSubscriptionInfo = nullptr;
        }

    }
}

void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
    if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
        std::thread puller([this, myToken] { startPull(myToken); });
        puller.detach();
    }

    // Block until subscription has ended.
    std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
    heartbeatSender.detach();
}

void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
                                                     int myToken,
                                                     std::unique_lock<std::mutex>& lock,
                                                     int timeoutSec) {
    if (timeoutSec > 0) {
        mSubscriptionShouldEnd.wait_for(
                lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
                    return mToken != myToken || !mySubscriptionInfo->mClientAlive;
        mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
            return mToken != myToken || !myInfo->mClientAlive;
        });
    } else {
        mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
            return mToken != myToken || !mySubscriptionInfo->mClientAlive;
        mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
            return mToken != myToken || !myInfo->mClientAlive;
        });
    }

    if (mSubscriptionInfo == mySubscriptionInfo) {
        mSubscriptionInfo = nullptr;
    }
}

// Atomically claim the next token. Token numbers denote subscriber ordering.
@@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
    return true;
}

void ShellSubscriber::startPull(int64_t myToken) {
void ShellSubscriber::startPull(int myToken) {
    VLOG("ShellSubscriber: pull thread %d starting", myToken);
    while (true) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            if (!mSubscriptionInfo || mToken != myToken) {
            VLOG("Pulling thread %lld done!", (long long)myToken);
                VLOG("ShellSubscriber: pulling thread %d done!", myToken);
                return;
            }

            int64_t nowMillis = getElapsedRealtimeMillis();
        for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
            if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
                vector<std::shared_ptr<LogEvent>> data;
                vector<int32_t> uids;
                uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
                // This is slow. Consider storing the uids per app and listening to uidmap updates.
                for (const string& pkg : pullInfo.mPullPackages) {
                    set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
                    uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end());
            for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
                if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
                    continue;
                }
                uids.push_back(DEFAULT_PULL_UID);

                vector<int32_t> uids;
                getUidsForPullAtom(&uids, pullInfo);

                vector<std::shared_ptr<LogEvent>> data;
                mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
                VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
                VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
                writePulledAtomsLocked(data, pullInfo.mPullerMatcher);

                if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
                    mSubscriptionInfo->mClientAlive = false;
                    mSubscriptionShouldEnd.notify_one();
                    return;
                }
                pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
            }
        }

        VLOG("Pulling thread %lld sleep....", (long long)myToken);
        VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
             mSubscriptionInfo->mPullIntervalMin);
        std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
    }
}

// \return boolean indicating if writes were successful (will return false if
// client dies)
bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
    uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
    // This is slow. Consider storing the uids per app and listening to uidmap updates.
    for (const string& pkg : pullInfo.mPullPackages) {
        set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
        uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
    }
    uids->push_back(DEFAULT_PULL_UID);
}

void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                             const SimpleAtomMatcher& matcher) {
    mProto.clear();
    int count = 0;
    for (const auto& event : data) {
        VLOG("%s", event->ToString().c_str());
        if (matchesSimple(*mUidMap, matcher, *event)) {
            count++;
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
        }
    }

    if (count > 0) {
        // First write the payload size.
        size_t bufferSize = mProto.size();
        if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
                                       sizeof(bufferSize))) {
            return false;
        }

        VLOG("%d atoms, proto size: %zu", count, bufferSize);
        // Then write the payload.
        if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
            return false;
        }
    }

    return true;
    if (count > 0) attemptWriteToSocketLocked(mProto.size());
}

void ShellSubscriber::onLogEvent(const LogEvent& event) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (!mSubscriptionInfo) {
        return;
    }
    if (!mSubscriptionInfo) return;

    mProto.clear();
    for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
        if (matchesSimple(*mUidMap, matcher, event)) {
            VLOG("%s", event.ToString().c_str());
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
            event.ToProto(mProto);
            mProto.end(atomToken);
            attemptWriteToSocketLocked(mProto.size());
        }
    }
}

// Tries to write the atom encoded in mProto to the socket. If the write fails
// because the read end of the pipe has closed, signals to other threads that
// the subscription should end.
void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
    // First write the payload size.
            size_t bufferSize = mProto.size();
            if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
                                           sizeof(bufferSize))) {
    if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionShouldEnd.notify_one();
        return;
    }

            // Then write the payload.
    if (dataSize == 0) return;

    // Then, write the payload.
    if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionShouldEnd.notify_one();
        return;
    }

    mLastWriteMs = getElapsedRealtimeMillis();
}

// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
// recently received any writes from statsd. When it receives the data size of
// 0, perfd will not expect any data and recheck whether the shell command is
// still running.
void ShellSubscriber::sendHeartbeats(int myToken) {
    while (true) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            if (!mSubscriptionInfo || myToken != mToken) {
                VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
                return;
            }

            if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
                VLOG("ShellSubscriber: sending a heartbeat to perfd");
                attemptWriteToSocketLocked(/*dataSize=*/0);
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
    }
}

+29 −7
Original line number Diff line number Diff line
@@ -38,11 +38,11 @@ namespace statsd {
 *
 * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
 * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
 * The atoms are sent back to the client in real time, as opposed to
 * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are
 * responsible for doing the aggregation after receiving the atom events.
 * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
 * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
 * aggregation after receiving the atom events.
 *
 * Shell client pass ShellSubscription in the proto binary format. Client can update the
 * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
 * subscription by sending a new subscription. The new subscription would replace the old one.
 * Input data stream format is:
 *
@@ -54,7 +54,7 @@ namespace statsd {
 * The stream would be in the following format:
 * |size_t|shellData proto|size_t|shellData proto|....
 *
 * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
 * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
 * until it exits.
 */
class ShellSubscriber : public virtual RefBase {
@@ -100,11 +100,28 @@ private:

    bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);

    void startPull(int64_t myToken);
    void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);

    bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
    void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
                                        int myToken,
                                        std::unique_lock<std::mutex>& lock,
                                        int timeoutSec);

    void startPull(int myToken);

    void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                const SimpleAtomMatcher& matcher);

    void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);

    void attemptWriteToSocketLocked(size_t dataSize);

    // Send ocassional heartbeats for two reasons: (a) for statsd to detect when
    // the read end of the pipe has closed and (b) for perfd to escape a
    // blocking read call and recheck if the user has terminated the
    // subscription.
    void sendHeartbeats(int myToken);

    sp<UidMap> mUidMap;

    sp<StatsPullerManager> mPullerMgr;
@@ -120,6 +137,11 @@ private:
    int mToken = 0;

    const int32_t DEFAULT_PULL_UID = AID_SYSTEM;

    // Tracks when we last send data to perfd. We need that time to determine
    // when next to send a heartbeat.
    int64_t mLastWriteMs = 0;
    const int64_t kMsBetweenHeartbeats = 1000;
};

}  // namespace statsd
+26 −20
Original line number Diff line number Diff line
@@ -86,28 +86,34 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
    // wait for the data to be written.
    std::this_thread::sleep_for(100ms);

    int expected_data_size = expectedData.ByteSize();

    // now read from the pipe. firstly read the atom size.
    // Because we might receive heartbeats from statsd, consisting of data sizes
    // of 0, encapsulate reads within a while loop.
    bool readAtom = false;
    while (!readAtom) {
        // Read the atom size.
        size_t dataSize = 0;
    EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));

    EXPECT_EQ(expected_data_size, (int)dataSize);
        read(fds_data[0], &dataSize, sizeof(dataSize));
        if (dataSize == 0) continue;
        EXPECT_EQ(expectedData.ByteSize(), int(dataSize));

    // then read that much data which is the atom in proto binary format
        // Read that much data in proto binary format.
        vector<uint8_t> dataBuffer(dataSize);
        EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));

    // make sure the received bytes can be parsed to an atom
        // Make sure the received bytes can be parsed to an atom.
        ShellData receivedAtom;
        EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);

    // serialze the expected atom to bytes. and compare. to make sure they are the same.
    vector<uint8_t> atomBuffer(expected_data_size);
    expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
    EXPECT_EQ(atomBuffer, dataBuffer);
    close(fds_data[0]);
        // Serialize the expected atom to byte array and compare to make sure
        // they are the same.
        vector<uint8_t> expectedAtomBuffer(expectedData.ByteSize());
        expectedData.SerializeToArray(expectedAtomBuffer.data(), expectedData.ByteSize());
        EXPECT_EQ(expectedAtomBuffer, dataBuffer);

        readAtom = true;
    }

    close(fds_data[0]);
    if (reader.joinable()) {
        reader.join();
    }