Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 59080f66 authored by Ruchir Rastogi's avatar Ruchir Rastogi Committed by Automerger Merge Worker
Browse files

Merge "Fix ShellSubscriber concurrency issues" into rvc-dev am: ca7fc3be am:...

Merge "Fix ShellSubscriber concurrency issues" into rvc-dev am: ca7fc3be am: 0a60d240 am: f283d69d

Change-Id: Idd66ecb29489169eaff4e4cc0d6d4ed9f19ba67f
parents 76f94be2 f283d69d
Loading
Loading
Loading
Loading
+109 −81
Original line number Original line Diff line number Diff line
@@ -19,6 +19,7 @@
#include "ShellSubscriber.h"
#include "ShellSubscriber.h"


#include <android-base/file.h>
#include <android-base/file.h>

#include "matchers/matcher_util.h"
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
#include "stats_log_util.h"


@@ -32,14 +33,13 @@ const static int FIELD_ID_ATOM = 1;


void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
    int myToken = claimToken();
    int myToken = claimToken();
    VLOG("ShellSubscriber: new subscription %d has come in", myToken);
    mSubscriptionShouldEnd.notify_one();
    mSubscriptionShouldEnd.notify_one();


    shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
    shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
    if (!readConfig(mySubscriptionInfo)) {
    if (!readConfig(mySubscriptionInfo)) return;
        return;
    }


    // critical-section
    {
        std::unique_lock<std::mutex> lock(mMutex);
        std::unique_lock<std::mutex> lock(mMutex);
        if (myToken != mToken) {
        if (myToken != mToken) {
            // Some other subscription has already come in. Stop.
            // Some other subscription has already come in. Stop.
@@ -47,27 +47,39 @@ void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
        }
        }
        mSubscriptionInfo = mySubscriptionInfo;
        mSubscriptionInfo = mySubscriptionInfo;


    if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
        spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
        // This thread terminates after it detects that mToken has changed.
        waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);

        if (mSubscriptionInfo == mySubscriptionInfo) {
            mSubscriptionInfo = nullptr;
        }

    }
}

void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
    if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
        std::thread puller([this, myToken] { startPull(myToken); });
        std::thread puller([this, myToken] { startPull(myToken); });
        puller.detach();
        puller.detach();
    }
    }


    // Block until subscription has ended.
    std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
    heartbeatSender.detach();
}

void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
                                                     int myToken,
                                                     std::unique_lock<std::mutex>& lock,
                                                     int timeoutSec) {
    if (timeoutSec > 0) {
    if (timeoutSec > 0) {
        mSubscriptionShouldEnd.wait_for(
        mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
                lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
            return mToken != myToken || !myInfo->mClientAlive;
                    return mToken != myToken || !mySubscriptionInfo->mClientAlive;
        });
        });
    } else {
    } else {
        mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
        mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
            return mToken != myToken || !mySubscriptionInfo->mClientAlive;
            return mToken != myToken || !myInfo->mClientAlive;
        });
        });
    }
    }

    if (mSubscriptionInfo == mySubscriptionInfo) {
        mSubscriptionInfo = nullptr;
    }
}
}


// Atomically claim the next token. Token numbers denote subscriber ordering.
// Atomically claim the next token. Token numbers denote subscriber ordering.
@@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
    return true;
    return true;
}
}


void ShellSubscriber::startPull(int64_t myToken) {
void ShellSubscriber::startPull(int myToken) {
    VLOG("ShellSubscriber: pull thread %d starting", myToken);
    while (true) {
    while (true) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            std::lock_guard<std::mutex> lock(mMutex);
            if (!mSubscriptionInfo || mToken != myToken) {
            if (!mSubscriptionInfo || mToken != myToken) {
            VLOG("Pulling thread %lld done!", (long long)myToken);
                VLOG("ShellSubscriber: pulling thread %d done!", myToken);
                return;
                return;
            }
            }


            int64_t nowMillis = getElapsedRealtimeMillis();
            int64_t nowMillis = getElapsedRealtimeMillis();
        for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
            for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
            if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
                if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
                vector<std::shared_ptr<LogEvent>> data;
                    continue;
                vector<int32_t> uids;
                uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
                // This is slow. Consider storing the uids per app and listening to uidmap updates.
                for (const string& pkg : pullInfo.mPullPackages) {
                    set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
                    uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end());
                }
                }
                uids.push_back(DEFAULT_PULL_UID);

                vector<int32_t> uids;
                getUidsForPullAtom(&uids, pullInfo);

                vector<std::shared_ptr<LogEvent>> data;
                mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
                mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
                VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
                VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
                writePulledAtomsLocked(data, pullInfo.mPullerMatcher);


                if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
                    mSubscriptionInfo->mClientAlive = false;
                    mSubscriptionShouldEnd.notify_one();
                    return;
                }
                pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
                pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
            }
            }
        }
        }


        VLOG("Pulling thread %lld sleep....", (long long)myToken);
        VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
             mSubscriptionInfo->mPullIntervalMin);
        std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
        std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
    }
    }
}
}


// \return boolean indicating if writes were successful (will return false if
void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
// client dies)
    uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
    // This is slow. Consider storing the uids per app and listening to uidmap updates.
    for (const string& pkg : pullInfo.mPullPackages) {
        set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
        uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
    }
    uids->push_back(DEFAULT_PULL_UID);
}

void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                             const SimpleAtomMatcher& matcher) {
                                             const SimpleAtomMatcher& matcher) {
    mProto.clear();
    mProto.clear();
    int count = 0;
    int count = 0;
    for (const auto& event : data) {
    for (const auto& event : data) {
        VLOG("%s", event->ToString().c_str());
        if (matchesSimple(*mUidMap, matcher, *event)) {
        if (matchesSimple(*mUidMap, matcher, *event)) {
            count++;
            count++;
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
        }
        }
    }
    }


    if (count > 0) {
    if (count > 0) attemptWriteToSocketLocked(mProto.size());
        // First write the payload size.
        size_t bufferSize = mProto.size();
        if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
                                       sizeof(bufferSize))) {
            return false;
        }

        VLOG("%d atoms, proto size: %zu", count, bufferSize);
        // Then write the payload.
        if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
            return false;
        }
    }

    return true;
}
}


void ShellSubscriber::onLogEvent(const LogEvent& event) {
void ShellSubscriber::onLogEvent(const LogEvent& event) {
    std::lock_guard<std::mutex> lock(mMutex);
    std::lock_guard<std::mutex> lock(mMutex);
    if (!mSubscriptionInfo) {
    if (!mSubscriptionInfo) return;
        return;
    }


    mProto.clear();
    mProto.clear();
    for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
    for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
        if (matchesSimple(*mUidMap, matcher, event)) {
        if (matchesSimple(*mUidMap, matcher, event)) {
            VLOG("%s", event.ToString().c_str());
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
            event.ToProto(mProto);
            event.ToProto(mProto);
            mProto.end(atomToken);
            mProto.end(atomToken);
            attemptWriteToSocketLocked(mProto.size());
        }
    }
}


// Tries to write the atom encoded in mProto to the socket. If the write fails
// because the read end of the pipe has closed, signals to other threads that
// the subscription should end.
void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
    // First write the payload size.
    // First write the payload size.
            size_t bufferSize = mProto.size();
    if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
            if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
                                           sizeof(bufferSize))) {
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionShouldEnd.notify_one();
        mSubscriptionShouldEnd.notify_one();
        return;
        return;
    }
    }


            // Then write the payload.
    if (dataSize == 0) return;

    // Then, write the payload.
    if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
    if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionInfo->mClientAlive = false;
        mSubscriptionShouldEnd.notify_one();
        mSubscriptionShouldEnd.notify_one();
        return;
        return;
    }
    }

    mLastWriteMs = getElapsedRealtimeMillis();
}

// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
// recently received any writes from statsd. When it receives the data size of
// 0, perfd will not expect any data and recheck whether the shell command is
// still running.
void ShellSubscriber::sendHeartbeats(int myToken) {
    while (true) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            if (!mSubscriptionInfo || myToken != mToken) {
                VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
                return;
            }

            if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
                VLOG("ShellSubscriber: sending a heartbeat to perfd");
                attemptWriteToSocketLocked(/*dataSize=*/0);
            }
        }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
    }
    }
}
}


+29 −7
Original line number Original line Diff line number Diff line
@@ -38,11 +38,11 @@ namespace statsd {
 *
 *
 * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
 * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
 * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
 * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
 * The atoms are sent back to the client in real time, as opposed to
 * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
 * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are
 * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
 * responsible for doing the aggregation after receiving the atom events.
 * aggregation after receiving the atom events.
 *
 *
 * Shell client pass ShellSubscription in the proto binary format. Client can update the
 * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
 * subscription by sending a new subscription. The new subscription would replace the old one.
 * subscription by sending a new subscription. The new subscription would replace the old one.
 * Input data stream format is:
 * Input data stream format is:
 *
 *
@@ -54,7 +54,7 @@ namespace statsd {
 * The stream would be in the following format:
 * The stream would be in the following format:
 * |size_t|shellData proto|size_t|shellData proto|....
 * |size_t|shellData proto|size_t|shellData proto|....
 *
 *
 * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
 * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
 * until it exits.
 * until it exits.
 */
 */
class ShellSubscriber : public virtual RefBase {
class ShellSubscriber : public virtual RefBase {
@@ -100,11 +100,28 @@ private:


    bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
    bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);


    void startPull(int64_t myToken);
    void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);


    bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
    void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
                                        int myToken,
                                        std::unique_lock<std::mutex>& lock,
                                        int timeoutSec);

    void startPull(int myToken);

    void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                const SimpleAtomMatcher& matcher);
                                const SimpleAtomMatcher& matcher);


    void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);

    void attemptWriteToSocketLocked(size_t dataSize);

    // Send ocassional heartbeats for two reasons: (a) for statsd to detect when
    // the read end of the pipe has closed and (b) for perfd to escape a
    // blocking read call and recheck if the user has terminated the
    // subscription.
    void sendHeartbeats(int myToken);

    sp<UidMap> mUidMap;
    sp<UidMap> mUidMap;


    sp<StatsPullerManager> mPullerMgr;
    sp<StatsPullerManager> mPullerMgr;
@@ -120,6 +137,11 @@ private:
    int mToken = 0;
    int mToken = 0;


    const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
    const int32_t DEFAULT_PULL_UID = AID_SYSTEM;

    // Tracks when we last send data to perfd. We need that time to determine
    // when next to send a heartbeat.
    int64_t mLastWriteMs = 0;
    const int64_t kMsBetweenHeartbeats = 1000;
};
};


}  // namespace statsd
}  // namespace statsd
+26 −20
Original line number Original line Diff line number Diff line
@@ -86,28 +86,34 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
    // wait for the data to be written.
    // wait for the data to be written.
    std::this_thread::sleep_for(100ms);
    std::this_thread::sleep_for(100ms);


    int expected_data_size = expectedData.ByteSize();
    // Because we might receive heartbeats from statsd, consisting of data sizes

    // of 0, encapsulate reads within a while loop.
    // now read from the pipe. firstly read the atom size.
    bool readAtom = false;
    while (!readAtom) {
        // Read the atom size.
        size_t dataSize = 0;
        size_t dataSize = 0;
    EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));
        read(fds_data[0], &dataSize, sizeof(dataSize));

        if (dataSize == 0) continue;
    EXPECT_EQ(expected_data_size, (int)dataSize);
        EXPECT_EQ(expectedData.ByteSize(), int(dataSize));


    // then read that much data which is the atom in proto binary format
        // Read that much data in proto binary format.
        vector<uint8_t> dataBuffer(dataSize);
        vector<uint8_t> dataBuffer(dataSize);
        EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
        EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));


    // make sure the received bytes can be parsed to an atom
        // Make sure the received bytes can be parsed to an atom.
        ShellData receivedAtom;
        ShellData receivedAtom;
        EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
        EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);


    // serialze the expected atom to bytes. and compare. to make sure they are the same.
        // Serialize the expected atom to byte array and compare to make sure
    vector<uint8_t> atomBuffer(expected_data_size);
        // they are the same.
    expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
        vector<uint8_t> expectedAtomBuffer(expectedData.ByteSize());
    EXPECT_EQ(atomBuffer, dataBuffer);
        expectedData.SerializeToArray(expectedAtomBuffer.data(), expectedData.ByteSize());
    close(fds_data[0]);
        EXPECT_EQ(expectedAtomBuffer, dataBuffer);


        readAtom = true;
    }

    close(fds_data[0]);
    if (reader.joinable()) {
    if (reader.joinable()) {
        reader.join();
        reader.join();
    }
    }