Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c2872d83 authored by Yurii Zubrytskyi's avatar Yurii Zubrytskyi Committed by Android (Google) Code Review
Browse files

Merge changes Ifda7de48,Ie33505f9 into rvc-dev

* changes:
  [adb incremental] send priority blocks first
  [adb data server] wait for installation results before terminates
parents 2e46b62e 8cdefd4f
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -298,6 +298,7 @@ cc_binary_host {
        "client/fastdeploycallbacks.cpp",
        "client/incremental.cpp",
        "client/incremental_server.cpp",
        "client/incremental_utils.cpp",
        "shell_service_protocol.cpp",
    ],

+40 −9
Original line number Diff line number Diff line
@@ -1423,6 +1423,26 @@ static bool _is_valid_ack_reply_fd(const int ack_reply_fd) {
#endif
}

static bool _is_valid_fd(int fd) {
    // Disallow invalid FDs and stdin/out/err as well.
    if (fd < 3) {
        return false;
    }
#ifdef _WIN32
    HANDLE handle = adb_get_os_handle(fd);
    DWORD info = 0;
    if (GetHandleInformation(handle, &info) == 0) {
        return false;
    }
#else
    int flags = fcntl(fd, F_GETFD);
    if (flags == -1) {
        return false;
    }
#endif
    return true;
}

int adb_commandline(int argc, const char** argv) {
    bool no_daemon = false;
    bool is_daemon = false;
@@ -1977,17 +1997,28 @@ int adb_commandline(int argc, const char** argv) {
            }
        }
    } else if (!strcmp(argv[0], "inc-server")) {
        if (argc < 3) {
            error_exit("usage: adb inc-server FD FILE1 FILE2 ...");
        if (argc < 4) {
#ifdef _WIN32
            error_exit("usage: adb inc-server CONNECTION_HANDLE OUTPUT_HANDLE FILE1 FILE2 ...");
#else
            error_exit("usage: adb inc-server CONNECTION_FD OUTPUT_FD FILE1 FILE2 ...");
#endif
        }
        int fd = atoi(argv[1]);
        if (fd < 3) {
            // Disallow invalid FDs and stdin/out/err as well.
            error_exit("Invalid fd number given: %d", fd);
        int connection_fd = atoi(argv[1]);
        if (!_is_valid_fd(connection_fd)) {
            error_exit("Invalid connection_fd number given: %d", connection_fd);
        }

        connection_fd = adb_register_socket(connection_fd);
        close_on_exec(connection_fd);

        int output_fd = atoi(argv[2]);
        if (!_is_valid_fd(output_fd)) {
            error_exit("Invalid output_fd number given: %d", output_fd);
        }
        fd = adb_register_socket(fd);
        close_on_exec(fd);
        return incremental::serve(fd, argc - 2, argv + 2);
        output_fd = adb_register_socket(output_fd);
        close_on_exec(output_fd);
        return incremental::serve(connection_fd, output_fd, argc - 3, argv + 3);
    }

    error_exit("unknown command %s", argv[0]);
+56 −4
Original line number Diff line number Diff line
@@ -193,20 +193,72 @@ std::optional<Process> install(std::vector<std::string> files) {
    auto fd_param = std::to_string(osh);
#endif

    // pipe for child process to write output
    int print_fds[2];
    if (adb_socketpair(print_fds) != 0) {
        fprintf(stderr, "Failed to create socket pair for child to print to parent\n");
        return {};
    }
    auto [pipe_read_fd, pipe_write_fd] = print_fds;
    auto pipe_write_fd_param = std::to_string(pipe_write_fd);
    close_on_exec(pipe_read_fd);

    std::vector<std::string> args(std::move(files));
    args.insert(args.begin(), {"inc-server", fd_param});
    auto child = adb_launch_process(adb_path, std::move(args), {connection_fd.get()});
    args.insert(args.begin(), {"inc-server", fd_param, pipe_write_fd_param});
    auto child =
            adb_launch_process(adb_path, std::move(args), {connection_fd.get(), pipe_write_fd});
    if (!child) {
        fprintf(stderr, "adb: failed to fork: %s\n", strerror(errno));
        return {};
    }

    adb_close(pipe_write_fd);

    auto killOnExit = [](Process* p) { p->kill(); };
    std::unique_ptr<Process, decltype(killOnExit)> serverKiller(&child, killOnExit);
    // TODO: Terminate server process if installation fails.
    serverKiller.release();

    Result result = wait_for_installation(pipe_read_fd);
    adb_close(pipe_read_fd);

    if (result == Result::Success) {
        // adb client exits now but inc-server can continue
        serverKiller.release();
    }
    return child;
}

Result wait_for_installation(int read_fd) {
    static constexpr int maxMessageSize = 256;
    std::vector<char> child_stdout(CHUNK_SIZE);
    int bytes_read;
    int buf_size = 0;
    // TODO(b/150865433): optimize child's output parsing
    while ((bytes_read = adb_read(read_fd, child_stdout.data() + buf_size,
                                  child_stdout.size() - buf_size)) > 0) {
        // print to parent's stdout
        fprintf(stdout, "%.*s", bytes_read, child_stdout.data() + buf_size);

        buf_size += bytes_read;
        const std::string_view stdout_str(child_stdout.data(), buf_size);
        // wait till installation either succeeds or fails
        if (stdout_str.find("Success") != std::string::npos) {
            return Result::Success;
        }
        // on failure, wait for full message
        static constexpr auto failure_msg_head = "Failure ["sv;
        if (const auto begin_itr = stdout_str.find(failure_msg_head);
            begin_itr != std::string::npos) {
            if (buf_size >= maxMessageSize) {
                return Result::Failure;
            }
            const auto end_itr = stdout_str.rfind("]");
            if (end_itr != std::string::npos && end_itr >= begin_itr + failure_msg_head.size()) {
                return Result::Failure;
            }
        }
        child_stdout.resize(buf_size + CHUNK_SIZE);
    }
    return Result::None;
}

}  // namespace incremental
+3 −0
Original line number Diff line number Diff line
@@ -27,4 +27,7 @@ namespace incremental {

std::optional<Process> install(std::vector<std::string> files);

enum class Result { Success, Failure, None };
Result wait_for_installation(int read_fd);

}  // namespace incremental
+87 −43
Original line number Diff line number Diff line
@@ -18,13 +18,6 @@

#include "incremental_server.h"

#include "adb.h"
#include "adb_io.h"
#include "adb_trace.h"
#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "sysdeps.h"

#include <android-base/endian.h>
#include <android-base/strings.h>
#include <inttypes.h>
@@ -41,6 +34,14 @@
#include <type_traits>
#include <unordered_set>

#include "adb.h"
#include "adb_io.h"
#include "adb_trace.h"
#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "incremental_utils.h"
#include "sysdeps.h"

namespace incremental {

static constexpr int kBlockSize = 4096;
@@ -49,6 +50,7 @@ static constexpr short kCompressionNone = 0;
static constexpr short kCompressionLZ4 = 1;
static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
static constexpr auto kReadBufferSize = 128 * 1024;
static constexpr int kPollTimeoutMillis = 300000;  // 5 minutes

using BlockSize = int16_t;
using FileId = int16_t;
@@ -61,9 +63,10 @@ using MagicType = uint32_t;

static constexpr MagicType INCR = 0x494e4352;  // LE INCR

static constexpr RequestType EXIT = 0;
static constexpr RequestType SERVING_COMPLETE = 0;
static constexpr RequestType BLOCK_MISSING = 1;
static constexpr RequestType PREFETCH = 2;
static constexpr RequestType DESTROY = 3;

static constexpr inline int64_t roundDownToBlockOffset(int64_t val) {
    return val & ~(kBlockSize - 1);
@@ -134,6 +137,7 @@ class File {
    // Plain file
    File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) {
        this->fd_ = std::move(fd);
        priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
    }
    int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed,
                      std::string* error) const {
@@ -145,6 +149,7 @@ class File {
    }

    const unique_fd& RawFd() const { return fd_; }
    const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }

    std::vector<bool> sentBlocks;
    NumBlocks sentBlocksCount = 0;
@@ -158,12 +163,13 @@ class File {
        sentBlocks.resize(numBytesToNumBlocks(size));
    }
    unique_fd fd_;
    std::vector<BlockIdx> priority_blocks_;
};

class IncrementalServer {
  public:
    IncrementalServer(unique_fd fd, std::vector<File> files)
        : adb_fd_(std::move(fd)), files_(std::move(files)) {
    IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
        : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
        buffer_.reserve(kReadBufferSize);
    }

@@ -174,14 +180,23 @@ class IncrementalServer {
        const File* file;
        BlockIdx overallIndex = 0;
        BlockIdx overallEnd = 0;
        BlockIdx priorityIndex = 0;

        PrefetchState(const File& f) : file(&f), overallEnd((BlockIdx)f.sentBlocks.size()) {}
        PrefetchState(const File& f, BlockIdx start, int count)
        explicit PrefetchState(const File& f, BlockIdx start, int count)
            : file(&f),
              overallIndex(start),
              overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {}

        bool done() const { return overallIndex >= overallEnd; }
        explicit PrefetchState(const File& f)
            : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {}

        bool done() const {
            const bool overallSent = (overallIndex >= overallEnd);
            if (file->PriorityBlocks().empty()) {
                return overallSent;
            }
            return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size());
        }
    };

    bool SkipToRequest(void* buffer, size_t* size, bool blocking);
@@ -197,9 +212,10 @@ class IncrementalServer {
    void Send(const void* data, size_t size, bool flush);
    void Flush();
    using TimePoint = decltype(std::chrono::high_resolution_clock::now());
    bool Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent);
    bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent);

    unique_fd const adb_fd_;
    unique_fd const output_fd_;
    std::vector<File> files_;

    // Incoming data buffer.
@@ -210,6 +226,9 @@ class IncrementalServer {
    long long sentSize_ = 0;

    std::vector<char> pendingBlocks_;

    // True when client notifies that all the data has been received
    bool servingComplete_;
};

bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) {
@@ -217,7 +236,8 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
        // Looking for INCR magic.
        bool magic_found = false;
        int bcur = 0;
        for (int bsize = buffer_.size(); bcur + 4 < bsize; ++bcur) {
        int bsize = buffer_.size();
        for (bcur = 0; bcur + 4 < bsize; ++bcur) {
            uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur));
            if (magic == INCR) {
                magic_found = true;
@@ -226,8 +246,8 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
        }

        if (bcur > 0) {
            // Stream the rest to stderr.
            fprintf(stderr, "%.*s", bcur, buffer_.data());
            // output the rest.
            WriteFdExactly(output_fd_, buffer_.data(), bcur);
            erase_buffer_head(bcur);
        }

@@ -239,17 +259,26 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
        }

        adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0};
        auto res = adb_poll(&pfd, 1, blocking ? -1 : 0);
        auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);

        if (res != 1) {
            WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
            if (res < 0) {
                fprintf(stderr, "Failed to poll: %s\n", strerror(errno));
                D("Failed to poll: %s\n", strerror(errno));
                return false;
            }
            if (blocking) {
                fprintf(stderr, "Timed out waiting for data from device.\n");
            }
            if (blocking && servingComplete_) {
                // timeout waiting from client. Serving is complete, so quit.
                return false;
            }
            *size = 0;
            return true;
        }

        auto bsize = buffer_.size();
        bsize = buffer_.size();
        buffer_.resize(kReadBufferSize);
        int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize);
        if (r > 0) {
@@ -257,21 +286,19 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
            continue;
        }

        if (r == -1) {
            fprintf(stderr, "Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno);
            return false;
        D("Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno);
        break;
    }

        // socket is closed
    // socket is closed. print remaining messages
    WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
    return false;
}
}

std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
    uint8_t commandBuf[sizeof(RequestCommand)];
    auto size = sizeof(commandBuf);
    if (!SkipToRequest(&commandBuf, &size, blocking)) {
        return {{EXIT}};
        return {{DESTROY}};
    }
    if (size < sizeof(RequestCommand)) {
        return {};
@@ -351,6 +378,17 @@ void IncrementalServer::RunPrefetching() {
    while (!prefetches_.empty() && blocksToSend > 0) {
        auto& prefetch = prefetches_.front();
        const auto& file = *prefetch.file;
        const auto& priority_blocks = file.PriorityBlocks();
        if (!priority_blocks.empty()) {
            for (auto& i = prefetch.priorityIndex;
                 blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
                if (auto res = SendBlock(file.id, priority_blocks[i]); res == SendResult::Sent) {
                    --blocksToSend;
                } else if (res == SendResult::Error) {
                    fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
                }
            }
        }
        for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
            if (auto res = SendBlock(file.id, i); res == SendResult::Sent) {
                --blocksToSend;
@@ -391,17 +429,17 @@ void IncrementalServer::Flush() {
    pendingBlocks_.clear();
}

bool IncrementalServer::Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent) {
bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
                                        int missesSent) {
    servingComplete_ = true;
    using namespace std::chrono;
    auto endTime = high_resolution_clock::now();
    fprintf(stderr,
            "Connection failed or received exit command. Exit.\n"
    D("Streaming completed.\n"
      "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
      "%d, mb: %.3f\n"
      "Total time taken: %.3fms\n",
      missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
            duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() /
                    1000.0);
      duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
    return true;
}

@@ -425,7 +463,7 @@ bool IncrementalServer::Serve() {
            std::all_of(files_.begin(), files_.end(), [](const File& f) {
                return f.sentBlocksCount == NumBlocks(f.sentBlocks.size());
            })) {
            fprintf(stdout, "All files should be loaded. Notifying the device.\n");
            fprintf(stderr, "All files should be loaded. Notifying the device.\n");
            SendDone();
            doneSent = true;
        }
@@ -446,9 +484,14 @@ bool IncrementalServer::Serve() {
            BlockIdx blockIdx = request->block_idx;

            switch (request->request_type) {
                case EXIT: {
                case DESTROY: {
                    // Stop everything.
                    return Exit(startTime, missesCount, missesSent);
                    return true;
                }
                case SERVING_COMPLETE: {
                    // Not stopping the server here.
                    ServingComplete(startTime, missesCount, missesSent);
                    break;
                }
                case BLOCK_MISSING: {
                    ++missesCount;
@@ -502,8 +545,9 @@ bool IncrementalServer::Serve() {
    }
}

bool serve(int adb_fd, int argc, const char** argv) {
    auto connection_fd = unique_fd(adb_fd);
bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
    auto connection_ufd = unique_fd(connection_fd);
    auto output_ufd = unique_fd(output_fd);
    if (argc <= 0) {
        error_exit("inc-server: must specify at least one file.");
    }
@@ -526,7 +570,7 @@ bool serve(int adb_fd, int argc, const char** argv) {
        files.emplace_back(filepath, i, st.st_size, std::move(fd));
    }

    IncrementalServer server(std::move(connection_fd), std::move(files));
    IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
    printf("Serving...\n");
    fclose(stdin);
    fclose(stdout);
Loading