Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 606c83bc authored by Josh Gao's avatar Josh Gao Committed by android-build-merger
Browse files

Merge changes Iff1ec14b,I32123c51,I517dbcba,Ic215c7fe

am: b9b967e2

Change-Id: Ia0c1c07dcc2ddfae072cb43a20960efe0de7e951
parents 3cdbced8 b9b967e2
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -75,6 +75,7 @@ static std::atomic<bool> terminate_loop(false);
static bool main_thread_valid;
static uint64_t main_thread_id;

static bool run_needs_flush = false;
static auto& run_queue_notify_fd = *new unique_fd();
static auto& run_queue_mutex = *new std::mutex();
static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque<std::function<void()>>();
@@ -317,7 +318,8 @@ static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) {
        PLOG(FATAL) << "failed to empty run queue notify fd";
    }

    fdevent_run_flush();
    // Mark that we need to flush, and then run it at the end of fdevent_loop.
    run_needs_flush = true;
}

static void fdevent_run_setup() {
@@ -378,6 +380,11 @@ void fdevent_loop() {
            g_pending_list.pop_front();
            fdevent_call_fdfunc(fde);
        }

        if (run_needs_flush) {
            fdevent_run_flush();
            run_needs_flush = false;
        }
    }
}

+28 −33
Original line number Diff line number Diff line
@@ -80,30 +80,7 @@ struct ThreadArg {

TEST_F(FdeventTest, fdevent_terminate) {
    PrepareThread();

    std::thread thread(fdevent_loop);
    TerminateThread(thread);
}

static void FdEventThreadFunc(ThreadArg* arg) {
    std::vector<int> read_fds;
    std::vector<int> write_fds;

    read_fds.push_back(arg->first_read_fd);
    for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
        int fds[2];
        ASSERT_EQ(0, adb_socketpair(fds));
        read_fds.push_back(fds[0]);
        write_fds.push_back(fds[1]);
    }
    write_fds.push_back(arg->last_write_fd);

    std::vector<std::unique_ptr<FdHandler>> fd_handlers;
    for (size_t i = 0; i < read_fds.size(); ++i) {
        fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
    }

    fdevent_loop();
    TerminateThread();
}

TEST_F(FdeventTest, smoke) {
@@ -122,7 +99,26 @@ TEST_F(FdeventTest, smoke) {
    int reader = fd_pair2[0];

    PrepareThread();
    std::thread thread(FdEventThreadFunc, &thread_arg);

    std::vector<std::unique_ptr<FdHandler>> fd_handlers;
    fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
        std::vector<int> read_fds;
        std::vector<int> write_fds;

        read_fds.push_back(thread_arg.first_read_fd);
        for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
            int fds[2];
            ASSERT_EQ(0, adb_socketpair(fds));
            read_fds.push_back(fds[0]);
            write_fds.push_back(fds[1]);
        }
        write_fds.push_back(thread_arg.last_write_fd);

        for (size_t i = 0; i < read_fds.size(); ++i) {
            fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
        }
    });
    WaitForFdeventLoop();

    for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
        std::string read_buffer = MESSAGE;
@@ -132,7 +128,10 @@ TEST_F(FdeventTest, smoke) {
        ASSERT_EQ(read_buffer, write_buffer);
    }

    TerminateThread(thread);
    fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
    WaitForFdeventLoop();

    TerminateThread();
    ASSERT_EQ(0, adb_close(writer));
    ASSERT_EQ(0, adb_close(reader));
}
@@ -143,7 +142,7 @@ struct InvalidFdArg {
    size_t* happened_event_count;
};

static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) {
static void InvalidFdEventCallback(int, unsigned events, void* userdata) {
    InvalidFdArg* arg = reinterpret_cast<InvalidFdArg*>(userdata);
    ASSERT_EQ(arg->expected_events, events);
    fdevent_remove(&arg->fde);
@@ -179,7 +178,6 @@ TEST_F(FdeventTest, run_on_main_thread) {
    std::vector<int> vec;

    PrepareThread();
    std::thread thread(fdevent_loop);

    // Block the main thread for a long time while we queue our callbacks.
    fdevent_run_on_main_thread([]() {
@@ -194,7 +192,7 @@ TEST_F(FdeventTest, run_on_main_thread) {
        });
    }

    TerminateThread(thread);
    TerminateThread();

    ASSERT_EQ(1000000u, vec.size());
    for (int i = 0; i < 1000000; ++i) {
@@ -218,11 +216,8 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) {
    std::vector<int> vec;

    PrepareThread();
    std::thread thread(fdevent_loop);

    fdevent_run_on_main_thread(make_appender(&vec, 0));

    TerminateThread(thread);
    TerminateThread();

    ASSERT_EQ(100u, vec.size());
    for (int i = 0; i < 100; ++i) {
+28 −2
Original line number Diff line number Diff line
@@ -16,10 +16,31 @@

#include <gtest/gtest.h>

#include <condition_variable>
#include <mutex>
#include <thread>

#include "socket.h"
#include "sysdeps.h"
#include "sysdeps/chrono.h"

static void WaitForFdeventLoop() {
    // Sleep for a bit to make sure that network events have propagated.
    std::this_thread::sleep_for(100ms);

    // fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after
    // socket events, so as soon as our function is called, we know that we've processed all
    // previous events.
    std::mutex mutex;
    std::condition_variable cv;
    std::unique_lock<std::mutex> lock(mutex);
    fdevent_run_on_main_thread([&]() {
        mutex.lock();
        mutex.unlock();
        cv.notify_one();
    });
    cv.wait(lock);
}

class FdeventTest : public ::testing::Test {
  protected:
@@ -49,6 +70,9 @@ class FdeventTest : public ::testing::Test {
        }
        dummy_socket->ready(dummy_socket);
        dummy = dummy_fds[0];

        thread_ = std::thread([]() { fdevent_loop(); });
        WaitForFdeventLoop();
    }

    size_t GetAdditionalLocalSocketCount() {
@@ -56,10 +80,12 @@ class FdeventTest : public ::testing::Test {
        return 2;
    }

    void TerminateThread(std::thread& thread) {
    void TerminateThread() {
        fdevent_terminate_loop();
        ASSERT_TRUE(WriteFdExactly(dummy, "", 1));
        thread.join();
        thread_.join();
        ASSERT_EQ(0, adb_close(dummy));
    }

    std::thread thread_;
};
+48 −49
Original line number Diff line number Diff line
@@ -42,10 +42,6 @@ struct ThreadArg {

class LocalSocketTest : public FdeventTest {};

static void WaitForFdeventLoop() {
    std::this_thread::sleep_for(100ms);
}

TEST_F(LocalSocketTest, smoke) {
    // Join two socketpairs with a chain of intermediate socketpairs.
    int first[2];
@@ -86,7 +82,6 @@ TEST_F(LocalSocketTest, smoke) {
    connect(prev_tail, end);

    PrepareThread();
    std::thread thread(fdevent_loop);

    for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
        std::string read_buffer = MESSAGE;
@@ -102,7 +97,7 @@ TEST_F(LocalSocketTest, smoke) {
    // Wait until the local sockets are closed.
    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

struct CloseWithPacketArg {
@@ -111,24 +106,39 @@ struct CloseWithPacketArg {
    int cause_close_fd;
};

static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) {
static void CreateCloser(CloseWithPacketArg* arg) {
    fdevent_run_on_main_thread([arg]() {
        asocket* s = create_local_socket(arg->socket_fd);
        ASSERT_TRUE(s != nullptr);
        arg->bytes_written = 0;

        // On platforms that implement sockets via underlying sockets (e.g. Wine),
        // a socket can appear to be full, and then become available for writes
        // again without read being called on the other end. Loop and sleep after
        // each write to give the underlying implementation time to flush.
        bool socket_filled = false;
        for (int i = 0; i < 128; ++i) {
            std::string data;
            data.resize(MAX_PAYLOAD);
            arg->bytes_written += data.size();
            int ret = s->enqueue(s, std::move(data));
    ASSERT_EQ(1, ret);
            if (ret == 1) {
                socket_filled = true;
                break;
            }
            ASSERT_NE(-1, ret);

            std::this_thread::sleep_for(250ms);
        }
        ASSERT_TRUE(socket_filled);

        asocket* cause_close_s = create_local_socket(arg->cause_close_fd);
        ASSERT_TRUE(cause_close_s != nullptr);
        cause_close_s->peer = s;
        s->peer = cause_close_s;
        cause_close_s->ready(cause_close_s);

    fdevent_loop();
    });
    WaitForFdeventLoop();
}

// This test checks if we can close local socket in the following situation:
@@ -145,9 +155,8 @@ TEST_F(LocalSocketTest, close_socket_with_packet) {
    arg.cause_close_fd = cause_close_fd[1];

    PrepareThread();
    std::thread thread(CloseWithPacketThreadFunc, &arg);
    CreateCloser(&arg);

    WaitForFdeventLoop();
    ASSERT_EQ(0, adb_close(cause_close_fd[0]));

    WaitForFdeventLoop();
@@ -156,7 +165,7 @@ TEST_F(LocalSocketTest, close_socket_with_packet) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

// This test checks if we can read packets from a closing local socket.
@@ -170,7 +179,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) {
    arg.cause_close_fd = cause_close_fd[1];

    PrepareThread();
    std::thread thread(CloseWithPacketThreadFunc, &arg);
    CreateCloser(&arg);

    WaitForFdeventLoop();
    ASSERT_EQ(0, adb_close(cause_close_fd[0]));
@@ -186,7 +195,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

// This test checks if we can close local socket in the following situation:
@@ -203,7 +212,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {
    arg.cause_close_fd = cause_close_fd[1];

    PrepareThread();
    std::thread thread(CloseWithPacketThreadFunc, &arg);
    CreateCloser(&arg);

    WaitForFdeventLoop();
    EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -211,7 +220,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

// Ensure that if we fail to write output to an fd, we will still flush data coming from it.
@@ -231,7 +240,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) {
    tail->ready(tail);

    PrepareThread();
    std::thread thread(fdevent_loop);

    EXPECT_TRUE(WriteFdExactly(head_fd[0], "foo", 3));

@@ -249,7 +257,7 @@ TEST_F(LocalSocketTest, flush_after_shutdown) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

#if defined(__linux__)
@@ -258,21 +266,10 @@ static void ClientThreadFunc() {
    std::string error;
    int fd = network_loopback_client(5038, SOCK_STREAM, &error);
    ASSERT_GE(fd, 0) << error;
    std::this_thread::sleep_for(200ms);
    std::this_thread::sleep_for(1s);
    ASSERT_EQ(0, adb_close(fd));
}

struct CloseRdHupSocketArg {
    int socket_fd;
};

static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
    asocket* s = create_local_socket(arg->socket_fd);
    ASSERT_TRUE(s != nullptr);

    fdevent_loop();
}

// This test checks if we can close sockets in CLOSE_WAIT state.
TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
    std::string error;
@@ -283,11 +280,13 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {

    int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr);
    ASSERT_GE(accept_fd, 0);
    CloseRdHupSocketArg arg;
    arg.socket_fd = accept_fd;

    PrepareThread();
    std::thread thread(CloseRdHupSocketThreadFunc, &arg);

    fdevent_run_on_main_thread([accept_fd]() {
        asocket* s = create_local_socket(accept_fd);
        ASSERT_TRUE(s != nullptr);
    });

    WaitForFdeventLoop();
    EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -297,7 +296,7 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

#endif  // defined(__linux__)
+5 −5
Original line number Diff line number Diff line
@@ -126,13 +126,13 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) {
        } else if (rc == -1 && errno == EAGAIN) {
            fdevent_add(&s->fde, FDE_WRITE);
            return SocketFlushResult::TryAgain;
        }

        } else {
            // We failed to write, but it's possible that we can still read from the socket.
            // Give that a try before giving up.
            s->has_write_error = true;
            break;
        }
    }

    // If we sent the last packet of a closing socket, we can now destroy it.
    if (s->closing) {