
Commit 451197cb authored by Treehugger Robot, committed by Gerrit Code Review

Merge "[incfs] Stream the hash tree for incremental installation"

parents 1c146547 82c1c971
+5 −75
@@ -16,13 +16,13 @@

#include "incremental.h"

#include <android-base/endian.h>
#include "incremental_utils.h"

#include <android-base/file.h>
#include <android-base/stringprintf.h>
#include <openssl/base64.h>

#include "adb_client.h"
#include "adb_io.h"
#include "adb_utils.h"
#include "commandline.h"
#include "sysdeps.h"
@@ -31,65 +31,8 @@ using namespace std::literals;

namespace incremental {

namespace {

static constexpr auto IDSIG = ".idsig"sv;

using android::base::StringPrintf;

using Size = int64_t;

static inline int32_t read_int32(borrowed_fd fd) {
    int32_t result;
    return ReadFdExactly(fd, &result, sizeof(result)) ? result : -1;
}

static inline void append_int(borrowed_fd fd, std::vector<char>* bytes) {
    int32_t le_val = read_int32(fd);
    auto old_size = bytes->size();
    bytes->resize(old_size + sizeof(le_val));
    memcpy(bytes->data() + old_size, &le_val, sizeof(le_val));
}

static inline void append_bytes_with_size(borrowed_fd fd, std::vector<char>* bytes) {
    int32_t le_size = read_int32(fd);
    if (le_size < 0) {
        return;
    }
    int32_t size = int32_t(le32toh(le_size));
    auto old_size = bytes->size();
    bytes->resize(old_size + sizeof(le_size) + size);
    memcpy(bytes->data() + old_size, &le_size, sizeof(le_size));
    ReadFdExactly(fd, bytes->data() + old_size + sizeof(le_size), size);
}

static inline std::pair<std::vector<char>, int32_t> read_id_sig_headers(borrowed_fd fd) {
    std::vector<char> result;
    append_int(fd, &result);              // version
    append_bytes_with_size(fd, &result);  // hashingInfo
    append_bytes_with_size(fd, &result);  // signingInfo
    auto le_tree_size = read_int32(fd);
    auto tree_size = int32_t(le32toh(le_tree_size));  // size of the verity tree
    return {std::move(result), tree_size};
}

static inline Size verity_tree_size_for_file(Size fileSize) {
    constexpr int INCFS_DATA_FILE_BLOCK_SIZE = 4096;
    constexpr int SHA256_DIGEST_SIZE = 32;
    constexpr int digest_size = SHA256_DIGEST_SIZE;
    constexpr int hash_per_block = INCFS_DATA_FILE_BLOCK_SIZE / digest_size;

    Size total_tree_block_count = 0;

    auto block_count = 1 + (fileSize - 1) / INCFS_DATA_FILE_BLOCK_SIZE;
    auto hash_block_count = block_count;
    for (auto i = 0; hash_block_count > 1; i++) {
        hash_block_count = (hash_block_count + hash_per_block - 1) / hash_per_block;
        total_tree_block_count += hash_block_count;
    }
    return total_tree_block_count * INCFS_DATA_FILE_BLOCK_SIZE;
}

// Read, verify and return the signature bytes. Keeping fd at the position of start of verity tree.
static std::pair<unique_fd, std::vector<char>> read_signature(Size file_size,
                                                              std::string signature_file,
@@ -104,7 +47,7 @@ static std::pair<unique_fd, std::vector<char>> read_signature(Size file_size,
        return {};
    }

    unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY | O_CLOEXEC));
    unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
    if (fd < 0) {
        if (!silent) {
            fprintf(stderr, "Failed to open signature file: %s. Abort.\n", signature_file.c_str());
@@ -172,9 +115,8 @@ static unique_fd start_install(const Files& files, bool silent) {
            return {};
        }

        auto file_desc =
                StringPrintf("%s:%lld:%s:%s", android::base::Basename(file).c_str(),
                             (long long)st.st_size, std::to_string(i).c_str(), signature.c_str());
        auto file_desc = StringPrintf("%s:%lld:%d:%s:1", android::base::Basename(file).c_str(),
                                      (long long)st.st_size, i, signature.c_str());
        command_args.push_back(std::move(file_desc));

        signature_fds.push_back(std::move(signature_fd));
@@ -190,21 +132,9 @@ static unique_fd start_install(const Files& files, bool silent) {
        return {};
    }

    // Pushing verity trees for all installation files.
    for (auto&& local_fd : signature_fds) {
        if (!copy_to_file(local_fd.get(), connection_fd.get())) {
            if (!silent) {
                fprintf(stderr, "Failed to stream tree bytes: %s. Abort.\n", strerror(errno));
            }
            return {};
        }
    }

    return connection_fd;
}

}  // namespace

bool can_install(const Files& files) {
    for (const auto& file : files) {
        struct stat st;
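For readers following the hunks above: the removed verity_tree_size_for_file() helper (now provided by incremental_utils) sizes the incfs hash tree bottom-up, one SHA-256 digest per 4 KiB data block and 128 digests per tree block. A minimal standalone sketch of the same arithmetic follows; the 100 MiB example size is illustrative and not part of this change.

// Standalone sketch of the verity-tree sizing arithmetic removed above;
// the 100 MiB example is hypothetical.
#include <cstdint>
#include <cstdio>

int64_t verity_tree_size_for_file(int64_t fileSize) {
    constexpr int kBlockSize = 4096;      // INCFS_DATA_FILE_BLOCK_SIZE
    constexpr int kDigestSize = 32;       // SHA256_DIGEST_SIZE
    constexpr int kHashesPerBlock = kBlockSize / kDigestSize;  // 128
    int64_t total_tree_block_count = 0;
    int64_t hash_block_count = 1 + (fileSize - 1) / kBlockSize;  // data blocks
    while (hash_block_count > 1) {
        // each tree level stores up to 128 child hashes per 4 KiB tree block
        hash_block_count = (hash_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
        total_tree_block_count += hash_block_count;
    }
    return total_tree_block_count * kBlockSize;
}

int main() {
    // 100 MiB -> 25600 data blocks -> 200 + 2 + 1 = 203 tree blocks (831488 bytes)
    printf("%lld\n", (long long)verity_tree_size_for_file(100ll * 1024 * 1024));
}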
+195 −65
/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
@@ -44,9 +44,10 @@

namespace incremental {

static constexpr int kBlockSize = 4096;
static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
static constexpr int8_t kTypeData = 0;
static constexpr int8_t kTypeHash = 1;
static constexpr int8_t kCompressionNone = 0;
static constexpr int8_t kCompressionLZ4 = 1;
static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
@@ -132,41 +133,64 @@ struct ResponseHeader {
    CompressionType compression_type;  // 1 byte
    BlockIdx block_idx;                // 4 bytes
    BlockSize block_size;              // 2 bytes

    static constexpr size_t responseSizeFor(size_t dataSize) {
        return dataSize + sizeof(ResponseHeader);
    }
} __attribute__((packed));

template <size_t Size = kBlockSize>
struct BlockBuffer {
    ResponseHeader header;
    char data[Size];
} __attribute__((packed));

// Holds streaming state for a file
class File {
  public:
    // Plain file
    File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) {
    File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
         unique_fd tree_fd)
        : File(filepath, id, size, tree_offset) {
        this->fd_ = std::move(fd);
        this->tree_fd_ = std::move(tree_fd);
        priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
    }
    int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed,
    int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
                      std::string* error) const {
        char* buf_ptr = static_cast<char*>(buf);
        int64_t bytes_read = -1;
        const off64_t offsetStart = blockIndexToOffset(block_idx);
        bytes_read = adb_pread(fd_, &buf_ptr[sizeof(ResponseHeader)], kBlockSize, offsetStart);
        bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
        return bytes_read;
    }
    int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
        int64_t bytes_read = -1;
        const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
        bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
        return bytes_read;
    }

    const unique_fd& RawFd() const { return fd_; }
    const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }

    std::vector<bool> sentBlocks;
    NumBlocks sentBlocksCount = 0;

    std::vector<bool> sentTreeBlocks;

    const char* const filepath;
    const FileId id;
    const int64_t size;

  private:
    File(const char* filepath, FileId id, int64_t size) : filepath(filepath), id(id), size(size) {
    File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
        : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
        sentBlocks.resize(numBytesToNumBlocks(size));
        sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
    }
    unique_fd fd_;
    std::vector<BlockIdx> priority_blocks_;

    unique_fd tree_fd_;
    const int64_t tree_offset_;
};

class IncrementalServer {
@@ -174,6 +198,8 @@ class IncrementalServer {
    IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
        : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
        buffer_.reserve(kReadBufferSize);
        pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
        pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
    }

    bool Serve();
@@ -208,7 +234,11 @@ class IncrementalServer {
    void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }

    enum class SendResult { Sent, Skipped, Error };
    SendResult SendBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);
    SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);

    bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
    bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);

    bool SendDone();
    void RunPrefetching();

@@ -228,7 +258,10 @@ class IncrementalServer {
    int compressed_ = 0, uncompressed_ = 0;
    long long sentSize_ = 0;

    std::vector<char> pendingBlocks_;
    static constexpr auto kChunkFlushSize = 31 * kBlockSize;

    std::vector<char> pendingBlocksBuffer_;
    char* pendingBlocks_ = nullptr;

    // True when client notifies that all the data has been received
    bool servingComplete_;
@@ -250,7 +283,7 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)

        if (bcur > 0) {
            // output the rest.
            WriteFdExactly(output_fd_, buffer_.data(), bcur);
            (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
            erase_buffer_head(bcur);
        }

@@ -265,9 +298,10 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
        auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);

        if (res != 1) {
            WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
            auto err = errno;
            (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
            if (res < 0) {
                D("Failed to poll: %s\n", strerror(errno));
                D("Failed to poll: %s", strerror(err));
                return false;
            }
            if (blocking) {
@@ -289,7 +323,7 @@ bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking)
            continue;
        }

        D("Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno);
        D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
        break;
    }
    // socket is closed. print remaining messages
@@ -313,56 +347,113 @@ std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
    return request;
}

auto IncrementalServer::SendBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
    auto& file = files_[fileId];
    if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
    const int32_t data_block_count = numBytesToNumBlocks(file.size);
        fprintf(stderr, "Failed to read file %s at block %" PRId32 " (past end).\n", file.filepath,

    const int32_t total_nodes_count(file.sentTreeBlocks.size());
    const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;

    const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;

    // Leaf level, sending only 1 block.
    const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
    if (file.sentTreeBlocks[leaf_idx]) {
        return true;
    }
    if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
        return false;
    }
    file.sentTreeBlocks[leaf_idx] = true;

    // Non-leaf, sending EVERYTHING. This should be done only once.
    if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
        return true;
    }

    for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
        if (!SendTreeBlock(fileId, blockIdx, i)) {
            return false;
        }
        file.sentTreeBlocks[i] = true;
    }
    return true;
}

bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
    const auto& file = files_[fileId];

    BlockBuffer buffer;
    const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
    if (bytesRead <= 0) {
        fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
                blockIdx);
        return SendResult::Error;
        return false;
    }

    buffer.header.compression_type = kCompressionNone;
    buffer.header.block_type = kTypeHash;
    buffer.header.file_id = toBigEndian(fileId);
    buffer.header.block_size = toBigEndian(int16_t(bytesRead));
    buffer.header.block_idx = toBigEndian(blockIdx);

    Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);

    return true;
}

auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
    auto& file = files_[fileId];
    if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
        // may happen as we schedule some extra blocks for reported page misses
        D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
        return SendResult::Skipped;
    }
    if (file.sentBlocks[blockIdx]) {
        return SendResult::Skipped;
    }
    std::string error;

    char raw[sizeof(ResponseHeader) + kBlockSize];
    if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
        return SendResult::Error;
    }

    BlockBuffer raw;
    bool isZipCompressed = false;
    const int64_t bytesRead = file.ReadBlock(blockIdx, &raw, &isZipCompressed, &error);
    const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
    if (bytesRead < 0) {
        fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%s).\n", file.filepath, blockIdx,
        fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
                error.c_str());
                errno);
        return SendResult::Error;
    }

    ResponseHeader* header = nullptr;
    BlockBuffer<kCompressBound> compressed;
    char data[sizeof(ResponseHeader) + kCompressBound];
    char* compressed = data + sizeof(*header);
    int16_t compressedSize = 0;
    if (!isZipCompressed) {
        compressedSize =
        compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
                LZ4_compress_default(raw + sizeof(*header), compressed, bytesRead, kCompressBound);
    }
    int16_t blockSize;
    ResponseHeader* header;
    if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
        ++compressed_;
        blockSize = compressedSize;
        header = reinterpret_cast<ResponseHeader*>(data);
        header = &compressed.header;
        header->compression_type = kCompressionLZ4;
    } else {
        ++uncompressed_;
        blockSize = bytesRead;
        header = reinterpret_cast<ResponseHeader*>(raw);
        header = &raw.header;
        header->compression_type = kCompressionNone;
    }

    header->block_type = kTypeData;

    header->file_id = toBigEndian(fileId);
    header->block_size = toBigEndian(blockSize);
    header->block_idx = toBigEndian(blockIdx);

    file.sentBlocks[blockIdx] = true;
    file.sentBlocksCount += 1;
    Send(header, sizeof(*header) + blockSize, flush);
    Send(header, ResponseHeader::responseSizeFor(blockSize), flush);

    return SendResult::Sent;
}

@@ -388,7 +479,8 @@ void IncrementalServer::RunPrefetching() {
        if (!priority_blocks.empty()) {
            for (auto& i = prefetch.priorityIndex;
                 blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
                if (auto res = SendBlock(file.id, priority_blocks[i]); res == SendResult::Sent) {
                if (auto res = SendDataBlock(file.id, priority_blocks[i]);
                    res == SendResult::Sent) {
                    --blocksToSend;
                } else if (res == SendResult::Error) {
                    fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
@@ -396,7 +488,7 @@ void IncrementalServer::RunPrefetching() {
            }
        }
        for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
            if (auto res = SendBlock(file.id, i); res == SendResult::Sent) {
            if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
                --blocksToSend;
            } else if (res == SendResult::Error) {
                fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
@@ -409,30 +501,25 @@ void IncrementalServer::RunPrefetching() {
}

void IncrementalServer::Send(const void* data, size_t size, bool flush) {
    constexpr auto kChunkFlushSize = 31 * kBlockSize;
    pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);

    if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
    if (pendingBlocks_.empty()) {
        pendingBlocks_.resize(sizeof(ChunkHeader));
    }
    pendingBlocks_.insert(pendingBlocks_.end(), static_cast<const char*>(data),
                          static_cast<const char*>(data) + size);
    if (flush || pendingBlocks_.size() > kChunkFlushSize) {
        Flush();
    }
}

void IncrementalServer::Flush() {
    if (pendingBlocks_.empty()) {
    auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
    if (dataBytes == 0) {
        return;
    }

    *(ChunkHeader*)pendingBlocks_.data() =
    *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
            toBigEndian<int32_t>(pendingBlocks_.size() - sizeof(ChunkHeader));
    auto totalBytes = sizeof(ChunkHeader) + dataBytes;
    if (!WriteFdExactly(adb_fd_, pendingBlocks_.data(), pendingBlocks_.size())) {
    if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
        fprintf(stderr, "Failed to write %d bytes\n", int(pendingBlocks_.size()));
        fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
    }
    sentSize_ += pendingBlocks_.size();
    sentSize_ += totalBytes;
    pendingBlocks_.clear();
    pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
}

bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
@@ -443,7 +530,7 @@ bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int
    D("Streaming completed.\n"
      "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
      "%d, mb: %.3f\n"
      "Total time taken: %.3fms\n",
      "Total time taken: %.3fms",
      missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
      duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
    return true;
@@ -510,9 +597,21 @@ bool IncrementalServer::Serve() {
                                fileId, blockIdx);
                        break;
                    }
                    // fprintf(stderr, "\treading file %d block %04d\n", (int)fileId,

                    // (int)blockIdx);
                    if (VLOG_IS_ON(INCREMENTAL)) {
                    if (auto res = SendBlock(fileId, blockIdx, true); res == SendResult::Error) {
                        auto& file = files_[fileId];
                        auto posP = std::find(file.PriorityBlocks().begin(),
                                              file.PriorityBlocks().end(), blockIdx);
                        D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
                          (int)fileId, (int)blockIdx,
                          posP == file.PriorityBlocks().end()
                                  ? -1
                                  : int(posP - file.PriorityBlocks().begin()),
                          int(file.PriorityBlocks().size()));
                    }

                    if (auto res = SendDataBlock(fileId, blockIdx, true);
                        res == SendResult::Error) {
                        fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
                    } else if (res == SendResult::Sent) {
                        ++missesSent;
@@ -536,7 +635,7 @@ bool IncrementalServer::Serve() {
                                fileId);
                        break;
                    }
                    D("Received prefetch request for file_id %" PRId16 ".\n", fileId);
                    D("Received prefetch request for file_id %" PRId16 ".", fileId);
                    prefetches_.emplace_back(files_[fileId]);
                    break;
                }
@@ -551,6 +650,43 @@ bool IncrementalServer::Serve() {
    }
}

static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
    struct stat st;
    if (stat(filepath, &st)) {
        error_exit("inc-server: failed to stat input file '%s'.", filepath);
    }

    unique_fd fd(adb_open(filepath, O_RDONLY));
    if (fd < 0) {
        error_exit("inc-server: failed to open file '%s'.", filepath);
    }

    return {std::move(fd), st.st_size};
}

static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
    std::string signature_file(filepath);
    signature_file += IDSIG;

    unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
    if (fd < 0) {
        error_exit("inc-server: failed to open file '%s'.", signature_file.c_str());
    }

    auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
    if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
        error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
                   signature_file.c_str(), (long long)tree_size, (long long)expected);
    }

    int32_t data_block_count = numBytesToNumBlocks(file_size);
    int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
    D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(),
      int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));

    return {std::move(fd), tree_offset};
}

bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
    auto connection_ufd = unique_fd(connection_fd);
    auto output_ufd = unique_fd(output_fd);
@@ -563,17 +699,11 @@ bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
    for (int i = 0; i < argc; ++i) {
        auto filepath = argv[i];

        struct stat st;
        if (stat(filepath, &st)) {
            fprintf(stderr, "Failed to stat input file %s. Abort.\n", filepath);
            return {};
        }

        unique_fd fd(adb_open(filepath, O_RDONLY));
        if (fd < 0) {
            error_exit("inc-server: failed to open file '%s'.", filepath);
        }
        files.emplace_back(filepath, i, st.st_size, std::move(fd));
        auto [file_fd, file_size] = open_fd(filepath);
        auto [sign_fd, sign_offset] = open_signature(file_size, filepath);
        files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
                           std::move(sign_fd));
    }

    IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
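A small illustrative sketch of the leaf-block arithmetic used by SendTreeBlocksForDataBlock() above: leaf hash blocks sit at the end of the tree, so a page miss on data block blockIdx maps to tree block leaf_nodes_offset + blockIdx / kHashesPerBlock, and the non-leaf blocks are pushed once, the first time any leaf for that file is sent. The 1 GiB file size and the missed block number below are hypothetical, chosen only to make the numbers concrete.

// Illustrative sketch of the leaf-block lookup used by SendTreeBlocksForDataBlock();
// the 1 GiB file and the miss at data block 70000 are hypothetical.
#include <cstdint>
#include <cstdio>

int main() {
    constexpr int kBlockSize = 4096;
    constexpr int kHashesPerBlock = kBlockSize / 32;  // 128 SHA-256 digests per tree block

    const int64_t file_size = 1ll << 30;  // 1 GiB
    const int32_t data_block_count = int32_t((file_size + kBlockSize - 1) / kBlockSize);  // 262144

    // verity_tree_blocks_for_file(1 GiB) = 2048 leaf + 16 + 1 upper-level blocks
    const int32_t total_nodes_count = 2048 + 16 + 1;
    const int32_t leaf_nodes_count =
            (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;          // 2048
    const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;      // 17

    const int32_t block_idx = 70000;  // data block reported as a page miss
    const int32_t leaf_idx = leaf_nodes_offset + block_idx / kHashesPerBlock;    // 17 + 546 = 563
    printf("data block %d is hashed in tree block %d of %d\n", block_idx, leaf_idx,
           total_nodes_count);
    return 0;
}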
+101 −17


Preview size limit exceeded, changes collapsed.

+23 −3
@@ -16,11 +16,31 @@

#pragma once

#include <stdint.h>
#include "adb_unique_fd.h"

#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <stdint.h>

namespace incremental {

std::vector<int32_t> PriorityBlocksForFile(const std::string& filepath, int fd, int64_t fileSize);

using Size = int64_t;
constexpr int kBlockSize = 4096;
constexpr int kSha256DigestSize = 32;
constexpr int kDigestSize = kSha256DigestSize;

constexpr std::string_view IDSIG = ".idsig";

std::vector<int32_t> PriorityBlocksForFile(const std::string& filepath, borrowed_fd fd,
                                           Size fileSize);

Size verity_tree_blocks_for_file(Size fileSize);
Size verity_tree_size_for_file(Size fileSize);

std::pair<std::vector<char>, int32_t> read_id_sig_headers(borrowed_fd fd);
std::pair<off64_t, ssize_t> skip_id_sig_headers(borrowed_fd fd);

}  // namespace incremental
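For reference, a standalone sketch of the .idsig layout implied by read_id_sig_headers()/skip_id_sig_headers(): a little-endian int32 version, two size-prefixed blobs (hashingInfo, then signingInfo), an int32 tree size, and then the raw verity tree. The sketch uses plain <cstdio> I/O instead of adb's borrowed_fd helpers and assumes a little-endian host, so the le32toh conversion is omitted; it is a rough approximation, not the adb implementation.

// Rough sketch: locate the verity tree inside a .idsig file, assuming the
// layout shown by read_id_sig_headers() above and a little-endian host.
#include <cstdint>
#include <cstdio>

struct IdsigHeaders {
    int64_t tree_offset;  // where the verity tree starts in the .idsig file
    int32_t tree_size;    // size of the verity tree in bytes
};

static bool skip_idsig_headers(FILE* f, IdsigHeaders* out) {
    auto read_le32 = [&](int32_t* v) { return fread(v, sizeof(*v), 1, f) == 1; };

    int32_t version, blob_size, tree_size;
    if (!read_le32(&version)) return false;  // format version, not needed to find the tree
    for (int i = 0; i < 2; ++i) {            // hashingInfo, then signingInfo
        if (!read_le32(&blob_size) || blob_size < 0) return false;
        if (fseek(f, blob_size, SEEK_CUR) != 0) return false;
    }
    if (!read_le32(&tree_size)) return false;
    out->tree_offset = ftell(f);
    out->tree_size = tree_size;
    return true;
}

int main(int argc, char** argv) {
    if (argc < 2) return 1;
    FILE* f = fopen(argv[1], "rb");
    if (!f) return 1;
    IdsigHeaders h{};
    if (skip_idsig_headers(f, &h)) {
        printf("verity tree at offset %lld, %d bytes\n", (long long)h.tree_offset, h.tree_size);
    }
    fclose(f);
    return 0;
}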