Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 124492d8 authored by Kelvin Zhang's avatar Kelvin Zhang
Browse files

Do not create worker thread if threading isn't enabled

Currently, we would create worker threads even if number of compression
thread is set to 1. This works, but having context switches and data
exchange between two threads is complete overhead if main thread is just
blocking on the worker thread.

Test: th
Change-Id: I02f98ee1e0c4889dc1ae602eb06667b05796d3f0
parent 515c9243
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -120,6 +120,12 @@ class CompressWorker {
    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
    bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
    void Finalize();
    static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression,
                                               const void* data, size_t length);

    static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
                               const void* buffer, size_t num_blocks,
                               std::vector<std::basic_string<uint8_t>>* compressed_data);

  private:
    struct CompressWork {
+15 −6
Original line number Diff line number Diff line
@@ -32,9 +32,13 @@

namespace android {
namespace snapshot {

std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
    switch (compression_) {
    return Compress(compression_, data, length);
}

std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression,
                                                    const void* data, size_t length) {
    switch (compression) {
        case kCowCompressGz: {
            const auto bound = compressBound(length);
            std::basic_string<uint8_t> buffer(bound, '\0');
@@ -94,17 +98,22 @@ std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t len
            return buffer;
        }
        default:
            LOG(ERROR) << "unhandled compression type: " << compression_;
            LOG(ERROR) << "unhandled compression type: " << compression;
            break;
    }
    return {};
}

bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
    return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data);
}

bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
                                    const void* buffer, size_t num_blocks,
                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
    while (num_blocks) {
        auto data = Compress(iter, block_size_);
        auto data = Compress(compression, iter, block_size);
        if (data.empty()) {
            PLOG(ERROR) << "CompressBlocks: Compression failed";
            return false;
@@ -116,7 +125,7 @@ bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,

        compressed_data->emplace_back(std::move(data));
        num_blocks -= 1;
        iter += block_size_;
        iter += block_size;
    }
    return true;
}
+21 −5
Original line number Diff line number Diff line
@@ -275,6 +275,10 @@ void CowWriter::InitBatchWrites() {
}

void CowWriter::InitWorkers() {
    if (num_compress_threads_ <= 1) {
        LOG(INFO) << "Not creating new threads for compression.";
        return;
    }
    for (int i = 0; i < num_compress_threads_; i++) {
        auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
@@ -447,6 +451,10 @@ bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) {
    size_t num_blocks_per_thread = num_blocks / num_threads;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    compressed_buf_.clear();
    if (num_threads <= 1) {
        return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks,
                                              &compressed_buf_);
    }

    // Submit the blocks per thread. The retrieval of
    // compressed buffers has to be done in the same order.
@@ -490,13 +498,12 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si
    while (num_blocks) {
        size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));

        if (compression_) {
        if (compression_ && num_compress_threads_ > 1) {
            if (!CompressBlocks(pending_blocks, iter)) {
                return false;
            }
            buf_iter_ = compressed_buf_.begin();
            CHECK(pending_blocks == compressed_buf_.size());
            iter += (pending_blocks * header_.block_size);
        }

        num_blocks -= pending_blocks;
@@ -512,7 +519,17 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si
            }

            if (compression_) {
                auto data = [&, this]() {
                    if (num_compress_threads_ > 1) {
                        auto data = std::move(*buf_iter_);
                        buf_iter_++;
                        return data;
                    } else {
                        auto data =
                                CompressWorker::Compress(compression_, iter, header_.block_size);
                        return data;
                    }
                }();
                op.compression = compression_;
                op.data_length = static_cast<uint16_t>(data.size());

@@ -520,15 +537,14 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si
                    PLOG(ERROR) << "AddRawBlocks: write failed";
                    return false;
                }
                buf_iter_++;
            } else {
                op.data_length = static_cast<uint16_t>(header_.block_size);
                if (!WriteOperation(op, iter, header_.block_size)) {
                    PLOG(ERROR) << "AddRawBlocks: write failed";
                    return false;
                }
                iter += header_.block_size;
            }
            iter += header_.block_size;

            i += 1;
            pending_blocks -= 1;