Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6687cb65 authored by Kelvin Zhang's avatar Kelvin Zhang
Browse files

Support multi-threaded compression in COW v3

Performance of COW v3 is now on par with v2 in both multi-threaded and
single threaded configurations. Note, v2 cow writer can cache up to 1024
blocks in memory if multi-threaded compression is enabled(even though
batch size is configured as 200). For a fair comparison, benchmarks are
ran with batch size of 256. For batch size of 256 or greater, v2 and v3
have similar multi-threaded performance.

Test: th
Bug: 313962438
Change-Id: I377c8291689a7a038bb00b09d7371a155e6972e9
parent 004187ac
Loading
Loading
Loading
Loading
+100 −19
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@
#include <zlib.h>

#include <fcntl.h>
#include <libsnapshot/cow_compress.h>
#include <libsnapshot_cow/parser_v3.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
@@ -55,11 +56,35 @@ static_assert(sizeof(off_t) == sizeof(uint64_t));

using android::base::unique_fd;

// Divide |x| by |y| and round up to the nearest integer.
constexpr uint64_t DivRoundUp(uint64_t x, uint64_t y) {
    return (x + y - 1) / y;
}

CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
    : CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
    SetupHeaders();
}

void CowWriterV3::InitWorkers() {
    if (num_compress_threads_ <= 1) {
        LOG_INFO << "Not creating new threads for compression.";
        return;
    }
    compress_threads_.reserve(num_compress_threads_);
    compress_threads_.clear();
    threads_.reserve(num_compress_threads_);
    threads_.clear();
    for (size_t i = 0; i < num_compress_threads_; i++) {
        std::unique_ptr<ICompressor> compressor =
                ICompressor::Create(compression_, header_.block_size);
        auto&& wt = compress_threads_.emplace_back(
                std::make_unique<CompressWorker>(std::move(compressor), header_.block_size));
        threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
    }
    LOG(INFO) << num_compress_threads_ << " thread used for compression";
}

void CowWriterV3::SetupHeaders() {
    header_ = {};
    header_.prefix.magic = kCowMagicNumber;
@@ -135,10 +160,24 @@ bool CowWriterV3::ParseOptions() {
    } else {
        LOG(INFO) << "Batch writes: disabled";
    }
    if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false) &&
        options_.num_compress_threads) {
        num_compress_threads_ = options_.num_compress_threads;
    }
    InitWorkers();
    return true;
}

CowWriterV3::~CowWriterV3() {}
CowWriterV3::~CowWriterV3() {
    for (const auto& t : compress_threads_) {
        t->Finalize();
    }
    for (auto& t : threads_) {
        if (t.joinable()) {
            t.join();
        }
    }
}

bool CowWriterV3::Initialize(std::optional<uint64_t> label) {
    if (!InitFd() || !ParseOptions()) {
@@ -289,19 +328,24 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
                   << " but compressor is uninitialized.";
        return false;
    }
    const auto bytes = reinterpret_cast<const uint8_t*>(data);
    const size_t num_blocks = (size / header_.block_size);

    for (size_t i = 0; i < num_blocks;) {
        const auto blocks_to_write =
                std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
        size_t compressed_bytes = 0;
        auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i);
        if (blocks.size() != blocks_to_write) {
            LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", "
                       << blocks_to_write << ", actual number of blocks received from compressor "
                       << blocks.size();
            return false;
        }
        for (size_t j = 0; j < blocks_to_write; j++) {
            const uint8_t* const iter =
                    reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));

            CowOperation& op = cached_ops_.emplace_back();
            auto& vec = data_vec_.emplace_back();
            auto& compressed_data = cached_data_.emplace_back();
            auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j]));
            op.new_block = new_block_start + i + j;

            op.set_type(type);
@@ -310,20 +354,6 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
            } else {
                op.set_source(next_data_pos_ + compressed_bytes);
            }
            if (compression_.algorithm == kCowCompressNone) {
                compressed_data.resize(header_.block_size);
            } else {
                compressed_data = compressor_->Compress(iter, header_.block_size);
                if (compressed_data.empty()) {
                    LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
                               << num_blocks << ");";
                    return false;
                }
            }
            if (compressed_data.size() >= header_.block_size) {
                compressed_data.resize(header_.block_size);
                std::memcpy(compressed_data.data(), iter, header_.block_size);
            }
            vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
            op.data_length = vec.iov_len;
            compressed_bytes += op.data_length;
@@ -443,6 +473,57 @@ bool CowWriterV3::FlushCacheOps() {
    return true;
}

std::vector<std::basic_string<uint8_t>> CowWriterV3::CompressBlocks(const size_t num_blocks,
                                                                    const void* data) {
    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
    const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
    std::vector<std::basic_string<uint8_t>> compressed_buf;
    compressed_buf.clear();
    const uint8_t* const iter = reinterpret_cast<const uint8_t*>(data);
    if (compression_.algorithm == kCowCompressNone) {
        for (size_t i = 0; i < num_blocks; i++) {
            auto& buf = compressed_buf.emplace_back();
            buf.resize(header_.block_size);
            std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size);
        }
        return compressed_buf;
    }
    if (num_threads <= 1) {
        if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks,
                                            &compressed_buf)) {
            return {};
        }
    } else {
        // Submit the blocks per thread. The retrieval of
        // compressed buffers has to be done in the same order.
        // We should not poll for completed buffers in a different order as the
        // buffers are tightly coupled with block ordering.
        for (size_t i = 0; i < num_threads; i++) {
            CompressWorker* worker = compress_threads_[i].get();
            const auto blocks_in_batch =
                    std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
            worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size,
                                          blocks_in_batch);
        }

        for (size_t i = 0; i < num_threads; i++) {
            CompressWorker* worker = compress_threads_[i].get();
            if (!worker->GetCompressedBuffers(&compressed_buf)) {
                return {};
            }
        }
    }
    for (size_t i = 0; i < num_blocks; i++) {
        auto& block = compressed_buf[i];
        if (block.size() >= header_.block_size) {
            block.resize(header_.block_size);
            std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size);
        }
    }

    return compressed_buf;
}

bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
                                 std::basic_string_view<struct iovec> data) {
    const auto total_data_size =
+6 −1
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

#include <android-base/logging.h>
#include <string_view>
#include <thread>
#include <vector>

#include "writer_base.h"
@@ -51,12 +52,14 @@ class CowWriterV3 : public CowWriterBase {
                        std::basic_string_view<struct iovec> data);
    bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                    uint16_t offset, CowOperationType type);
    bool CompressBlocks(size_t num_blocks, const void* data);
    bool CheckOpCount(size_t op_count);

  private:
    std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks,
                                                           const void* data);
    bool ReadBackVerification();
    bool FlushCacheOps();
    void InitWorkers();
    CowHeaderV3 header_{};
    CowCompression compression_;
    // in the case that we are using one thread for compression, we can store and re-use the same
@@ -75,6 +78,8 @@ class CowWriterV3 : public CowWriterBase {
    std::vector<CowOperationV3> cached_ops_;
    std::vector<std::basic_string<uint8_t>> cached_data_;
    std::vector<struct iovec> data_vec_;

    std::vector<std::thread> threads_;
};

}  // namespace snapshot