Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 41e606c1 authored by Yao Chen's avatar Yao Chen
Browse files

Add pulled atom subscription for shell.

+ Changed the output format from Atom to ShellData, which is a wrapper for repeated Atom
  This is useful because pulled atoms are usually a list of atoms.

Test: statsd_test added
Bug: 110536553

Change-Id: I0e2f55bdd9015c9bc95b87a630297c6f13e39636
parent 002f63d0
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -189,6 +189,7 @@ LOCAL_SRC_FILES := \
    src/atom_field_options.proto \
    src/atoms.proto \
    src/stats_log.proto \
    src/shell/shell_data.proto \
    tests/AlarmMonitor_test.cpp \
    tests/anomaly/AlarmTracker_test.cpp \
    tests/anomaly/AnomalyTracker_test.cpp \
+1 −1
Original line number Diff line number Diff line
@@ -319,7 +319,7 @@ status_t StatsService::command(int in, int out, int err, Vector<String8>& args,
        }
        if (!args[0].compare(String8("data-subscribe"))) {
            if (mShellSubscriber == nullptr) {
                mShellSubscriber = new ShellSubscriber(mUidMap);
                mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager);
            }
            mShellSubscriber->startNewSubscription(in, out, resultReceiver);
            return NO_ERROR;
+98 −10
Original line number Diff line number Diff line
@@ -18,9 +18,9 @@

#include "ShellSubscriber.h"

#include "matchers/matcher_util.h"

#include <android-base/file.h>
#include "matchers/matcher_util.h"
#include "stats_log_util.h"

using android::util::ProtoOutputStream;

@@ -28,6 +28,8 @@ namespace android {
namespace os {
namespace statsd {

const static int FIELD_ID_ATOM = 1;

void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
    VLOG("start new shell subscription");
    {
@@ -42,25 +44,106 @@ void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver>
        IInterface::asBinder(mResultReceiver)->linkToDeath(this);
    }

    // Spawn another thread to read the config updates from the input file descriptor
    std::thread reader([in, this] { readConfig(in); });
    reader.detach();
    // Note that the following is blocking, and it's intended as we cannot return until the shell
    // cmd exits, otherwise all resources & FDs will be automatically closed.

    std::unique_lock<std::mutex> lk(mMutex);
    // Read config forever until EOF is reached. Clients may send multiple configs -- each new
    // config replace the previous one.
    readConfig(in);

    // Now we have read an EOF we now wait for the semaphore until the client exits.
    VLOG("Now wait for client to exit");
    std::unique_lock<std::mutex> lk(mMutex);
    mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
    if (reader.joinable()) {
        reader.join();
    }
}

void ShellSubscriber::updateConfig(const ShellSubscription& config) {
    std::lock_guard<std::mutex> lock(mMutex);
    mPushedMatchers.clear();
    mPulledInfo.clear();

    for (const auto& pushed : config.pushed()) {
        mPushedMatchers.push_back(pushed);
        VLOG("adding matcher for atom %d", pushed.atom_id());
    }

    int64_t token = getElapsedRealtimeNs();
    mPullToken = token;

    int64_t minInterval = -1;
    for (const auto& pulled : config.pulled()) {
        // All intervals need to be multiples of the min interval.
        if (minInterval < 0 || pulled.freq_millis() < minInterval) {
            minInterval = pulled.freq_millis();
        }

        mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
        VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
    }

    if (mPulledInfo.size() > 0 && minInterval > 0) {
        // This thread is guaranteed to terminate after it detects the token is different or
        // cleaned up.
        std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
        puller.detach();
    }
}

void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                          const SimpleAtomMatcher& matcher) {
    if (mOutput == 0) return;
    int count = 0;
    mProto.clear();
    for (const auto& event : data) {
        VLOG("%s", event->ToString().c_str());
        if (matchesSimple(*mUidMap, matcher, *event)) {
            VLOG("matched");
            count++;
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
            event->ToProto(mProto);
            mProto.end(atomToken);
        }
    }

    if (count > 0) {
        // First write the payload size.
        size_t bufferSize = mProto.size();
        write(mOutput, &bufferSize, sizeof(bufferSize));
        VLOG("%d atoms, proto size: %zu", count, bufferSize);
        // Then write the payload.
        mProto.flush(mOutput);
    }
    mProto.clear();
}

void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
    while (1) {
        int64_t nowMillis = getElapsedRealtimeMillis();
        {
            std::lock_guard<std::mutex> lock(mMutex);
            if (mPulledInfo.size() == 0 || mPullToken != token) {
                VLOG("Pulling thread %lld done!", (long long)token);
                return;
            }
            for (auto& pullInfo : mPulledInfo) {
                if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
                    VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());

                    vector<std::shared_ptr<LogEvent>> data;
                    mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L,
                                     &data);
                    VLOG("pulled %zu atoms", data.size());
                    if (data.size() > 0) {
                        writeToOutputLocked(data, pullInfo.mPullerMatcher);
                    }
                    pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
                }
            }
        }
        VLOG("Pulling thread %lld sleep....", (long long)token);
        std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
    }
}

void ShellSubscriber::readConfig(int in) {
@@ -101,6 +184,8 @@ void ShellSubscriber::cleanUpLocked() {
    mOutput = 0;
    mResultReceiver = nullptr;
    mPushedMatchers.clear();
    mPulledInfo.clear();
    mPullToken = 0;
    VLOG("done clean up");
}

@@ -110,10 +195,13 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) {
    if (mOutput <= 0) {
        return;
    }

    for (const auto& matcher : mPushedMatchers) {
        if (matchesSimple(*mUidMap, matcher, event)) {
            VLOG("%s", event.ToString().c_str());
            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
            event.ToProto(mProto);
            mProto.end(atomToken);
            // First write the payload size.
            size_t bufferSize = mProto.size();
            write(mOutput, &bufferSize, sizeof(bufferSize));
+22 −3
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@
#include <mutex>
#include <string>
#include <thread>
#include "external/StatsPullerManager.h"
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
#include "packages/UidMap.h"
@@ -51,14 +52,15 @@ namespace statsd {
 * with sizeof(size_t) bytes indicating the size of the proto message payload.
 *
 * The stream would be in the following format:
 * |size_t|atom1 proto|size_t|atom2 proto|....
 * |size_t|shellData proto|size_t|shellData proto|....
 *
 * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
 * until it exits.
 */
class ShellSubscriber : public virtual IBinder::DeathRecipient {
public:
    ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){};
    ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
        : mUidMap(uidMap), mPullerMgr(pullerMgr){};

    /**
     * Start a new subscription.
@@ -70,15 +72,28 @@ public:
    void onLogEvent(const LogEvent& event);

private:
    struct PullInfo {
        PullInfo(const SimpleAtomMatcher& matcher, int64_t interval)
            : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) {
        }
        SimpleAtomMatcher mPullerMatcher;
        int64_t mInterval;
        int64_t mPrevPullElapsedRealtimeMs;
    };
    void readConfig(int in);

    void updateConfig(const ShellSubscription& config);

    void startPull(int64_t token, int64_t intervalMillis);

    void cleanUpLocked();

    void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
                             const SimpleAtomMatcher& matcher);

    sp<UidMap> mUidMap;

    // bool mWritten = false;
    sp<StatsPullerManager> mPullerMgr;

    android::util::ProtoOutputStream mProto;

@@ -93,6 +108,10 @@ private:
    sp<IResultReceiver> mResultReceiver;

    std::vector<SimpleAtomMatcher> mPushedMatchers;

    std::vector<PullInfo> mPulledInfo;

    int64_t mPullToken = 0;  // A unique token to identify a puller thread.
};

}  // namespace statsd
+1 −1
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ option java_outer_classname = "ShellConfig";
import "frameworks/base/cmds/statsd/src/statsd_config.proto";

message PulledAtomSubscription {
    optional int32 atom_id = 1;
    optional SimpleAtomMatcher matcher = 1;

    /* gap between two pulls in milliseconds */
    optional int32 freq_millis = 2;
Loading