Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7ab55713 authored by Josh Gao's avatar Josh Gao
Browse files

adb: move ownership of the fdevent thread into FdeventTest.

Previously, each of the tests was spawning the fdevent thread manually,
in order to be able to set up listeners and such before running
fdevent_loop. Now that we have a way to run arbitrary code on the
fdevent thread, switch to having a generic fdevent thread and running
setup code via fdevent_run_on_main_thread.

Test: adb_test
Test: wine adb_test.exe
Change-Id: I517dbcbad31067b45087d9fbed67a75b75a75aec
parent fa30bf39
Loading
Loading
Loading
Loading
+28 −33
Original line number Diff line number Diff line
@@ -80,30 +80,7 @@ struct ThreadArg {

TEST_F(FdeventTest, fdevent_terminate) {
    PrepareThread();

    std::thread thread(fdevent_loop);
    TerminateThread(thread);
}

static void FdEventThreadFunc(ThreadArg* arg) {
    std::vector<int> read_fds;
    std::vector<int> write_fds;

    read_fds.push_back(arg->first_read_fd);
    for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
        int fds[2];
        ASSERT_EQ(0, adb_socketpair(fds));
        read_fds.push_back(fds[0]);
        write_fds.push_back(fds[1]);
    }
    write_fds.push_back(arg->last_write_fd);

    std::vector<std::unique_ptr<FdHandler>> fd_handlers;
    for (size_t i = 0; i < read_fds.size(); ++i) {
        fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
    }

    fdevent_loop();
    TerminateThread();
}

TEST_F(FdeventTest, smoke) {
@@ -122,7 +99,26 @@ TEST_F(FdeventTest, smoke) {
    int reader = fd_pair2[0];

    PrepareThread();
    std::thread thread(FdEventThreadFunc, &thread_arg);

    std::vector<std::unique_ptr<FdHandler>> fd_handlers;
    fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
        std::vector<int> read_fds;
        std::vector<int> write_fds;

        read_fds.push_back(thread_arg.first_read_fd);
        for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
            int fds[2];
            ASSERT_EQ(0, adb_socketpair(fds));
            read_fds.push_back(fds[0]);
            write_fds.push_back(fds[1]);
        }
        write_fds.push_back(thread_arg.last_write_fd);

        for (size_t i = 0; i < read_fds.size(); ++i) {
            fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
        }
    });
    WaitForFdeventLoop();

    for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
        std::string read_buffer = MESSAGE;
@@ -132,7 +128,10 @@ TEST_F(FdeventTest, smoke) {
        ASSERT_EQ(read_buffer, write_buffer);
    }

    TerminateThread(thread);
    fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
    WaitForFdeventLoop();

    TerminateThread();
    ASSERT_EQ(0, adb_close(writer));
    ASSERT_EQ(0, adb_close(reader));
}
@@ -143,7 +142,7 @@ struct InvalidFdArg {
    size_t* happened_event_count;
};

static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) {
static void InvalidFdEventCallback(int, unsigned events, void* userdata) {
    InvalidFdArg* arg = reinterpret_cast<InvalidFdArg*>(userdata);
    ASSERT_EQ(arg->expected_events, events);
    fdevent_remove(&arg->fde);
@@ -179,7 +178,6 @@ TEST_F(FdeventTest, run_on_main_thread) {
    std::vector<int> vec;

    PrepareThread();
    std::thread thread(fdevent_loop);

    // Block the main thread for a long time while we queue our callbacks.
    fdevent_run_on_main_thread([]() {
@@ -194,7 +192,7 @@ TEST_F(FdeventTest, run_on_main_thread) {
        });
    }

    TerminateThread(thread);
    TerminateThread();

    ASSERT_EQ(1000000u, vec.size());
    for (int i = 0; i < 1000000; ++i) {
@@ -218,11 +216,8 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) {
    std::vector<int> vec;

    PrepareThread();
    std::thread thread(fdevent_loop);

    fdevent_run_on_main_thread(make_appender(&vec, 0));

    TerminateThread(thread);
    TerminateThread();

    ASSERT_EQ(100u, vec.size());
    for (int i = 0; i < 100; ++i) {
+10 −2
Original line number Diff line number Diff line
@@ -16,10 +16,13 @@

#include <gtest/gtest.h>

#include <condition_variable>
#include <mutex>
#include <thread>

#include "socket.h"
#include "sysdeps.h"
#include "sysdeps/chrono.h"

static void WaitForFdeventLoop() {
    // Sleep for a bit to make sure that network events have propagated.
@@ -67,6 +70,9 @@ class FdeventTest : public ::testing::Test {
        }
        dummy_socket->ready(dummy_socket);
        dummy = dummy_fds[0];

        thread_ = std::thread([]() { fdevent_loop(); });
        WaitForFdeventLoop();
    }

    size_t GetAdditionalLocalSocketCount() {
@@ -74,10 +80,12 @@ class FdeventTest : public ::testing::Test {
        return 2;
    }

    void TerminateThread(std::thread& thread) {
    void TerminateThread() {
        fdevent_terminate_loop();
        ASSERT_TRUE(WriteFdExactly(dummy, "", 1));
        thread.join();
        thread_.join();
        ASSERT_EQ(0, adb_close(dummy));
    }

    std::thread thread_;
};
+34 −45
Original line number Diff line number Diff line
@@ -82,7 +82,6 @@ TEST_F(LocalSocketTest, smoke) {
    connect(prev_tail, end);

    PrepareThread();
    std::thread thread(fdevent_loop);

    for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
        std::string read_buffer = MESSAGE;
@@ -98,7 +97,7 @@ TEST_F(LocalSocketTest, smoke) {
    // Wait until the local sockets are closed.
    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

struct CloseWithPacketArg {
@@ -107,7 +106,8 @@ struct CloseWithPacketArg {
    int cause_close_fd;
};

static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) {
static void CreateCloser(CloseWithPacketArg* arg) {
    fdevent_run_on_main_thread([arg]() {
        asocket* s = create_local_socket(arg->socket_fd);
        ASSERT_TRUE(s != nullptr);
        arg->bytes_written = 0;
@@ -123,8 +123,8 @@ static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) {
        cause_close_s->peer = s;
        s->peer = cause_close_s;
        cause_close_s->ready(cause_close_s);

    fdevent_loop();
    });
    WaitForFdeventLoop();
}

// This test checks if we can close local socket in the following situation:
@@ -141,9 +141,8 @@ TEST_F(LocalSocketTest, close_socket_with_packet) {
    arg.cause_close_fd = cause_close_fd[1];

    PrepareThread();
    std::thread thread(CloseWithPacketThreadFunc, &arg);
    CreateCloser(&arg);

    WaitForFdeventLoop();
    ASSERT_EQ(0, adb_close(cause_close_fd[0]));

    WaitForFdeventLoop();
@@ -152,7 +151,7 @@ TEST_F(LocalSocketTest, close_socket_with_packet) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

// This test checks if we can read packets from a closing local socket.
@@ -166,7 +165,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) {
    arg.cause_close_fd = cause_close_fd[1];

    PrepareThread();
    std::thread thread(CloseWithPacketThreadFunc, &arg);
    CreateCloser(&arg);

    WaitForFdeventLoop();
    ASSERT_EQ(0, adb_close(cause_close_fd[0]));
@@ -182,7 +181,7 @@ TEST_F(LocalSocketTest, read_from_closing_socket) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

// This test checks if we can close local socket in the following situation:
@@ -199,7 +198,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {
    arg.cause_close_fd = cause_close_fd[1];

    PrepareThread();
    std::thread thread(CloseWithPacketThreadFunc, &arg);
    CreateCloser(&arg);

    WaitForFdeventLoop();
    EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -207,7 +206,7 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

// Ensure that if we fail to write output to an fd, we will still flush data coming from it.
@@ -227,7 +226,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) {
    tail->ready(tail);

    PrepareThread();
    std::thread thread(fdevent_loop);

    EXPECT_TRUE(WriteFdExactly(head_fd[0], "foo", 3));

@@ -245,7 +243,7 @@ TEST_F(LocalSocketTest, flush_after_shutdown) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

#if defined(__linux__)
@@ -254,21 +252,10 @@ static void ClientThreadFunc() {
    std::string error;
    int fd = network_loopback_client(5038, SOCK_STREAM, &error);
    ASSERT_GE(fd, 0) << error;
    std::this_thread::sleep_for(200ms);
    std::this_thread::sleep_for(1s);
    ASSERT_EQ(0, adb_close(fd));
}

struct CloseRdHupSocketArg {
    int socket_fd;
};

static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
    asocket* s = create_local_socket(arg->socket_fd);
    ASSERT_TRUE(s != nullptr);

    fdevent_loop();
}

// This test checks if we can close sockets in CLOSE_WAIT state.
TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
    std::string error;
@@ -279,11 +266,13 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {

    int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr);
    ASSERT_GE(accept_fd, 0);
    CloseRdHupSocketArg arg;
    arg.socket_fd = accept_fd;

    PrepareThread();
    std::thread thread(CloseRdHupSocketThreadFunc, &arg);

    fdevent_run_on_main_thread([accept_fd]() {
        asocket* s = create_local_socket(accept_fd);
        ASSERT_TRUE(s != nullptr);
    });

    WaitForFdeventLoop();
    EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
@@ -293,7 +282,7 @@ TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread(thread);
    TerminateThread();
}

#endif  // defined(__linux__)