Loading adb/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -298,6 +298,7 @@ cc_binary_host { "client/fastdeploycallbacks.cpp", "client/incremental.cpp", "client/incremental_server.cpp", "client/incremental_utils.cpp", "shell_service_protocol.cpp", ], Loading adb/client/commandline.cpp +40 −9 Original line number Diff line number Diff line Loading @@ -1423,6 +1423,26 @@ static bool _is_valid_ack_reply_fd(const int ack_reply_fd) { #endif } static bool _is_valid_fd(int fd) { // Disallow invalid FDs and stdin/out/err as well. if (fd < 3) { return false; } #ifdef _WIN32 HANDLE handle = adb_get_os_handle(fd); DWORD info = 0; if (GetHandleInformation(handle, &info) == 0) { return false; } #else int flags = fcntl(fd, F_GETFD); if (flags == -1) { return false; } #endif return true; } int adb_commandline(int argc, const char** argv) { bool no_daemon = false; bool is_daemon = false; Loading Loading @@ -1977,17 +1997,28 @@ int adb_commandline(int argc, const char** argv) { } } } else if (!strcmp(argv[0], "inc-server")) { if (argc < 3) { error_exit("usage: adb inc-server FD FILE1 FILE2 ..."); if (argc < 4) { #ifdef _WIN32 error_exit("usage: adb inc-server CONNECTION_HANDLE OUTPUT_HANDLE FILE1 FILE2 ..."); #else error_exit("usage: adb inc-server CONNECTION_FD OUTPUT_FD FILE1 FILE2 ..."); #endif } int fd = atoi(argv[1]); if (fd < 3) { // Disallow invalid FDs and stdin/out/err as well. error_exit("Invalid fd number given: %d", fd); int connection_fd = atoi(argv[1]); if (!_is_valid_fd(connection_fd)) { error_exit("Invalid connection_fd number given: %d", connection_fd); } connection_fd = adb_register_socket(connection_fd); close_on_exec(connection_fd); int output_fd = atoi(argv[2]); if (!_is_valid_fd(output_fd)) { error_exit("Invalid output_fd number given: %d", output_fd); } fd = adb_register_socket(fd); close_on_exec(fd); return incremental::serve(fd, argc - 2, argv + 2); output_fd = adb_register_socket(output_fd); close_on_exec(output_fd); return incremental::serve(connection_fd, output_fd, argc - 3, argv + 3); } error_exit("unknown command %s", argv[0]); Loading adb/client/incremental.cpp +56 −4 Original line number Diff line number Diff line Loading @@ -193,20 +193,72 @@ std::optional<Process> install(std::vector<std::string> files) { auto fd_param = std::to_string(osh); #endif // pipe for child process to write output int print_fds[2]; if (adb_socketpair(print_fds) != 0) { fprintf(stderr, "Failed to create socket pair for child to print to parent\n"); return {}; } auto [pipe_read_fd, pipe_write_fd] = print_fds; auto pipe_write_fd_param = std::to_string(pipe_write_fd); close_on_exec(pipe_read_fd); std::vector<std::string> args(std::move(files)); args.insert(args.begin(), {"inc-server", fd_param}); auto child = adb_launch_process(adb_path, std::move(args), {connection_fd.get()}); args.insert(args.begin(), {"inc-server", fd_param, pipe_write_fd_param}); auto child = adb_launch_process(adb_path, std::move(args), {connection_fd.get(), pipe_write_fd}); if (!child) { fprintf(stderr, "adb: failed to fork: %s\n", strerror(errno)); return {}; } adb_close(pipe_write_fd); auto killOnExit = [](Process* p) { p->kill(); }; std::unique_ptr<Process, decltype(killOnExit)> serverKiller(&child, killOnExit); // TODO: Terminate server process if installation fails. serverKiller.release(); Result result = wait_for_installation(pipe_read_fd); adb_close(pipe_read_fd); if (result == Result::Success) { // adb client exits now but inc-server can continue serverKiller.release(); } return child; } Result wait_for_installation(int read_fd) { static constexpr int maxMessageSize = 256; std::vector<char> child_stdout(CHUNK_SIZE); int bytes_read; int buf_size = 0; // TODO(b/150865433): optimize child's output parsing while ((bytes_read = adb_read(read_fd, child_stdout.data() + buf_size, child_stdout.size() - buf_size)) > 0) { // print to parent's stdout fprintf(stdout, "%.*s", bytes_read, child_stdout.data() + buf_size); buf_size += bytes_read; const std::string_view stdout_str(child_stdout.data(), buf_size); // wait till installation either succeeds or fails if (stdout_str.find("Success") != std::string::npos) { return Result::Success; } // on failure, wait for full message static constexpr auto failure_msg_head = "Failure ["sv; if (const auto begin_itr = stdout_str.find(failure_msg_head); begin_itr != std::string::npos) { if (buf_size >= maxMessageSize) { return Result::Failure; } const auto end_itr = stdout_str.rfind("]"); if (end_itr != std::string::npos && end_itr >= begin_itr + failure_msg_head.size()) { return Result::Failure; } } child_stdout.resize(buf_size + CHUNK_SIZE); } return Result::None; } } // namespace incremental adb/client/incremental.h +3 −0 Original line number Diff line number Diff line Loading @@ -27,4 +27,7 @@ namespace incremental { std::optional<Process> install(std::vector<std::string> files); enum class Result { Success, Failure, None }; Result wait_for_installation(int read_fd); } // namespace incremental adb/client/incremental_server.cpp +87 −43 Original line number Diff line number Diff line Loading @@ -18,13 +18,6 @@ #include "incremental_server.h" #include "adb.h" #include "adb_io.h" #include "adb_trace.h" #include "adb_unique_fd.h" #include "adb_utils.h" #include "sysdeps.h" #include <android-base/endian.h> #include <android-base/strings.h> #include <inttypes.h> Loading @@ -41,6 +34,14 @@ #include <type_traits> #include <unordered_set> #include "adb.h" #include "adb_io.h" #include "adb_trace.h" #include "adb_unique_fd.h" #include "adb_utils.h" #include "incremental_utils.h" #include "sysdeps.h" namespace incremental { static constexpr int kBlockSize = 4096; Loading @@ -49,6 +50,7 @@ static constexpr short kCompressionNone = 0; static constexpr short kCompressionLZ4 = 1; static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize)); static constexpr auto kReadBufferSize = 128 * 1024; static constexpr int kPollTimeoutMillis = 300000; // 5 minutes using BlockSize = int16_t; using FileId = int16_t; Loading @@ -61,9 +63,10 @@ using MagicType = uint32_t; static constexpr MagicType INCR = 0x494e4352; // LE INCR static constexpr RequestType EXIT = 0; static constexpr RequestType SERVING_COMPLETE = 0; static constexpr RequestType BLOCK_MISSING = 1; static constexpr RequestType PREFETCH = 2; static constexpr RequestType DESTROY = 3; static constexpr inline int64_t roundDownToBlockOffset(int64_t val) { return val & ~(kBlockSize - 1); Loading Loading @@ -134,6 +137,7 @@ class File { // Plain file File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) { this->fd_ = std::move(fd); priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size); } int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed, std::string* error) const { Loading @@ -145,6 +149,7 @@ class File { } const unique_fd& RawFd() const { return fd_; } const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; } std::vector<bool> sentBlocks; NumBlocks sentBlocksCount = 0; Loading @@ -158,12 +163,13 @@ class File { sentBlocks.resize(numBytesToNumBlocks(size)); } unique_fd fd_; std::vector<BlockIdx> priority_blocks_; }; class IncrementalServer { public: IncrementalServer(unique_fd fd, std::vector<File> files) : adb_fd_(std::move(fd)), files_(std::move(files)) { IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files) : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) { buffer_.reserve(kReadBufferSize); } Loading @@ -174,14 +180,23 @@ class IncrementalServer { const File* file; BlockIdx overallIndex = 0; BlockIdx overallEnd = 0; BlockIdx priorityIndex = 0; PrefetchState(const File& f) : file(&f), overallEnd((BlockIdx)f.sentBlocks.size()) {} PrefetchState(const File& f, BlockIdx start, int count) explicit PrefetchState(const File& f, BlockIdx start, int count) : file(&f), overallIndex(start), overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {} bool done() const { return overallIndex >= overallEnd; } explicit PrefetchState(const File& f) : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {} bool done() const { const bool overallSent = (overallIndex >= overallEnd); if (file->PriorityBlocks().empty()) { return overallSent; } return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size()); } }; bool SkipToRequest(void* buffer, size_t* size, bool blocking); Loading @@ -197,9 +212,10 @@ class IncrementalServer { void Send(const void* data, size_t size, bool flush); void Flush(); using TimePoint = decltype(std::chrono::high_resolution_clock::now()); bool Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent); bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent); unique_fd const adb_fd_; unique_fd const output_fd_; std::vector<File> files_; // Incoming data buffer. Loading @@ -210,6 +226,9 @@ class IncrementalServer { long long sentSize_ = 0; std::vector<char> pendingBlocks_; // True when client notifies that all the data has been received bool servingComplete_; }; bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) { Loading @@ -217,7 +236,8 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) // Looking for INCR magic. bool magic_found = false; int bcur = 0; for (int bsize = buffer_.size(); bcur + 4 < bsize; ++bcur) { int bsize = buffer_.size(); for (bcur = 0; bcur + 4 < bsize; ++bcur) { uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur)); if (magic == INCR) { magic_found = true; Loading @@ -226,8 +246,8 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) } if (bcur > 0) { // Stream the rest to stderr. fprintf(stderr, "%.*s", bcur, buffer_.data()); // output the rest. WriteFdExactly(output_fd_, buffer_.data(), bcur); erase_buffer_head(bcur); } Loading @@ -239,17 +259,26 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) } adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0}; auto res = adb_poll(&pfd, 1, blocking ? -1 : 0); auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0); if (res != 1) { WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); if (res < 0) { fprintf(stderr, "Failed to poll: %s\n", strerror(errno)); D("Failed to poll: %s\n", strerror(errno)); return false; } if (blocking) { fprintf(stderr, "Timed out waiting for data from device.\n"); } if (blocking && servingComplete_) { // timeout waiting from client. Serving is complete, so quit. return false; } *size = 0; return true; } auto bsize = buffer_.size(); bsize = buffer_.size(); buffer_.resize(kReadBufferSize); int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize); if (r > 0) { Loading @@ -257,21 +286,19 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) continue; } if (r == -1) { fprintf(stderr, "Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno); return false; D("Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno); break; } // socket is closed // socket is closed. print remaining messages WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); return false; } } std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) { uint8_t commandBuf[sizeof(RequestCommand)]; auto size = sizeof(commandBuf); if (!SkipToRequest(&commandBuf, &size, blocking)) { return {{EXIT}}; return {{DESTROY}}; } if (size < sizeof(RequestCommand)) { return {}; Loading Loading @@ -351,6 +378,17 @@ void IncrementalServer::RunPrefetching() { while (!prefetches_.empty() && blocksToSend > 0) { auto& prefetch = prefetches_.front(); const auto& file = *prefetch.file; const auto& priority_blocks = file.PriorityBlocks(); if (!priority_blocks.empty()) { for (auto& i = prefetch.priorityIndex; blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) { if (auto res = SendBlock(file.id, priority_blocks[i]); res == SendResult::Sent) { --blocksToSend; } else if (res == SendResult::Error) { fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i); } } } for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) { if (auto res = SendBlock(file.id, i); res == SendResult::Sent) { --blocksToSend; Loading Loading @@ -391,17 +429,17 @@ void IncrementalServer::Flush() { pendingBlocks_.clear(); } bool IncrementalServer::Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent) { bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent) { servingComplete_ = true; using namespace std::chrono; auto endTime = high_resolution_clock::now(); fprintf(stderr, "Connection failed or received exit command. Exit.\n" D("Streaming completed.\n" "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: " "%d, mb: %.3f\n" "Total time taken: %.3fms\n", missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0, duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0); duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0); return true; } Loading @@ -425,7 +463,7 @@ bool IncrementalServer::Serve() { std::all_of(files_.begin(), files_.end(), [](const File& f) { return f.sentBlocksCount == NumBlocks(f.sentBlocks.size()); })) { fprintf(stdout, "All files should be loaded. Notifying the device.\n"); fprintf(stderr, "All files should be loaded. Notifying the device.\n"); SendDone(); doneSent = true; } Loading @@ -446,9 +484,14 @@ bool IncrementalServer::Serve() { BlockIdx blockIdx = request->block_idx; switch (request->request_type) { case EXIT: { case DESTROY: { // Stop everything. return Exit(startTime, missesCount, missesSent); return true; } case SERVING_COMPLETE: { // Not stopping the server here. ServingComplete(startTime, missesCount, missesSent); break; } case BLOCK_MISSING: { ++missesCount; Loading Loading @@ -502,8 +545,9 @@ bool IncrementalServer::Serve() { } } bool serve(int adb_fd, int argc, const char** argv) { auto connection_fd = unique_fd(adb_fd); bool serve(int connection_fd, int output_fd, int argc, const char** argv) { auto connection_ufd = unique_fd(connection_fd); auto output_ufd = unique_fd(output_fd); if (argc <= 0) { error_exit("inc-server: must specify at least one file."); } Loading @@ -526,7 +570,7 @@ bool serve(int adb_fd, int argc, const char** argv) { files.emplace_back(filepath, i, st.st_size, std::move(fd)); } IncrementalServer server(std::move(connection_fd), std::move(files)); IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files)); printf("Serving...\n"); fclose(stdin); fclose(stdout); Loading Loading
adb/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -298,6 +298,7 @@ cc_binary_host { "client/fastdeploycallbacks.cpp", "client/incremental.cpp", "client/incremental_server.cpp", "client/incremental_utils.cpp", "shell_service_protocol.cpp", ], Loading
adb/client/commandline.cpp +40 −9 Original line number Diff line number Diff line Loading @@ -1423,6 +1423,26 @@ static bool _is_valid_ack_reply_fd(const int ack_reply_fd) { #endif } static bool _is_valid_fd(int fd) { // Disallow invalid FDs and stdin/out/err as well. if (fd < 3) { return false; } #ifdef _WIN32 HANDLE handle = adb_get_os_handle(fd); DWORD info = 0; if (GetHandleInformation(handle, &info) == 0) { return false; } #else int flags = fcntl(fd, F_GETFD); if (flags == -1) { return false; } #endif return true; } int adb_commandline(int argc, const char** argv) { bool no_daemon = false; bool is_daemon = false; Loading Loading @@ -1977,17 +1997,28 @@ int adb_commandline(int argc, const char** argv) { } } } else if (!strcmp(argv[0], "inc-server")) { if (argc < 3) { error_exit("usage: adb inc-server FD FILE1 FILE2 ..."); if (argc < 4) { #ifdef _WIN32 error_exit("usage: adb inc-server CONNECTION_HANDLE OUTPUT_HANDLE FILE1 FILE2 ..."); #else error_exit("usage: adb inc-server CONNECTION_FD OUTPUT_FD FILE1 FILE2 ..."); #endif } int fd = atoi(argv[1]); if (fd < 3) { // Disallow invalid FDs and stdin/out/err as well. error_exit("Invalid fd number given: %d", fd); int connection_fd = atoi(argv[1]); if (!_is_valid_fd(connection_fd)) { error_exit("Invalid connection_fd number given: %d", connection_fd); } connection_fd = adb_register_socket(connection_fd); close_on_exec(connection_fd); int output_fd = atoi(argv[2]); if (!_is_valid_fd(output_fd)) { error_exit("Invalid output_fd number given: %d", output_fd); } fd = adb_register_socket(fd); close_on_exec(fd); return incremental::serve(fd, argc - 2, argv + 2); output_fd = adb_register_socket(output_fd); close_on_exec(output_fd); return incremental::serve(connection_fd, output_fd, argc - 3, argv + 3); } error_exit("unknown command %s", argv[0]); Loading
adb/client/incremental.cpp +56 −4 Original line number Diff line number Diff line Loading @@ -193,20 +193,72 @@ std::optional<Process> install(std::vector<std::string> files) { auto fd_param = std::to_string(osh); #endif // pipe for child process to write output int print_fds[2]; if (adb_socketpair(print_fds) != 0) { fprintf(stderr, "Failed to create socket pair for child to print to parent\n"); return {}; } auto [pipe_read_fd, pipe_write_fd] = print_fds; auto pipe_write_fd_param = std::to_string(pipe_write_fd); close_on_exec(pipe_read_fd); std::vector<std::string> args(std::move(files)); args.insert(args.begin(), {"inc-server", fd_param}); auto child = adb_launch_process(adb_path, std::move(args), {connection_fd.get()}); args.insert(args.begin(), {"inc-server", fd_param, pipe_write_fd_param}); auto child = adb_launch_process(adb_path, std::move(args), {connection_fd.get(), pipe_write_fd}); if (!child) { fprintf(stderr, "adb: failed to fork: %s\n", strerror(errno)); return {}; } adb_close(pipe_write_fd); auto killOnExit = [](Process* p) { p->kill(); }; std::unique_ptr<Process, decltype(killOnExit)> serverKiller(&child, killOnExit); // TODO: Terminate server process if installation fails. serverKiller.release(); Result result = wait_for_installation(pipe_read_fd); adb_close(pipe_read_fd); if (result == Result::Success) { // adb client exits now but inc-server can continue serverKiller.release(); } return child; } Result wait_for_installation(int read_fd) { static constexpr int maxMessageSize = 256; std::vector<char> child_stdout(CHUNK_SIZE); int bytes_read; int buf_size = 0; // TODO(b/150865433): optimize child's output parsing while ((bytes_read = adb_read(read_fd, child_stdout.data() + buf_size, child_stdout.size() - buf_size)) > 0) { // print to parent's stdout fprintf(stdout, "%.*s", bytes_read, child_stdout.data() + buf_size); buf_size += bytes_read; const std::string_view stdout_str(child_stdout.data(), buf_size); // wait till installation either succeeds or fails if (stdout_str.find("Success") != std::string::npos) { return Result::Success; } // on failure, wait for full message static constexpr auto failure_msg_head = "Failure ["sv; if (const auto begin_itr = stdout_str.find(failure_msg_head); begin_itr != std::string::npos) { if (buf_size >= maxMessageSize) { return Result::Failure; } const auto end_itr = stdout_str.rfind("]"); if (end_itr != std::string::npos && end_itr >= begin_itr + failure_msg_head.size()) { return Result::Failure; } } child_stdout.resize(buf_size + CHUNK_SIZE); } return Result::None; } } // namespace incremental
adb/client/incremental.h +3 −0 Original line number Diff line number Diff line Loading @@ -27,4 +27,7 @@ namespace incremental { std::optional<Process> install(std::vector<std::string> files); enum class Result { Success, Failure, None }; Result wait_for_installation(int read_fd); } // namespace incremental
adb/client/incremental_server.cpp +87 −43 Original line number Diff line number Diff line Loading @@ -18,13 +18,6 @@ #include "incremental_server.h" #include "adb.h" #include "adb_io.h" #include "adb_trace.h" #include "adb_unique_fd.h" #include "adb_utils.h" #include "sysdeps.h" #include <android-base/endian.h> #include <android-base/strings.h> #include <inttypes.h> Loading @@ -41,6 +34,14 @@ #include <type_traits> #include <unordered_set> #include "adb.h" #include "adb_io.h" #include "adb_trace.h" #include "adb_unique_fd.h" #include "adb_utils.h" #include "incremental_utils.h" #include "sysdeps.h" namespace incremental { static constexpr int kBlockSize = 4096; Loading @@ -49,6 +50,7 @@ static constexpr short kCompressionNone = 0; static constexpr short kCompressionLZ4 = 1; static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize)); static constexpr auto kReadBufferSize = 128 * 1024; static constexpr int kPollTimeoutMillis = 300000; // 5 minutes using BlockSize = int16_t; using FileId = int16_t; Loading @@ -61,9 +63,10 @@ using MagicType = uint32_t; static constexpr MagicType INCR = 0x494e4352; // LE INCR static constexpr RequestType EXIT = 0; static constexpr RequestType SERVING_COMPLETE = 0; static constexpr RequestType BLOCK_MISSING = 1; static constexpr RequestType PREFETCH = 2; static constexpr RequestType DESTROY = 3; static constexpr inline int64_t roundDownToBlockOffset(int64_t val) { return val & ~(kBlockSize - 1); Loading Loading @@ -134,6 +137,7 @@ class File { // Plain file File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) { this->fd_ = std::move(fd); priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size); } int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed, std::string* error) const { Loading @@ -145,6 +149,7 @@ class File { } const unique_fd& RawFd() const { return fd_; } const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; } std::vector<bool> sentBlocks; NumBlocks sentBlocksCount = 0; Loading @@ -158,12 +163,13 @@ class File { sentBlocks.resize(numBytesToNumBlocks(size)); } unique_fd fd_; std::vector<BlockIdx> priority_blocks_; }; class IncrementalServer { public: IncrementalServer(unique_fd fd, std::vector<File> files) : adb_fd_(std::move(fd)), files_(std::move(files)) { IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files) : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) { buffer_.reserve(kReadBufferSize); } Loading @@ -174,14 +180,23 @@ class IncrementalServer { const File* file; BlockIdx overallIndex = 0; BlockIdx overallEnd = 0; BlockIdx priorityIndex = 0; PrefetchState(const File& f) : file(&f), overallEnd((BlockIdx)f.sentBlocks.size()) {} PrefetchState(const File& f, BlockIdx start, int count) explicit PrefetchState(const File& f, BlockIdx start, int count) : file(&f), overallIndex(start), overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {} bool done() const { return overallIndex >= overallEnd; } explicit PrefetchState(const File& f) : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {} bool done() const { const bool overallSent = (overallIndex >= overallEnd); if (file->PriorityBlocks().empty()) { return overallSent; } return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size()); } }; bool SkipToRequest(void* buffer, size_t* size, bool blocking); Loading @@ -197,9 +212,10 @@ class IncrementalServer { void Send(const void* data, size_t size, bool flush); void Flush(); using TimePoint = decltype(std::chrono::high_resolution_clock::now()); bool Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent); bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent); unique_fd const adb_fd_; unique_fd const output_fd_; std::vector<File> files_; // Incoming data buffer. Loading @@ -210,6 +226,9 @@ class IncrementalServer { long long sentSize_ = 0; std::vector<char> pendingBlocks_; // True when client notifies that all the data has been received bool servingComplete_; }; bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) { Loading @@ -217,7 +236,8 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) // Looking for INCR magic. bool magic_found = false; int bcur = 0; for (int bsize = buffer_.size(); bcur + 4 < bsize; ++bcur) { int bsize = buffer_.size(); for (bcur = 0; bcur + 4 < bsize; ++bcur) { uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur)); if (magic == INCR) { magic_found = true; Loading @@ -226,8 +246,8 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) } if (bcur > 0) { // Stream the rest to stderr. fprintf(stderr, "%.*s", bcur, buffer_.data()); // output the rest. WriteFdExactly(output_fd_, buffer_.data(), bcur); erase_buffer_head(bcur); } Loading @@ -239,17 +259,26 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) } adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0}; auto res = adb_poll(&pfd, 1, blocking ? -1 : 0); auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0); if (res != 1) { WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); if (res < 0) { fprintf(stderr, "Failed to poll: %s\n", strerror(errno)); D("Failed to poll: %s\n", strerror(errno)); return false; } if (blocking) { fprintf(stderr, "Timed out waiting for data from device.\n"); } if (blocking && servingComplete_) { // timeout waiting from client. Serving is complete, so quit. return false; } *size = 0; return true; } auto bsize = buffer_.size(); bsize = buffer_.size(); buffer_.resize(kReadBufferSize); int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize); if (r > 0) { Loading @@ -257,21 +286,19 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) continue; } if (r == -1) { fprintf(stderr, "Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno); return false; D("Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno); break; } // socket is closed // socket is closed. print remaining messages WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); return false; } } std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) { uint8_t commandBuf[sizeof(RequestCommand)]; auto size = sizeof(commandBuf); if (!SkipToRequest(&commandBuf, &size, blocking)) { return {{EXIT}}; return {{DESTROY}}; } if (size < sizeof(RequestCommand)) { return {}; Loading Loading @@ -351,6 +378,17 @@ void IncrementalServer::RunPrefetching() { while (!prefetches_.empty() && blocksToSend > 0) { auto& prefetch = prefetches_.front(); const auto& file = *prefetch.file; const auto& priority_blocks = file.PriorityBlocks(); if (!priority_blocks.empty()) { for (auto& i = prefetch.priorityIndex; blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) { if (auto res = SendBlock(file.id, priority_blocks[i]); res == SendResult::Sent) { --blocksToSend; } else if (res == SendResult::Error) { fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i); } } } for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) { if (auto res = SendBlock(file.id, i); res == SendResult::Sent) { --blocksToSend; Loading Loading @@ -391,17 +429,17 @@ void IncrementalServer::Flush() { pendingBlocks_.clear(); } bool IncrementalServer::Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent) { bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent) { servingComplete_ = true; using namespace std::chrono; auto endTime = high_resolution_clock::now(); fprintf(stderr, "Connection failed or received exit command. Exit.\n" D("Streaming completed.\n" "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: " "%d, mb: %.3f\n" "Total time taken: %.3fms\n", missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0, duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0); duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0); return true; } Loading @@ -425,7 +463,7 @@ bool IncrementalServer::Serve() { std::all_of(files_.begin(), files_.end(), [](const File& f) { return f.sentBlocksCount == NumBlocks(f.sentBlocks.size()); })) { fprintf(stdout, "All files should be loaded. Notifying the device.\n"); fprintf(stderr, "All files should be loaded. Notifying the device.\n"); SendDone(); doneSent = true; } Loading @@ -446,9 +484,14 @@ bool IncrementalServer::Serve() { BlockIdx blockIdx = request->block_idx; switch (request->request_type) { case EXIT: { case DESTROY: { // Stop everything. return Exit(startTime, missesCount, missesSent); return true; } case SERVING_COMPLETE: { // Not stopping the server here. ServingComplete(startTime, missesCount, missesSent); break; } case BLOCK_MISSING: { ++missesCount; Loading Loading @@ -502,8 +545,9 @@ bool IncrementalServer::Serve() { } } bool serve(int adb_fd, int argc, const char** argv) { auto connection_fd = unique_fd(adb_fd); bool serve(int connection_fd, int output_fd, int argc, const char** argv) { auto connection_ufd = unique_fd(connection_fd); auto output_ufd = unique_fd(output_fd); if (argc <= 0) { error_exit("inc-server: must specify at least one file."); } Loading @@ -526,7 +570,7 @@ bool serve(int adb_fd, int argc, const char** argv) { files.emplace_back(filepath, i, st.st_size, std::move(fd)); } IncrementalServer server(std::move(connection_fd), std::move(files)); IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files)); printf("Serving...\n"); fclose(stdin); fclose(stdout); Loading