Loading adb/client/file_sync_client.cpp +99 −33 Original line number Diff line number Diff line Loading @@ -209,7 +209,8 @@ struct TransferLedger { class SyncConnection { public: SyncConnection() { SyncConnection() : acknowledgement_buffer_(sizeof(sync_status) + SYNC_DATA_MAX) { acknowledgement_buffer_.resize(0); max = SYNC_DATA_MAX; // TODO: decide at runtime. std::string error; Loading Loading @@ -507,45 +508,108 @@ class SyncConnection { return WriteOrDie(lpath, rpath, &msg.data, sizeof(msg.data)); } bool ReadAcknowledgments() { bool result = true; while (!deferred_acknowledgements_.empty()) { auto [from, to] = std::move(deferred_acknowledgements_.front()); bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) { std::vector<char> buf(msg.status.msglen + 1); if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) { Error("failed to copy '%s' to '%s'; failed to read reason (!): %s", from.c_str(), to.c_str(), strerror(errno)); return false; } buf[msg.status.msglen] = 0; Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), &buf[0]); return false; } void CopyDone() { deferred_acknowledgements_.pop_front(); } void ReportDeferredCopyFailure(const std::string& msg) { auto& [from, to] = deferred_acknowledgements_.front(); Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), msg.c_str()); deferred_acknowledgements_.pop_front(); result &= CopyDone(from, to); } return result; bool ReadAcknowledgements(bool read_all = false) { // We need to read enough such that adbd's intermediate socket's write buffer can't be // full. The default buffer on Linux is 212992 bytes, but there's 576 bytes of bookkeeping // overhead per write. The worst case scenario is a continuous string of failures, since // each logical packet is divided into two writes. If our packet size if conservatively 512 // bytes long, this leaves us with space for 128 responses. constexpr size_t max_deferred_acks = 128; auto& buf = acknowledgement_buffer_; adb_pollfd pfd = {.fd = fd.get(), .events = POLLIN}; while (!deferred_acknowledgements_.empty()) { bool should_block = read_all || deferred_acknowledgements_.size() >= max_deferred_acks; ssize_t rc = adb_poll(&pfd, 1, should_block ? -1 : 0); if (rc == 0) { CHECK(!should_block); return true; } bool CopyDone(const std::string& from, const std::string& to) { syncmsg msg; if (!ReadFdExactly(fd, &msg.status, sizeof(msg.status))) { Error("failed to copy '%s' to '%s': couldn't read from device", from.c_str(), to.c_str()); if (acknowledgement_buffer_.size() < sizeof(sync_status)) { const ssize_t header_bytes_left = sizeof(sync_status) - buf.size(); ssize_t rc = adb_read(fd, buf.end(), header_bytes_left); if (rc <= 0) { Error("failed to read copy response"); return false; } if (msg.status.id == ID_OKAY) { buf.resize(buf.size() + rc); if (rc != header_bytes_left) { // Early exit if we run out of data in the socket. return true; } if (msg.status.id != ID_FAIL) { Error("failed to copy '%s' to '%s': unknown reason %d", from.c_str(), to.c_str(), msg.status.id); if (!should_block) { // We don't want to read again yet, because the socket might be empty. continue; } } auto* hdr = reinterpret_cast<sync_status*>(buf.data()); if (hdr->id == ID_OKAY) { buf.resize(0); if (hdr->msglen != 0) { Error("received ID_OKAY with msg_len (%" PRIu32 " != 0", hdr->msglen); return false; } return ReportCopyFailure(from, to, msg); CopyDone(); continue; } else if (hdr->id != ID_FAIL) { Error("unexpected response from daemon: id = %#" PRIx32, hdr->id); return false; } else if (hdr->msglen > SYNC_DATA_MAX) { Error("too-long message length from daemon: msglen = %" PRIu32, hdr->msglen); return false; } bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) { std::vector<char> buf(msg.status.msglen + 1); if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) { Error("failed to copy '%s' to '%s'; failed to read reason (!): %s", from.c_str(), to.c_str(), strerror(errno)); const ssize_t msg_bytes_left = hdr->msglen + sizeof(sync_status) - buf.size(); CHECK_GE(msg_bytes_left, 0); if (msg_bytes_left > 0) { ssize_t rc = adb_read(fd, buf.end(), msg_bytes_left); if (rc <= 0) { Error("failed to read copy failure message"); return false; } buf[msg.status.msglen] = 0; Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), &buf[0]); buf.resize(buf.size() + rc); if (rc != msg_bytes_left) { if (should_block) { continue; } else { return true; } } std::string msg(buf.begin() + sizeof(sync_status), buf.end()); ReportDeferredCopyFailure(msg); buf.resize(0); return false; } } return true; } void Printf(const char* fmt, ...) __attribute__((__format__(__printf__, 2, 3))) { std::string s; Loading Loading @@ -613,6 +677,7 @@ class SyncConnection { private: std::deque<std::pair<std::string, std::string>> deferred_acknowledgements_; Block acknowledgement_buffer_; FeatureSet features_; bool have_stat_v2_; bool have_ls_v2_; Loading Loading @@ -721,7 +786,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s if (!sc.SendSmallFile(rpath, mode, lpath, rpath, mtime, buf, data_length)) { return false; } return true; return sc.ReadAcknowledgements(); #endif } Loading @@ -744,7 +809,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s return false; } } return true; return sc.ReadAcknowledgements(); } static bool sync_recv(SyncConnection& sc, const char* rpath, const char* lpath, Loading Loading @@ -971,8 +1036,9 @@ static bool copy_local_dir_remote(SyncConnection& sc, std::string lpath, } sc.RecordFilesSkipped(skipped); bool success = sc.ReadAcknowledgements(true); sc.ReportTransferRate(lpath, TransferDirection::push); return true; return success; } bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sync) { Loading Loading @@ -1065,7 +1131,7 @@ bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sy sc.ReportTransferRate(src_path, TransferDirection::push); } success &= sc.ReadAcknowledgments(); success &= sc.ReadAcknowledgements(true); sc.ReportOverallTransferRate(TransferDirection::push); return success; } Loading Loading
adb/client/file_sync_client.cpp +99 −33 Original line number Diff line number Diff line Loading @@ -209,7 +209,8 @@ struct TransferLedger { class SyncConnection { public: SyncConnection() { SyncConnection() : acknowledgement_buffer_(sizeof(sync_status) + SYNC_DATA_MAX) { acknowledgement_buffer_.resize(0); max = SYNC_DATA_MAX; // TODO: decide at runtime. std::string error; Loading Loading @@ -507,45 +508,108 @@ class SyncConnection { return WriteOrDie(lpath, rpath, &msg.data, sizeof(msg.data)); } bool ReadAcknowledgments() { bool result = true; while (!deferred_acknowledgements_.empty()) { auto [from, to] = std::move(deferred_acknowledgements_.front()); bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) { std::vector<char> buf(msg.status.msglen + 1); if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) { Error("failed to copy '%s' to '%s'; failed to read reason (!): %s", from.c_str(), to.c_str(), strerror(errno)); return false; } buf[msg.status.msglen] = 0; Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), &buf[0]); return false; } void CopyDone() { deferred_acknowledgements_.pop_front(); } void ReportDeferredCopyFailure(const std::string& msg) { auto& [from, to] = deferred_acknowledgements_.front(); Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), msg.c_str()); deferred_acknowledgements_.pop_front(); result &= CopyDone(from, to); } return result; bool ReadAcknowledgements(bool read_all = false) { // We need to read enough such that adbd's intermediate socket's write buffer can't be // full. The default buffer on Linux is 212992 bytes, but there's 576 bytes of bookkeeping // overhead per write. The worst case scenario is a continuous string of failures, since // each logical packet is divided into two writes. If our packet size if conservatively 512 // bytes long, this leaves us with space for 128 responses. constexpr size_t max_deferred_acks = 128; auto& buf = acknowledgement_buffer_; adb_pollfd pfd = {.fd = fd.get(), .events = POLLIN}; while (!deferred_acknowledgements_.empty()) { bool should_block = read_all || deferred_acknowledgements_.size() >= max_deferred_acks; ssize_t rc = adb_poll(&pfd, 1, should_block ? -1 : 0); if (rc == 0) { CHECK(!should_block); return true; } bool CopyDone(const std::string& from, const std::string& to) { syncmsg msg; if (!ReadFdExactly(fd, &msg.status, sizeof(msg.status))) { Error("failed to copy '%s' to '%s': couldn't read from device", from.c_str(), to.c_str()); if (acknowledgement_buffer_.size() < sizeof(sync_status)) { const ssize_t header_bytes_left = sizeof(sync_status) - buf.size(); ssize_t rc = adb_read(fd, buf.end(), header_bytes_left); if (rc <= 0) { Error("failed to read copy response"); return false; } if (msg.status.id == ID_OKAY) { buf.resize(buf.size() + rc); if (rc != header_bytes_left) { // Early exit if we run out of data in the socket. return true; } if (msg.status.id != ID_FAIL) { Error("failed to copy '%s' to '%s': unknown reason %d", from.c_str(), to.c_str(), msg.status.id); if (!should_block) { // We don't want to read again yet, because the socket might be empty. continue; } } auto* hdr = reinterpret_cast<sync_status*>(buf.data()); if (hdr->id == ID_OKAY) { buf.resize(0); if (hdr->msglen != 0) { Error("received ID_OKAY with msg_len (%" PRIu32 " != 0", hdr->msglen); return false; } return ReportCopyFailure(from, to, msg); CopyDone(); continue; } else if (hdr->id != ID_FAIL) { Error("unexpected response from daemon: id = %#" PRIx32, hdr->id); return false; } else if (hdr->msglen > SYNC_DATA_MAX) { Error("too-long message length from daemon: msglen = %" PRIu32, hdr->msglen); return false; } bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) { std::vector<char> buf(msg.status.msglen + 1); if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) { Error("failed to copy '%s' to '%s'; failed to read reason (!): %s", from.c_str(), to.c_str(), strerror(errno)); const ssize_t msg_bytes_left = hdr->msglen + sizeof(sync_status) - buf.size(); CHECK_GE(msg_bytes_left, 0); if (msg_bytes_left > 0) { ssize_t rc = adb_read(fd, buf.end(), msg_bytes_left); if (rc <= 0) { Error("failed to read copy failure message"); return false; } buf[msg.status.msglen] = 0; Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), &buf[0]); buf.resize(buf.size() + rc); if (rc != msg_bytes_left) { if (should_block) { continue; } else { return true; } } std::string msg(buf.begin() + sizeof(sync_status), buf.end()); ReportDeferredCopyFailure(msg); buf.resize(0); return false; } } return true; } void Printf(const char* fmt, ...) __attribute__((__format__(__printf__, 2, 3))) { std::string s; Loading Loading @@ -613,6 +677,7 @@ class SyncConnection { private: std::deque<std::pair<std::string, std::string>> deferred_acknowledgements_; Block acknowledgement_buffer_; FeatureSet features_; bool have_stat_v2_; bool have_ls_v2_; Loading Loading @@ -721,7 +786,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s if (!sc.SendSmallFile(rpath, mode, lpath, rpath, mtime, buf, data_length)) { return false; } return true; return sc.ReadAcknowledgements(); #endif } Loading @@ -744,7 +809,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s return false; } } return true; return sc.ReadAcknowledgements(); } static bool sync_recv(SyncConnection& sc, const char* rpath, const char* lpath, Loading Loading @@ -971,8 +1036,9 @@ static bool copy_local_dir_remote(SyncConnection& sc, std::string lpath, } sc.RecordFilesSkipped(skipped); bool success = sc.ReadAcknowledgements(true); sc.ReportTransferRate(lpath, TransferDirection::push); return true; return success; } bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sync) { Loading Loading @@ -1065,7 +1131,7 @@ bool do_sync_push(const std::vector<const char*>& srcs, const char* dst, bool sy sc.ReportTransferRate(src_path, TransferDirection::push); } success &= sc.ReadAcknowledgments(); success &= sc.ReadAcknowledgements(true); sc.ReportOverallTransferRate(TransferDirection::push); return success; } Loading