Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 74295d4f authored by Daniel Zheng's avatar Daniel Zheng Committed by Gerrit Code Review
Browse files

Merge changes I3feede9f,I9194e2c6 into main

* changes:
  Optimize zstd compression
  Adding compressor class
parents 49dc9d12 03acfdbe
Loading
Loading
Loading
Loading
+48 −0
Original line number Original line Diff line number Diff line
//
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#pragma once

#include <memory>
#include <string_view>
#include "libsnapshot/cow_format.h"

namespace android {
namespace snapshot {

class ICompressor {
  public:
    explicit ICompressor(uint32_t compression_level) : compression_level_(compression_level) {}

    virtual ~ICompressor() {}
    // Factory methods for compression methods.
    static std::unique_ptr<ICompressor> Gz(uint32_t compression_level);
    static std::unique_ptr<ICompressor> Brotli(uint32_t compression_level);
    static std::unique_ptr<ICompressor> Lz4(uint32_t compression_level);
    static std::unique_ptr<ICompressor> Zstd(uint32_t compression_level);

    static std::unique_ptr<ICompressor> Create(CowCompression compression);

    uint32_t GetCompressionLevel() const { return compression_level_; }

    [[nodiscard]] virtual std::basic_string<uint8_t> Compress(const void* data,
                                                              size_t length) const = 0;

  private:
    uint32_t compression_level_;
};
}  // namespace snapshot
}  // namespace android
 No newline at end of file
+5 −5
Original line number Original line Diff line number Diff line
@@ -14,6 +14,8 @@


#pragma once
#pragma once


#include <libsnapshot/cow_compress.h>

#include <stdint.h>
#include <stdint.h>


#include <condition_variable>
#include <condition_variable>
@@ -107,16 +109,14 @@ class ICowWriter {


class CompressWorker {
class CompressWorker {
  public:
  public:
    CompressWorker(CowCompression compression, uint32_t block_size);
    CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size);
    bool RunThread();
    bool RunThread();
    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
    bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
    bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
    void Finalize();
    void Finalize();
    static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression);
    static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression);
    static std::basic_string<uint8_t> Compress(CowCompression compression, const void* data,
                                               size_t length);


    static bool CompressBlocks(CowCompression compression, size_t block_size, const void* buffer,
    static bool CompressBlocks(ICompressor* compressor, size_t block_size, const void* buffer,
                               size_t num_blocks,
                               size_t num_blocks,
                               std::vector<std::basic_string<uint8_t>>* compressed_data);
                               std::vector<std::basic_string<uint8_t>>* compressed_data);


@@ -128,7 +128,7 @@ class CompressWorker {
        std::vector<std::basic_string<uint8_t>> compressed_data;
        std::vector<std::basic_string<uint8_t>> compressed_data;
    };
    };


    CowCompression compression_;
    std::unique_ptr<ICompressor> compressor_;
    uint32_t block_size_;
    uint32_t block_size_;


    std::queue<CompressWork> work_queue_;
    std::queue<CompressWork> work_queue_;
+146 −86
Original line number Original line Diff line number Diff line
@@ -18,12 +18,16 @@
#include <unistd.h>
#include <unistd.h>


#include <limits>
#include <limits>
#include <memory>
#include <queue>
#include <queue>


#include <android-base/file.h>
#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/logging.h>
#include <android-base/parseint.h>
#include <android-base/strings.h>
#include <android-base/unique_fd.h>
#include <android-base/unique_fd.h>
#include <brotli/encode.h>
#include <brotli/encode.h>
#include <libsnapshot/cow_compress.h>
#include <libsnapshot/cow_format.h>
#include <libsnapshot/cow_format.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <libsnapshot/cow_writer.h>
@@ -51,6 +55,22 @@ std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::strin
    }
    }
}
}


std::unique_ptr<ICompressor> ICompressor::Create(CowCompression compression) {
    switch (compression.algorithm) {
        case kCowCompressLz4:
            return ICompressor::Lz4(compression.compression_level);
        case kCowCompressBrotli:
            return ICompressor::Brotli(compression.compression_level);
        case kCowCompressGz:
            return ICompressor::Gz(compression.compression_level);
        case kCowCompressZstd:
            return ICompressor::Zstd(compression.compression_level);
        case kCowCompressNone:
            return nullptr;
    }
    return nullptr;
}

// 1. Default compression level is determined by compression algorithm
// 1. Default compression level is determined by compression algorithm
// 2. There might be compatibility issues if a value is changed here, as  some older versions of
// 2. There might be compatibility issues if a value is changed here, as  some older versions of
// Android will assume a different compression level, causing cow_size estimation differences that
// Android will assume a different compression level, causing cow_size estimation differences that
@@ -77,43 +97,31 @@ uint32_t CompressWorker::GetDefaultCompressionLevel(CowCompressionAlgorithm comp
    return 0;
    return 0;
}
}


std::basic_string<uint8_t> CompressWorker::Compress(CowCompression compression, const void* data,
class GzCompressor final : public ICompressor {
                                                    size_t length) {
  public:
    switch (compression.algorithm) {
    GzCompressor(uint32_t compression_level) : ICompressor(compression_level){};
        case kCowCompressGz: {

    std::basic_string<uint8_t> Compress(const void* data, size_t length) const override {
        const auto bound = compressBound(length);
        const auto bound = compressBound(length);
        std::basic_string<uint8_t> buffer(bound, '\0');
        std::basic_string<uint8_t> buffer(bound, '\0');


        uLongf dest_len = bound;
        uLongf dest_len = bound;
            auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data),
        auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data), length,
                                length, compression.compression_level);
                            GetCompressionLevel());
        if (rv != Z_OK) {
        if (rv != Z_OK) {
            LOG(ERROR) << "compress2 returned: " << rv;
            LOG(ERROR) << "compress2 returned: " << rv;
            return {};
            return {};
        }
        }
        buffer.resize(dest_len);
        buffer.resize(dest_len);
        return buffer;
        return buffer;
        }
    };
        case kCowCompressBrotli: {
};
            const auto bound = BrotliEncoderMaxCompressedSize(length);
            if (!bound) {
                LOG(ERROR) << "BrotliEncoderMaxCompressedSize returned 0";
                return {};
            }
            std::basic_string<uint8_t> buffer(bound, '\0');


            size_t encoded_size = bound;
class Lz4Compressor final : public ICompressor {
            auto rv = BrotliEncoderCompress(
  public:
                    compression.compression_level, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE,
    Lz4Compressor(uint32_t compression_level) : ICompressor(compression_level){};
                    length, reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data());

            if (!rv) {
    std::basic_string<uint8_t> Compress(const void* data, size_t length) const override {
                LOG(ERROR) << "BrotliEncoderCompress failed";
                return {};
            }
            buffer.resize(encoded_size);
            return buffer;
        }
        case kCowCompressLz4: {
        const auto bound = LZ4_compressBound(length);
        const auto bound = LZ4_compressBound(length);
        if (!bound) {
        if (!bound) {
            LOG(ERROR) << "LZ4_compressBound returned 0";
            LOG(ERROR) << "LZ4_compressBound returned 0";
@@ -121,9 +129,9 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompression compression,
        }
        }
        std::basic_string<uint8_t> buffer(bound, '\0');
        std::basic_string<uint8_t> buffer(bound, '\0');


            const auto compressed_size = LZ4_compress_default(
        const auto compressed_size =
                    static_cast<const char*>(data), reinterpret_cast<char*>(buffer.data()), length,
                LZ4_compress_default(static_cast<const char*>(data),
                    buffer.size());
                                     reinterpret_cast<char*>(buffer.data()), length, buffer.size());
        if (compressed_size <= 0) {
        if (compressed_size <= 0) {
            LOG(ERROR) << "LZ4_compress_default failed, input size: " << length
            LOG(ERROR) << "LZ4_compress_default failed, input size: " << length
                       << ", compression bound: " << bound << ", ret: " << compressed_size;
                       << ", compression bound: " << bound << ", ret: " << compressed_size;
@@ -137,11 +145,48 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompression compression,
            buffer.resize(compressed_size);
            buffer.resize(compressed_size);
        }
        }
        return buffer;
        return buffer;
    };
};

class BrotliCompressor final : public ICompressor {
  public:
    BrotliCompressor(uint32_t compression_level) : ICompressor(compression_level){};

    std::basic_string<uint8_t> Compress(const void* data, size_t length) const override {
        const auto bound = BrotliEncoderMaxCompressedSize(length);
        if (!bound) {
            LOG(ERROR) << "BrotliEncoderMaxCompressedSize returned 0";
            return {};
        }
        }
        case kCowCompressZstd: {
        std::basic_string<uint8_t> buffer(bound, '\0');

        size_t encoded_size = bound;
        auto rv = BrotliEncoderCompress(
                GetCompressionLevel(), BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, length,
                reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data());
        if (!rv) {
            LOG(ERROR) << "BrotliEncoderCompress failed";
            return {};
        }
        buffer.resize(encoded_size);
        return buffer;
    };
};

class ZstdCompressor final : public ICompressor {
  public:
    ZstdCompressor(uint32_t compression_level)
        : ICompressor(compression_level), zstd_context_(ZSTD_createCCtx(), ZSTD_freeCCtx) {
        ZSTD_CCtx_setParameter(zstd_context_.get(), ZSTD_c_compressionLevel, compression_level);
        // FIXME: hardcoding a value of 12 here for 4k blocks, should change to be either set by
        // user, or optimized depending on block size
        ZSTD_CCtx_setParameter(zstd_context_.get(), ZSTD_c_windowLog, 12);
    };

    std::basic_string<uint8_t> Compress(const void* data, size_t length) const override {
        std::basic_string<uint8_t> buffer(ZSTD_compressBound(length), '\0');
        std::basic_string<uint8_t> buffer(ZSTD_compressBound(length), '\0');
            const auto compressed_size = ZSTD_compress(buffer.data(), buffer.size(), data, length,
        const auto compressed_size =
                                                       compression.compression_level);
                ZSTD_compress2(zstd_context_.get(), buffer.data(), buffer.size(), data, length);
        if (compressed_size <= 0) {
        if (compressed_size <= 0) {
            LOG(ERROR) << "ZSTD compression failed " << compressed_size;
            LOG(ERROR) << "ZSTD compression failed " << compressed_size;
            return {};
            return {};
@@ -154,24 +199,23 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompression compression,
            buffer.resize(compressed_size);
            buffer.resize(compressed_size);
        }
        }
        return buffer;
        return buffer;
        }
    };
        default:

            LOG(ERROR) << "unhandled compression type: " << compression.algorithm;
  private:
            break;
    std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> zstd_context_;
    }
};
    return {};

}
bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
    return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data);
    return CompressBlocks(compressor_.get(), block_size_, buffer, num_blocks, compressed_data);
}
}


bool CompressWorker::CompressBlocks(CowCompression compression, size_t block_size,
bool CompressWorker::CompressBlocks(ICompressor* compressor, size_t block_size, const void* buffer,
                                    const void* buffer, size_t num_blocks,
                                    size_t num_blocks,
                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
    while (num_blocks) {
    while (num_blocks) {
        auto data = Compress(compression, iter, block_size);
        auto data = compressor->Compress(iter, block_size);
        if (data.empty()) {
        if (data.empty()) {
            PLOG(ERROR) << "CompressBlocks: Compression failed";
            PLOG(ERROR) << "CompressBlocks: Compression failed";
            return false;
            return false;
@@ -270,6 +314,22 @@ bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>
    return true;
    return true;
}
}


std::unique_ptr<ICompressor> ICompressor::Brotli(uint32_t compression_level) {
    return std::make_unique<BrotliCompressor>(compression_level);
}

std::unique_ptr<ICompressor> ICompressor::Gz(uint32_t compression_level) {
    return std::make_unique<GzCompressor>(compression_level);
}

std::unique_ptr<ICompressor> ICompressor::Lz4(uint32_t compression_level) {
    return std::make_unique<Lz4Compressor>(compression_level);
}

std::unique_ptr<ICompressor> ICompressor::Zstd(uint32_t compression_level) {
    return std::make_unique<ZstdCompressor>(compression_level);
}

void CompressWorker::Finalize() {
void CompressWorker::Finalize() {
    {
    {
        std::unique_lock<std::mutex> lock(lock_);
        std::unique_lock<std::mutex> lock(lock_);
@@ -278,8 +338,8 @@ void CompressWorker::Finalize() {
    cv_.notify_all();
    cv_.notify_all();
}
}


CompressWorker::CompressWorker(CowCompression compression, uint32_t block_size)
CompressWorker::CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size)
    : compression_(compression), block_size_(block_size) {}
    : compressor_(std::move(compressor)), block_size_(block_size) {}


}  // namespace snapshot
}  // namespace snapshot
}  // namespace android
}  // namespace android
+2 −1
Original line number Original line Diff line number Diff line
@@ -480,7 +480,8 @@ TEST_P(CompressionTest, HorribleStream) {
    std::string expected = "The quick brown fox jumps over the lazy dog.";
    std::string expected = "The quick brown fox jumps over the lazy dog.";
    expected.resize(4096, '\0');
    expected.resize(4096, '\0');


    auto result = CompressWorker::Compress(compression, expected.data(), expected.size());
    std::unique_ptr<ICompressor> compressor = ICompressor::Create(compression);
    auto result = compressor->Compress(expected.data(), expected.size());
    ASSERT_FALSE(result.empty());
    ASSERT_FALSE(result.empty());


    HorribleStream<uint8_t> stream(result);
    HorribleStream<uint8_t> stream(result);
+12 −6
Original line number Original line Diff line number Diff line
@@ -184,7 +184,8 @@ void CowWriterV2::InitWorkers() {
        return;
        return;
    }
    }
    for (int i = 0; i < num_compress_threads_; i++) {
    for (int i = 0; i < num_compress_threads_; i++) {
        auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
        std::unique_ptr<ICompressor> compressor = ICompressor::Create(compression_);
        auto wt = std::make_unique<CompressWorker>(std::move(compressor), header_.block_size);
        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
        compress_threads_.push_back(std::move(wt));
        compress_threads_.push_back(std::move(wt));
    }
    }
@@ -339,10 +340,12 @@ bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) {
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    compressed_buf_.clear();
    compressed_buf_.clear();
    if (num_threads <= 1) {
    if (num_threads <= 1) {
        return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks,
        if (!compressor_) {
                                              &compressed_buf_);
            compressor_ = ICompressor::Create(compression_);
        }
        return CompressWorker::CompressBlocks(compressor_.get(), options_.block_size, data,
                                              num_blocks, &compressed_buf_);
    }
    }

    // Submit the blocks per thread. The retrieval of
    // Submit the blocks per thread. The retrieval of
    // compressed buffers has to be done in the same order.
    // compressed buffers has to be done in the same order.
    // We should not poll for completed buffers in a different order as the
    // We should not poll for completed buffers in a different order as the
@@ -412,8 +415,11 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t
                        buf_iter_++;
                        buf_iter_++;
                        return data;
                        return data;
                    } else {
                    } else {
                        auto data =
                        if (!compressor_) {
                                CompressWorker::Compress(compression_, iter, header_.block_size);
                            compressor_ = ICompressor::Create(compression_);
                        }

                        auto data = compressor_->Compress(iter, header_.block_size);
                        return data;
                        return data;
                    }
                    }
                }();
                }();
Loading