Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8de4042c authored by Josh Gao's avatar Josh Gao Committed by android-build-merger
Browse files

adb: fdevent: move run queue to fdevent_context. am: 95eef6b0 am: 3f06c2ba

am: 48df9697

Change-Id: I80c06b77ff4d553b582fa58967f579a0da00f82f
parents 361c8c1e 48df9697
Loading
Loading
Loading
Loading
+26 −0
Original line number Diff line number Diff line
@@ -49,6 +49,32 @@ std::string dump_fde(const fdevent* fde) {
                                       state.c_str());
}

void fdevent_context::Run(std::function<void()> fn) {
    {
        std::lock_guard<std::mutex> lock(run_queue_mutex_);
        run_queue_.push_back(std::move(fn));
    }

    Interrupt();
}

void fdevent_context::FlushRunQueue() {
    // We need to be careful around reentrancy here, since a function we call can queue up another
    // function.
    while (true) {
        std::function<void()> fn;
        {
            std::lock_guard<std::mutex> lock(this->run_queue_mutex_);
            if (this->run_queue_.empty()) {
                break;
            }
            fn = this->run_queue_.front();
            this->run_queue_.pop_front();
        }
        fn();
    }
}

static auto& g_ambient_fdevent_context =
        *new std::unique_ptr<fdevent_context>(new fdevent_context_poll());

+18 −1
Original line number Diff line number Diff line
@@ -21,10 +21,14 @@
#include <stdint.h>

#include <chrono>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <variant>

#include <android-base/thread_annotations.h>

#include "adb_unique_fd.h"

// Events that may be observed
@@ -48,6 +52,7 @@ struct fdevent;
std::string dump_fde(const fdevent* fde);

struct fdevent_context {
  public:
    virtual ~fdevent_context() = default;

    // Allocate and initialize a new fdevent object.
@@ -68,17 +73,29 @@ struct fdevent_context {
    virtual void SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) = 0;

    // Loop forever, handling events.
    // Implementations should call FlushRunQueue on every iteration.
    virtual void Loop() = 0;

    // Assert that the caller is running on the context's main thread.
    virtual void CheckMainThread() = 0;

    // Queue an operation to be run on the main thread.
    virtual void Run(std::function<void()> fn) = 0;
    void Run(std::function<void()> fn);

    // Test-only functionality:
    virtual void TerminateLoop() = 0;
    virtual size_t InstalledCount() = 0;

  protected:
    // Interrupt the run loop.
    virtual void Interrupt() = 0;

    // Run all pending functions enqueued via Run().
    void FlushRunQueue() EXCLUDES(run_queue_mutex_);

  private:
    std::mutex run_queue_mutex_;
    std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);
};

struct fdevent {
+44 −79
Original line number Diff line number Diff line
@@ -50,6 +50,35 @@
#include "fdevent.h"
#include "sysdeps/chrono.h"

static void fdevent_interrupt(int fd, unsigned, void*) {
    char buf[BUFSIZ];
    ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf)));
    if (rc == -1) {
        PLOG(FATAL) << "failed to read from fdevent interrupt fd";
    }
}

fdevent_context_poll::fdevent_context_poll() {
    int s[2];
    if (adb_socketpair(s) != 0) {
        PLOG(FATAL) << "failed to create fdevent interrupt socketpair";
    }

    if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
        PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking";
    }

    this->interrupt_fd_.reset(s[0]);
    fdevent* fde = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr);
    CHECK(fde != nullptr);
    this->Add(fde, FDE_READ);
}

fdevent_context_poll::~fdevent_context_poll() {
    main_thread_valid_ = false;
    this->Destroy(this->interrupt_fde_);
}

void fdevent_context_poll::CheckMainThread() {
    if (main_thread_valid_) {
        CHECK_EQ(main_thread_id_, android::base::GetThreadId());
@@ -291,79 +320,6 @@ static void fdevent_call_fdfunc(fdevent* fde) {
            fde->func);
}

static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) {
    // We need to be careful around reentrancy here, since a function we call can queue up another
    // function.
    while (true) {
        std::function<void()> fn;
        {
            std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
            if (ctx->run_queue_.empty()) {
                break;
            }
            fn = ctx->run_queue_.front();
            ctx->run_queue_.pop_front();
        }
        fn();
    }
}

static void fdevent_run_func(int fd, unsigned ev, void* data) {
    CHECK_GE(fd, 0);
    CHECK(ev & FDE_READ);

    bool* run_needs_flush = static_cast<bool*>(data);
    char buf[1024];

    // Empty the fd.
    if (adb_read(fd, buf, sizeof(buf)) == -1) {
        PLOG(FATAL) << "failed to empty run queue notify fd";
    }

    // Mark that we need to flush, and then run it at the end of fdevent_loop.
    *run_needs_flush = true;
}

static void fdevent_run_setup(fdevent_context_poll* ctx) {
    {
        std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_);
        CHECK(ctx->run_queue_notify_fd_.get() == -1);
        int s[2];
        if (adb_socketpair(s) != 0) {
            PLOG(FATAL) << "failed to create run queue notify socketpair";
        }

        if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) {
            PLOG(FATAL) << "failed to make run queue notify socket nonblocking";
        }

        ctx->run_queue_notify_fd_.reset(s[0]);
        fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_);
        CHECK(fde != nullptr);
        ctx->Add(fde, FDE_READ);
    }

    fdevent_run_flush(ctx);
}

void fdevent_context_poll::Run(std::function<void()> fn) {
    std::lock_guard<std::mutex> lock(run_queue_mutex_);
    run_queue_.push_back(std::move(fn));

    // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up.
    // In that case, rely on the setup code to flush the queue without a notification being needed.
    if (run_queue_notify_fd_ != -1) {
        int rc = adb_write(run_queue_notify_fd_.get(), "", 1);

        // It's possible that we get EAGAIN here, if lots of notifications came in while handling.
        if (rc == 0) {
            PLOG(FATAL) << "run queue notify fd was closed?";
        } else if (rc == -1 && errno != EAGAIN) {
            PLOG(FATAL) << "failed to write to run queue notify fd";
        }
    }
}

static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
    // Check to see if we're spinning because we forgot about an fdevent
    // by keeping track of how long fdevents have been continuously pending.
@@ -424,7 +380,6 @@ static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) {
void fdevent_context_poll::Loop() {
    this->main_thread_id_ = android::base::GetThreadId();
    this->main_thread_valid_ = true;
    fdevent_run_setup(this);

    uint64_t cycle = 0;
    while (true) {
@@ -444,17 +399,27 @@ void fdevent_context_poll::Loop() {
            fdevent_call_fdfunc(fde);
        }

        if (run_needs_flush_) {
            fdevent_run_flush(this);
            run_needs_flush_ = false;
        }
        this->FlushRunQueue();
    }
}

void fdevent_context_poll::TerminateLoop() {
    terminate_loop_ = true;
    Interrupt();
}

size_t fdevent_context_poll::InstalledCount() {
    return poll_node_map_.size();
    // We always have an installed fde for interrupt.
    return poll_node_map_.size() - 1;
}

void fdevent_context_poll::Interrupt() {
    int rc = adb_write(this->interrupt_fd_, "", 1);

    // It's possible that we get EAGAIN here, if lots of notifications came in while handling.
    if (rc == 0) {
        PLOG(FATAL) << "fdevent interrupt fd was closed?";
    } else if (rc == -1 && errno != EAGAIN) {
        PLOG(FATAL) << "failed to write to fdevent interrupt fd";
    }
}
+9 −8
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@

#include <android-base/thread_annotations.h>

#include "adb_unique_fd.h"
#include "fdevent.h"

struct PollNode {
@@ -44,7 +45,8 @@ struct PollNode {
};

struct fdevent_context_poll : public fdevent_context {
    virtual ~fdevent_context_poll() = default;
    fdevent_context_poll();
    virtual ~fdevent_context_poll();

    virtual fdevent* Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) final;
    virtual unique_fd Destroy(fdevent* fde) final;
@@ -58,11 +60,13 @@ struct fdevent_context_poll : public fdevent_context {

    virtual void CheckMainThread() final;

    virtual void Run(std::function<void()> fn) final;

    virtual void TerminateLoop() final;
    virtual size_t InstalledCount() final;

  protected:
    virtual void Interrupt() final;

  public:
    // All operations to fdevent should happen only in the main thread.
    // That's why we don't need a lock for fdevent.
    std::unordered_map<int, PollNode> poll_node_map_;
@@ -71,10 +75,7 @@ struct fdevent_context_poll : public fdevent_context {
    uint64_t main_thread_id_ = 0;
    uint64_t fdevent_id_ = 0;

    bool run_needs_flush_ = false;
    unique_fd run_queue_notify_fd_;
    std::mutex run_queue_mutex_;
    std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_);

    unique_fd interrupt_fd_;
    fdevent* interrupt_fde_ = nullptr;
    std::atomic<bool> terminate_loop_ = false;
};
+2 −2
Original line number Diff line number Diff line
@@ -78,8 +78,8 @@ class FdeventTest : public ::testing::Test {
    }

    size_t GetAdditionalLocalSocketCount() {
        // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket
        return 2;
        // dummy socket installed in PrepareThread()
        return 1;
    }

    void TerminateThread() {