Loading adb/adb.cpp +48 −36 Original line number Diff line number Diff line Loading @@ -920,13 +920,45 @@ int launch_server(const std::string& socket_spec) { } #endif /* ADB_HOST */ bool handle_forward_request(const char* service, atransport* transport, int reply_fd) { return handle_forward_request(service, [transport](std::string*) { return transport; }, reply_fd); } // Try to handle a network forwarding request. // This returns 1 on success, 0 on failure, and -1 to indicate this is not // a forwarding-related request. int handle_forward_request(const char* service, atransport* transport, int reply_fd) { bool handle_forward_request(const char* service, std::function<atransport*(std::string* error)> transport_acquirer, int reply_fd) { if (!strcmp(service, "list-forward")) { // Create the list of forward redirections. std::string listeners = format_listeners(); #if ADB_HOST SendOkay(reply_fd); #endif SendProtocolString(reply_fd, listeners); return true; } if (!strcmp(service, "killforward-all")) { remove_all_listeners(); #if ADB_HOST /* On the host: 1st OKAY is connect, 2nd OKAY is status */ SendOkay(reply_fd); #endif SendOkay(reply_fd); return true; } if (!strncmp(service, "forward:", 8) || !strncmp(service, "killforward:", 12)) { // killforward:local // forward:(norebind:)?local;remote std::string error; atransport* transport = transport_acquirer(&error); if (!transport) { SendFail(reply_fd, error); return true; } bool kill_forward = false; bool no_rebind = false; if (android::base::StartsWith(service, "killforward:")) { Loading @@ -946,17 +978,16 @@ int handle_forward_request(const char* service, atransport* transport, int reply // Check killforward: parameter format: '<local>' if (pieces.size() != 1 || pieces[0].empty()) { SendFail(reply_fd, android::base::StringPrintf("bad killforward: %s", service)); return 1; return true; } } else { // Check forward: parameter format: '<local>;<remote>' if (pieces.size() != 2 || pieces[0].empty() || pieces[1].empty() || pieces[1][0] == '*') { SendFail(reply_fd, android::base::StringPrintf("bad forward: %s", service)); return 1; return true; } } std::string error; InstallStatus r; int resolved_tcp_port = 0; if (kill_forward) { Loading @@ -977,7 +1008,7 @@ int handle_forward_request(const char* service, atransport* transport, int reply SendProtocolString(reply_fd, android::base::StringPrintf("%d", resolved_tcp_port)); } return 1; return true; } std::string message; Loading @@ -996,9 +1027,10 @@ int handle_forward_request(const char* service, atransport* transport, int reply break; } SendFail(reply_fd, message); return 1; return true; } return 0; return false; } #if ADB_HOST Loading Loading @@ -1186,35 +1218,15 @@ int handle_host_request(const char* service, TransportType type, const char* ser return SendOkay(reply_fd, response); } if (!strcmp(service, "list-forward")) { // Create the list of forward redirections. std::string listeners = format_listeners(); #if ADB_HOST SendOkay(reply_fd); #endif return SendProtocolString(reply_fd, listeners); } if (!strcmp(service, "killforward-all")) { remove_all_listeners(); #if ADB_HOST /* On the host: 1st OKAY is connect, 2nd OKAY is status */ SendOkay(reply_fd); #endif SendOkay(reply_fd); return 1; } std::string error; atransport* t = acquire_one_transport(type, serial, transport_id, nullptr, &error); if (!t) { SendFail(reply_fd, error); return 1; if (handle_forward_request(service, [=](std::string* error) { return acquire_one_transport(type, serial, transport_id, nullptr, error); }, reply_fd)) { return 0; } int ret = handle_forward_request(service, t, reply_fd); if (ret >= 0) return ret - 1; return -1; } Loading adb/adb.h +4 −1 Original line number Diff line number Diff line Loading @@ -158,7 +158,10 @@ asocket* create_jdwp_tracker_service_socket(); unique_fd create_jdwp_connection_fd(int jdwp_pid); #endif int handle_forward_request(const char* service, atransport* transport, int reply_fd); bool handle_forward_request(const char* service, atransport* transport, int reply_fd); bool handle_forward_request(const char* service, std::function<atransport*(std::string* error)> transport_acquirer, int reply_fd); /* packet allocator */ apacket* get_apacket(void); Loading adb/client/commandline.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -1614,9 +1614,9 @@ int adb_commandline(int argc, const char** argv) { return bugreport.DoIt(argc, argv); } else if (!strcmp(argv[0], "forward") || !strcmp(argv[0], "reverse")) { bool reverse = !strcmp(argv[0], "reverse"); ++argv; --argc; if (argc < 1) return syntax_error("%s requires an argument", argv[0]); ++argv; // Determine the <host-prefix> for this command. std::string host_prefix; Loading adb/daemon/services.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -157,7 +157,7 @@ unique_fd reverse_service(const char* command, atransport* transport) { return unique_fd{}; } VLOG(SERVICES) << "service socketpair: " << s[0] << ", " << s[1]; if (handle_forward_request(command, transport, s[1]) < 0) { if (!handle_forward_request(command, transport, s[1])) { SendFail(s[1], "not a reverse forwarding command"); } adb_close(s[1]); Loading adb/transport.cpp +51 −18 Original line number Diff line number Diff line Loading @@ -50,6 +50,7 @@ #include "adb_trace.h" #include "adb_utils.h" #include "fdevent.h" #include "sysdeps/chrono.h" static void register_transport(atransport* transport); static void remove_transport(atransport* transport); Loading Loading @@ -80,6 +81,7 @@ class SCOPED_CAPABILITY ScopedAssumeLocked { ~ScopedAssumeLocked() RELEASE() {} }; #if ADB_HOST // Tracks and handles atransport*s that are attempting reconnection. class ReconnectHandler { public: Loading @@ -102,12 +104,18 @@ class ReconnectHandler { // Tracks a reconnection attempt. struct ReconnectAttempt { atransport* transport; std::chrono::system_clock::time_point deadline; std::chrono::steady_clock::time_point reconnect_time; size_t attempts_left; bool operator<(const ReconnectAttempt& rhs) const { // std::priority_queue returns the largest element first, so we want attempts that have // less time remaining (i.e. smaller time_points) to compare greater. return reconnect_time > rhs.reconnect_time; } }; // Only retry for up to one minute. static constexpr const std::chrono::seconds kDefaultTimeout = std::chrono::seconds(10); static constexpr const std::chrono::seconds kDefaultTimeout = 10s; static constexpr const size_t kMaxAttempts = 6; // Protects all members. Loading @@ -115,7 +123,7 @@ class ReconnectHandler { bool running_ GUARDED_BY(reconnect_mutex_) = true; std::thread handler_thread_; std::condition_variable reconnect_cv_; std::queue<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_); std::priority_queue<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_); DISALLOW_COPY_AND_ASSIGN(ReconnectHandler); }; Loading @@ -137,7 +145,7 @@ void ReconnectHandler::Stop() { // Drain the queue to free all resources. std::lock_guard<std::mutex> lock(reconnect_mutex_); while (!reconnect_queue_.empty()) { ReconnectAttempt attempt = reconnect_queue_.front(); ReconnectAttempt attempt = reconnect_queue_.top(); reconnect_queue_.pop(); remove_transport(attempt.transport); } Loading @@ -148,9 +156,10 @@ void ReconnectHandler::TrackTransport(atransport* transport) { { std::lock_guard<std::mutex> lock(reconnect_mutex_); if (!running_) return; reconnect_queue_.emplace(ReconnectAttempt{ transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, ReconnectHandler::kMaxAttempts}); // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited. auto reconnect_time = std::chrono::steady_clock::now() + 250ms; reconnect_queue_.emplace( ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts}); } reconnect_cv_.notify_one(); } Loading @@ -162,15 +171,27 @@ void ReconnectHandler::Run() { std::unique_lock<std::mutex> lock(reconnect_mutex_); ScopedAssumeLocked assume_lock(reconnect_mutex_); auto deadline = std::chrono::time_point<std::chrono::system_clock>::max(); if (!reconnect_queue_.empty()) deadline = reconnect_queue_.front().deadline; reconnect_cv_.wait_until(lock, deadline, [&]() REQUIRES(reconnect_mutex_) { return !running_ || (!reconnect_queue_.empty() && reconnect_queue_.front().deadline < deadline); }); if (!reconnect_queue_.empty()) { // FIXME: libstdc++ (used on Windows) implements condition_variable with // system_clock as its clock, so we're probably hosed if the clock changes, // even if we use steady_clock throughout. This problem goes away once we // switch to libc++. reconnect_cv_.wait_until(lock, reconnect_queue_.top().reconnect_time); } else { reconnect_cv_.wait(lock); } if (!running_) return; attempt = reconnect_queue_.front(); if (reconnect_queue_.empty()) continue; // Go back to sleep in case |reconnect_cv_| woke up spuriously and we still // have more time to wait for the current attempt. auto now = std::chrono::steady_clock::now(); if (reconnect_queue_.top().reconnect_time > now) { continue; } attempt = reconnect_queue_.top(); reconnect_queue_.pop(); if (attempt.transport->kicked()) { D("transport %s was kicked. giving up on it.", attempt.transport->serial.c_str()); Loading @@ -192,7 +213,7 @@ void ReconnectHandler::Run() { std::lock_guard<std::mutex> lock(reconnect_mutex_); reconnect_queue_.emplace(ReconnectAttempt{ attempt.transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout, attempt.attempts_left - 1}); continue; } Loading @@ -204,6 +225,8 @@ void ReconnectHandler::Run() { static auto& reconnect_handler = *new ReconnectHandler(); #endif } // namespace TransportId NextTransportId() { Loading Loading @@ -677,9 +700,11 @@ static void transport_registration_func(int _fd, unsigned ev, void*) { update_transports(); } #if ADB_HOST void init_reconnect_handler(void) { reconnect_handler.Start(); } #endif void init_transport_registration(void) { int s[2]; Loading @@ -698,7 +723,9 @@ void init_transport_registration(void) { } void kick_all_transports() { #if ADB_HOST reconnect_handler.Stop(); #endif // To avoid only writing part of a packet to a transport after exit, kick all transports. std::lock_guard<std::recursive_mutex> lock(transport_lock); for (auto t : transport_list) { Loading Loading @@ -736,13 +763,19 @@ static void transport_unref(atransport* t) { t->ref_count--; if (t->ref_count == 0) { t->connection()->Stop(); #if ADB_HOST if (t->IsTcpDevice() && !t->kicked()) { D("transport: %s unref (attempting reconnection) %d", t->serial.c_str(), t->kicked()); D("transport: %s unref (attempting reconnection)", t->serial.c_str()); reconnect_handler.TrackTransport(t); } else { D("transport: %s unref (kicking and closing)", t->serial.c_str()); remove_transport(t); } #else D("transport: %s unref (kicking and closing)", t->serial.c_str()); remove_transport(t); #endif } else { D("transport: %s unref (count=%zu)", t->serial.c_str(), t->ref_count); } Loading Loading
adb/adb.cpp +48 −36 Original line number Diff line number Diff line Loading @@ -920,13 +920,45 @@ int launch_server(const std::string& socket_spec) { } #endif /* ADB_HOST */ bool handle_forward_request(const char* service, atransport* transport, int reply_fd) { return handle_forward_request(service, [transport](std::string*) { return transport; }, reply_fd); } // Try to handle a network forwarding request. // This returns 1 on success, 0 on failure, and -1 to indicate this is not // a forwarding-related request. int handle_forward_request(const char* service, atransport* transport, int reply_fd) { bool handle_forward_request(const char* service, std::function<atransport*(std::string* error)> transport_acquirer, int reply_fd) { if (!strcmp(service, "list-forward")) { // Create the list of forward redirections. std::string listeners = format_listeners(); #if ADB_HOST SendOkay(reply_fd); #endif SendProtocolString(reply_fd, listeners); return true; } if (!strcmp(service, "killforward-all")) { remove_all_listeners(); #if ADB_HOST /* On the host: 1st OKAY is connect, 2nd OKAY is status */ SendOkay(reply_fd); #endif SendOkay(reply_fd); return true; } if (!strncmp(service, "forward:", 8) || !strncmp(service, "killforward:", 12)) { // killforward:local // forward:(norebind:)?local;remote std::string error; atransport* transport = transport_acquirer(&error); if (!transport) { SendFail(reply_fd, error); return true; } bool kill_forward = false; bool no_rebind = false; if (android::base::StartsWith(service, "killforward:")) { Loading @@ -946,17 +978,16 @@ int handle_forward_request(const char* service, atransport* transport, int reply // Check killforward: parameter format: '<local>' if (pieces.size() != 1 || pieces[0].empty()) { SendFail(reply_fd, android::base::StringPrintf("bad killforward: %s", service)); return 1; return true; } } else { // Check forward: parameter format: '<local>;<remote>' if (pieces.size() != 2 || pieces[0].empty() || pieces[1].empty() || pieces[1][0] == '*') { SendFail(reply_fd, android::base::StringPrintf("bad forward: %s", service)); return 1; return true; } } std::string error; InstallStatus r; int resolved_tcp_port = 0; if (kill_forward) { Loading @@ -977,7 +1008,7 @@ int handle_forward_request(const char* service, atransport* transport, int reply SendProtocolString(reply_fd, android::base::StringPrintf("%d", resolved_tcp_port)); } return 1; return true; } std::string message; Loading @@ -996,9 +1027,10 @@ int handle_forward_request(const char* service, atransport* transport, int reply break; } SendFail(reply_fd, message); return 1; return true; } return 0; return false; } #if ADB_HOST Loading Loading @@ -1186,35 +1218,15 @@ int handle_host_request(const char* service, TransportType type, const char* ser return SendOkay(reply_fd, response); } if (!strcmp(service, "list-forward")) { // Create the list of forward redirections. std::string listeners = format_listeners(); #if ADB_HOST SendOkay(reply_fd); #endif return SendProtocolString(reply_fd, listeners); } if (!strcmp(service, "killforward-all")) { remove_all_listeners(); #if ADB_HOST /* On the host: 1st OKAY is connect, 2nd OKAY is status */ SendOkay(reply_fd); #endif SendOkay(reply_fd); return 1; } std::string error; atransport* t = acquire_one_transport(type, serial, transport_id, nullptr, &error); if (!t) { SendFail(reply_fd, error); return 1; if (handle_forward_request(service, [=](std::string* error) { return acquire_one_transport(type, serial, transport_id, nullptr, error); }, reply_fd)) { return 0; } int ret = handle_forward_request(service, t, reply_fd); if (ret >= 0) return ret - 1; return -1; } Loading
adb/adb.h +4 −1 Original line number Diff line number Diff line Loading @@ -158,7 +158,10 @@ asocket* create_jdwp_tracker_service_socket(); unique_fd create_jdwp_connection_fd(int jdwp_pid); #endif int handle_forward_request(const char* service, atransport* transport, int reply_fd); bool handle_forward_request(const char* service, atransport* transport, int reply_fd); bool handle_forward_request(const char* service, std::function<atransport*(std::string* error)> transport_acquirer, int reply_fd); /* packet allocator */ apacket* get_apacket(void); Loading
adb/client/commandline.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -1614,9 +1614,9 @@ int adb_commandline(int argc, const char** argv) { return bugreport.DoIt(argc, argv); } else if (!strcmp(argv[0], "forward") || !strcmp(argv[0], "reverse")) { bool reverse = !strcmp(argv[0], "reverse"); ++argv; --argc; if (argc < 1) return syntax_error("%s requires an argument", argv[0]); ++argv; // Determine the <host-prefix> for this command. std::string host_prefix; Loading
adb/daemon/services.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -157,7 +157,7 @@ unique_fd reverse_service(const char* command, atransport* transport) { return unique_fd{}; } VLOG(SERVICES) << "service socketpair: " << s[0] << ", " << s[1]; if (handle_forward_request(command, transport, s[1]) < 0) { if (!handle_forward_request(command, transport, s[1])) { SendFail(s[1], "not a reverse forwarding command"); } adb_close(s[1]); Loading
adb/transport.cpp +51 −18 Original line number Diff line number Diff line Loading @@ -50,6 +50,7 @@ #include "adb_trace.h" #include "adb_utils.h" #include "fdevent.h" #include "sysdeps/chrono.h" static void register_transport(atransport* transport); static void remove_transport(atransport* transport); Loading Loading @@ -80,6 +81,7 @@ class SCOPED_CAPABILITY ScopedAssumeLocked { ~ScopedAssumeLocked() RELEASE() {} }; #if ADB_HOST // Tracks and handles atransport*s that are attempting reconnection. class ReconnectHandler { public: Loading @@ -102,12 +104,18 @@ class ReconnectHandler { // Tracks a reconnection attempt. struct ReconnectAttempt { atransport* transport; std::chrono::system_clock::time_point deadline; std::chrono::steady_clock::time_point reconnect_time; size_t attempts_left; bool operator<(const ReconnectAttempt& rhs) const { // std::priority_queue returns the largest element first, so we want attempts that have // less time remaining (i.e. smaller time_points) to compare greater. return reconnect_time > rhs.reconnect_time; } }; // Only retry for up to one minute. static constexpr const std::chrono::seconds kDefaultTimeout = std::chrono::seconds(10); static constexpr const std::chrono::seconds kDefaultTimeout = 10s; static constexpr const size_t kMaxAttempts = 6; // Protects all members. Loading @@ -115,7 +123,7 @@ class ReconnectHandler { bool running_ GUARDED_BY(reconnect_mutex_) = true; std::thread handler_thread_; std::condition_variable reconnect_cv_; std::queue<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_); std::priority_queue<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_); DISALLOW_COPY_AND_ASSIGN(ReconnectHandler); }; Loading @@ -137,7 +145,7 @@ void ReconnectHandler::Stop() { // Drain the queue to free all resources. std::lock_guard<std::mutex> lock(reconnect_mutex_); while (!reconnect_queue_.empty()) { ReconnectAttempt attempt = reconnect_queue_.front(); ReconnectAttempt attempt = reconnect_queue_.top(); reconnect_queue_.pop(); remove_transport(attempt.transport); } Loading @@ -148,9 +156,10 @@ void ReconnectHandler::TrackTransport(atransport* transport) { { std::lock_guard<std::mutex> lock(reconnect_mutex_); if (!running_) return; reconnect_queue_.emplace(ReconnectAttempt{ transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, ReconnectHandler::kMaxAttempts}); // Arbitrary sleep to give adbd time to get ready, if we disconnected because it exited. auto reconnect_time = std::chrono::steady_clock::now() + 250ms; reconnect_queue_.emplace( ReconnectAttempt{transport, reconnect_time, ReconnectHandler::kMaxAttempts}); } reconnect_cv_.notify_one(); } Loading @@ -162,15 +171,27 @@ void ReconnectHandler::Run() { std::unique_lock<std::mutex> lock(reconnect_mutex_); ScopedAssumeLocked assume_lock(reconnect_mutex_); auto deadline = std::chrono::time_point<std::chrono::system_clock>::max(); if (!reconnect_queue_.empty()) deadline = reconnect_queue_.front().deadline; reconnect_cv_.wait_until(lock, deadline, [&]() REQUIRES(reconnect_mutex_) { return !running_ || (!reconnect_queue_.empty() && reconnect_queue_.front().deadline < deadline); }); if (!reconnect_queue_.empty()) { // FIXME: libstdc++ (used on Windows) implements condition_variable with // system_clock as its clock, so we're probably hosed if the clock changes, // even if we use steady_clock throughout. This problem goes away once we // switch to libc++. reconnect_cv_.wait_until(lock, reconnect_queue_.top().reconnect_time); } else { reconnect_cv_.wait(lock); } if (!running_) return; attempt = reconnect_queue_.front(); if (reconnect_queue_.empty()) continue; // Go back to sleep in case |reconnect_cv_| woke up spuriously and we still // have more time to wait for the current attempt. auto now = std::chrono::steady_clock::now(); if (reconnect_queue_.top().reconnect_time > now) { continue; } attempt = reconnect_queue_.top(); reconnect_queue_.pop(); if (attempt.transport->kicked()) { D("transport %s was kicked. giving up on it.", attempt.transport->serial.c_str()); Loading @@ -192,7 +213,7 @@ void ReconnectHandler::Run() { std::lock_guard<std::mutex> lock(reconnect_mutex_); reconnect_queue_.emplace(ReconnectAttempt{ attempt.transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, std::chrono::steady_clock::now() + ReconnectHandler::kDefaultTimeout, attempt.attempts_left - 1}); continue; } Loading @@ -204,6 +225,8 @@ void ReconnectHandler::Run() { static auto& reconnect_handler = *new ReconnectHandler(); #endif } // namespace TransportId NextTransportId() { Loading Loading @@ -677,9 +700,11 @@ static void transport_registration_func(int _fd, unsigned ev, void*) { update_transports(); } #if ADB_HOST void init_reconnect_handler(void) { reconnect_handler.Start(); } #endif void init_transport_registration(void) { int s[2]; Loading @@ -698,7 +723,9 @@ void init_transport_registration(void) { } void kick_all_transports() { #if ADB_HOST reconnect_handler.Stop(); #endif // To avoid only writing part of a packet to a transport after exit, kick all transports. std::lock_guard<std::recursive_mutex> lock(transport_lock); for (auto t : transport_list) { Loading Loading @@ -736,13 +763,19 @@ static void transport_unref(atransport* t) { t->ref_count--; if (t->ref_count == 0) { t->connection()->Stop(); #if ADB_HOST if (t->IsTcpDevice() && !t->kicked()) { D("transport: %s unref (attempting reconnection) %d", t->serial.c_str(), t->kicked()); D("transport: %s unref (attempting reconnection)", t->serial.c_str()); reconnect_handler.TrackTransport(t); } else { D("transport: %s unref (kicking and closing)", t->serial.c_str()); remove_transport(t); } #else D("transport: %s unref (kicking and closing)", t->serial.c_str()); remove_transport(t); #endif } else { D("transport: %s unref (count=%zu)", t->serial.c_str(), t->ref_count); } Loading