Loading adb/adb.cpp +5 −0 Original line number Original line Diff line number Diff line Loading @@ -395,6 +395,11 @@ void handle_packet(apacket *p, atransport *t) D("Invalid A_OKAY(%d,%d), expected A_OKAY(%d,%d) on transport %s", D("Invalid A_OKAY(%d,%d), expected A_OKAY(%d,%d) on transport %s", p->msg.arg0, p->msg.arg1, s->peer->id, p->msg.arg1, t->serial); p->msg.arg0, p->msg.arg1, s->peer->id, p->msg.arg1, t->serial); } } } else { // When receiving A_OKAY from device for A_OPEN request, the host server may // have closed the local socket because of client disconnection. Then we need // to send A_CLSE back to device to close the service on device. send_close(p->msg.arg1, p->msg.arg0, t); } } } } break; break; Loading adb/fdevent.cpp +11 −0 Original line number Original line Diff line number Diff line Loading @@ -58,6 +58,12 @@ struct PollNode { PollNode(fdevent* fde) : fde(fde) { PollNode(fdevent* fde) : fde(fde) { memset(&pollfd, 0, sizeof(pollfd)); memset(&pollfd, 0, sizeof(pollfd)); pollfd.fd = fde->fd; pollfd.fd = fde->fd; #if defined(__linux__) // Always enable POLLRDHUP, so the host server can take action when some clients disconnect. // Then we can avoid leaving many sockets in CLOSE_WAIT state. See http://b/23314034. pollfd.events = POLLRDHUP; #endif } } }; }; Loading Loading @@ -234,6 +240,11 @@ static void fdevent_process() { // be detected at that point. // be detected at that point. events |= FDE_READ | FDE_ERROR; events |= FDE_READ | FDE_ERROR; } } #if defined(__linux__) if (pollfd.revents & POLLRDHUP) { events |= FDE_READ | FDE_ERROR; } #endif if (events != 0) { if (events != 0) { auto it = g_poll_node_map.find(pollfd.fd); auto it = g_poll_node_map.find(pollfd.fd); CHECK(it != g_poll_node_map.end()); CHECK(it != g_poll_node_map.end()); Loading adb/socket_test.cpp +48 −39 Original line number Original line Diff line number Diff line Loading @@ -168,7 +168,7 @@ static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) { // The socket is closing but having some packets, so it is not closed. Then // The socket is closing but having some packets, so it is not closed. Then // some write error happens in the socket's file handler, e.g., the file // some write error happens in the socket's file handler, e.g., the file // handler is closed. // handler is closed. TEST_F(LocalSocketTest, close_with_packet) { TEST_F(LocalSocketTest, close_socket_with_packet) { int socket_fd[2]; int socket_fd[2]; ASSERT_EQ(0, adb_socketpair(socket_fd)); ASSERT_EQ(0, adb_socketpair(socket_fd)); int cause_close_fd[2]; int cause_close_fd[2]; Loading @@ -193,13 +193,8 @@ TEST_F(LocalSocketTest, close_with_packet) { ASSERT_EQ(0, pthread_join(thread, nullptr)); ASSERT_EQ(0, pthread_join(thread, nullptr)); } } #undef shutdown // This test checks if we can read packets from a closing local socket. // This test checks if we can read packets from a closing local socket. // The socket's file handler may be non readable if the other side has TEST_F(LocalSocketTest, read_from_closing_socket) { // called shutdown(SHUT_WR). But we should always write packets // successfully to the other side. TEST_F(LocalSocketTest, half_close_with_packet) { int socket_fd[2]; int socket_fd[2]; ASSERT_EQ(0, adb_socketpair(socket_fd)); ASSERT_EQ(0, adb_socketpair(socket_fd)); int cause_close_fd[2]; int cause_close_fd[2]; Loading @@ -217,7 +212,6 @@ TEST_F(LocalSocketTest, half_close_with_packet) { ASSERT_EQ(0, adb_close(cause_close_fd[0])); ASSERT_EQ(0, adb_close(cause_close_fd[0])); sleep(1); sleep(1); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(0, shutdown(socket_fd[0], SHUT_WR)); // Verify if we can read successfully. // Verify if we can read successfully. std::vector<char> buf(arg.bytes_written); std::vector<char> buf(arg.bytes_written); Loading Loading @@ -260,11 +254,21 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { ASSERT_EQ(0, pthread_join(thread, nullptr)); ASSERT_EQ(0, pthread_join(thread, nullptr)); } } struct CloseNoEventsArg { #if defined(__linux__) static void ClientThreadFunc() { std::string error; int fd = network_loopback_client(5038, SOCK_STREAM, &error); ASSERT_GE(fd, 0) << error; sleep(2); ASSERT_EQ(0, adb_close(fd)); } struct CloseRdHupSocketArg { int socket_fd; int socket_fd; }; }; static void CloseNoEventsThreadFunc(CloseNoEventsArg* arg) { static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) { asocket* s = create_local_socket(arg->socket_fd); asocket* s = create_local_socket(arg->socket_fd); ASSERT_TRUE(s != nullptr); ASSERT_TRUE(s != nullptr); Loading @@ -272,29 +276,34 @@ static void CloseNoEventsThreadFunc(CloseNoEventsArg* arg) { fdevent_loop(); fdevent_loop(); } } // This test checks when a local socket doesn't enable FDE_READ/FDE_WRITE/FDE_ERROR, it // This test checks if we can close sockets in CLOSE_WAIT state. // can still be closed when some error happens on its file handler. TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { // This test successes on linux but fails on mac because of different implementation of std::string error; // poll(). I think the function tested here is useful to make adb server more stable on int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error); // linux. ASSERT_GE(listen_fd, 0); TEST_F(LocalSocketTest, close_with_no_events_installed) { pthread_t client_thread; int socket_fd[2]; ASSERT_EQ(0, pthread_create(&client_thread, nullptr, ASSERT_EQ(0, adb_socketpair(socket_fd)); reinterpret_cast<void* (*)(void*)>(ClientThreadFunc), nullptr)); CloseNoEventsArg arg; struct sockaddr addr; arg.socket_fd = socket_fd[1]; socklen_t alen; alen = sizeof(addr); int accept_fd = adb_socket_accept(listen_fd, &addr, &alen); ASSERT_GE(accept_fd, 0); CloseRdHupSocketArg arg; arg.socket_fd = accept_fd; pthread_t thread; pthread_t thread; ASSERT_EQ(0, pthread_create(&thread, nullptr, ASSERT_EQ(0, pthread_create(&thread, nullptr, reinterpret_cast<void* (*)(void*)>(CloseNoEventsThreadFunc), reinterpret_cast<void* (*)(void*)>(CloseRdHupSocketThreadFunc), &arg)); &arg)); // Wait until the fdevent_loop() starts. // Wait until the fdevent_loop() starts. sleep(1); sleep(1); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); // Wait until the client closes its socket. ASSERT_EQ(0, pthread_join(client_thread, nullptr)); // Wait until the socket is closed. sleep(2); sleep(1); ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); ASSERT_EQ(0, pthread_join(thread, nullptr)); ASSERT_EQ(0, pthread_join(thread, nullptr)); } } #endif // defined(__linux__) Loading
adb/adb.cpp +5 −0 Original line number Original line Diff line number Diff line Loading @@ -395,6 +395,11 @@ void handle_packet(apacket *p, atransport *t) D("Invalid A_OKAY(%d,%d), expected A_OKAY(%d,%d) on transport %s", D("Invalid A_OKAY(%d,%d), expected A_OKAY(%d,%d) on transport %s", p->msg.arg0, p->msg.arg1, s->peer->id, p->msg.arg1, t->serial); p->msg.arg0, p->msg.arg1, s->peer->id, p->msg.arg1, t->serial); } } } else { // When receiving A_OKAY from device for A_OPEN request, the host server may // have closed the local socket because of client disconnection. Then we need // to send A_CLSE back to device to close the service on device. send_close(p->msg.arg1, p->msg.arg0, t); } } } } break; break; Loading
adb/fdevent.cpp +11 −0 Original line number Original line Diff line number Diff line Loading @@ -58,6 +58,12 @@ struct PollNode { PollNode(fdevent* fde) : fde(fde) { PollNode(fdevent* fde) : fde(fde) { memset(&pollfd, 0, sizeof(pollfd)); memset(&pollfd, 0, sizeof(pollfd)); pollfd.fd = fde->fd; pollfd.fd = fde->fd; #if defined(__linux__) // Always enable POLLRDHUP, so the host server can take action when some clients disconnect. // Then we can avoid leaving many sockets in CLOSE_WAIT state. See http://b/23314034. pollfd.events = POLLRDHUP; #endif } } }; }; Loading Loading @@ -234,6 +240,11 @@ static void fdevent_process() { // be detected at that point. // be detected at that point. events |= FDE_READ | FDE_ERROR; events |= FDE_READ | FDE_ERROR; } } #if defined(__linux__) if (pollfd.revents & POLLRDHUP) { events |= FDE_READ | FDE_ERROR; } #endif if (events != 0) { if (events != 0) { auto it = g_poll_node_map.find(pollfd.fd); auto it = g_poll_node_map.find(pollfd.fd); CHECK(it != g_poll_node_map.end()); CHECK(it != g_poll_node_map.end()); Loading
adb/socket_test.cpp +48 −39 Original line number Original line Diff line number Diff line Loading @@ -168,7 +168,7 @@ static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) { // The socket is closing but having some packets, so it is not closed. Then // The socket is closing but having some packets, so it is not closed. Then // some write error happens in the socket's file handler, e.g., the file // some write error happens in the socket's file handler, e.g., the file // handler is closed. // handler is closed. TEST_F(LocalSocketTest, close_with_packet) { TEST_F(LocalSocketTest, close_socket_with_packet) { int socket_fd[2]; int socket_fd[2]; ASSERT_EQ(0, adb_socketpair(socket_fd)); ASSERT_EQ(0, adb_socketpair(socket_fd)); int cause_close_fd[2]; int cause_close_fd[2]; Loading @@ -193,13 +193,8 @@ TEST_F(LocalSocketTest, close_with_packet) { ASSERT_EQ(0, pthread_join(thread, nullptr)); ASSERT_EQ(0, pthread_join(thread, nullptr)); } } #undef shutdown // This test checks if we can read packets from a closing local socket. // This test checks if we can read packets from a closing local socket. // The socket's file handler may be non readable if the other side has TEST_F(LocalSocketTest, read_from_closing_socket) { // called shutdown(SHUT_WR). But we should always write packets // successfully to the other side. TEST_F(LocalSocketTest, half_close_with_packet) { int socket_fd[2]; int socket_fd[2]; ASSERT_EQ(0, adb_socketpair(socket_fd)); ASSERT_EQ(0, adb_socketpair(socket_fd)); int cause_close_fd[2]; int cause_close_fd[2]; Loading @@ -217,7 +212,6 @@ TEST_F(LocalSocketTest, half_close_with_packet) { ASSERT_EQ(0, adb_close(cause_close_fd[0])); ASSERT_EQ(0, adb_close(cause_close_fd[0])); sleep(1); sleep(1); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(0, shutdown(socket_fd[0], SHUT_WR)); // Verify if we can read successfully. // Verify if we can read successfully. std::vector<char> buf(arg.bytes_written); std::vector<char> buf(arg.bytes_written); Loading Loading @@ -260,11 +254,21 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { ASSERT_EQ(0, pthread_join(thread, nullptr)); ASSERT_EQ(0, pthread_join(thread, nullptr)); } } struct CloseNoEventsArg { #if defined(__linux__) static void ClientThreadFunc() { std::string error; int fd = network_loopback_client(5038, SOCK_STREAM, &error); ASSERT_GE(fd, 0) << error; sleep(2); ASSERT_EQ(0, adb_close(fd)); } struct CloseRdHupSocketArg { int socket_fd; int socket_fd; }; }; static void CloseNoEventsThreadFunc(CloseNoEventsArg* arg) { static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) { asocket* s = create_local_socket(arg->socket_fd); asocket* s = create_local_socket(arg->socket_fd); ASSERT_TRUE(s != nullptr); ASSERT_TRUE(s != nullptr); Loading @@ -272,29 +276,34 @@ static void CloseNoEventsThreadFunc(CloseNoEventsArg* arg) { fdevent_loop(); fdevent_loop(); } } // This test checks when a local socket doesn't enable FDE_READ/FDE_WRITE/FDE_ERROR, it // This test checks if we can close sockets in CLOSE_WAIT state. // can still be closed when some error happens on its file handler. TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { // This test successes on linux but fails on mac because of different implementation of std::string error; // poll(). I think the function tested here is useful to make adb server more stable on int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error); // linux. ASSERT_GE(listen_fd, 0); TEST_F(LocalSocketTest, close_with_no_events_installed) { pthread_t client_thread; int socket_fd[2]; ASSERT_EQ(0, pthread_create(&client_thread, nullptr, ASSERT_EQ(0, adb_socketpair(socket_fd)); reinterpret_cast<void* (*)(void*)>(ClientThreadFunc), nullptr)); CloseNoEventsArg arg; struct sockaddr addr; arg.socket_fd = socket_fd[1]; socklen_t alen; alen = sizeof(addr); int accept_fd = adb_socket_accept(listen_fd, &addr, &alen); ASSERT_GE(accept_fd, 0); CloseRdHupSocketArg arg; arg.socket_fd = accept_fd; pthread_t thread; pthread_t thread; ASSERT_EQ(0, pthread_create(&thread, nullptr, ASSERT_EQ(0, pthread_create(&thread, nullptr, reinterpret_cast<void* (*)(void*)>(CloseNoEventsThreadFunc), reinterpret_cast<void* (*)(void*)>(CloseRdHupSocketThreadFunc), &arg)); &arg)); // Wait until the fdevent_loop() starts. // Wait until the fdevent_loop() starts. sleep(1); sleep(1); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); // Wait until the client closes its socket. ASSERT_EQ(0, pthread_join(client_thread, nullptr)); // Wait until the socket is closed. sleep(2); sleep(1); ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); ASSERT_EQ(0, pthread_kill(thread, SIGUSR1)); ASSERT_EQ(0, pthread_join(thread, nullptr)); ASSERT_EQ(0, pthread_join(thread, nullptr)); } } #endif // defined(__linux__)