Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b1a05003 authored by Akilesh Kailash
Browse files

libsnapshot: Use two threads to run compression



Compression is a hot function in the install path. Use
two threads for compression.

By default, the number of threads is set to 1. If the property
"ro.virtual_ab.compression.threads" is true, the number
of threads is increased to 2.

OTA install time (without post-install) on Pixel 6 Pro with 2 threads:

	  Without-this-patch       With-this-patch

Full OTA: 23 Minutes               17 Minutes

Bug: 254188450
Test: Full/Incremental OTA on Pixel
Change-Id: I4a11dca3a5ebfe11dcc7f0d882332d491f2d7933
Signed-off-by: Akilesh Kailash <akailash@google.com>
parent 67bd5b01
Loading
Loading
Loading
Loading
+50 −1
Original line number Diff line number Diff line
@@ -16,10 +16,17 @@

#include <stdint.h>

#include <condition_variable>
#include <cstdint>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <android-base/unique_fd.h>
#include <libsnapshot/cow_format.h>
@@ -42,6 +49,9 @@ struct CowOptions {

    // Preset the number of merged ops. Only useful for testing.
    uint64_t num_merge_ops = 0;

    // Number of threads for compression
    int num_compress_threads = 0;
};

// Interface for writing to a snapuserd COW. All operations are ordered; merges
@@ -100,9 +110,40 @@ class ICowWriter {
    CowOptions options_;
};

// Compresses batches of fixed-size blocks on behalf of a producer.
//
// One thread runs RunThread(); another thread submits work with
// EnqueueCompressBlocks() and collects results with GetCompressedBuffers().
// Both queues and the stop flag are guarded by lock_, and cv_ is used to
// signal new work, completed work and shutdown.
class CompressWorker {
  public:
    // `compression` selects the algorithm used by Compress(); `block_size` is
    // the number of input bytes consumed per block.
    CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size);

    // Worker loop. Blocks until work arrives or Finalize() is called.
    // Returns true when stopped via Finalize(), false after the first batch
    // that fails to compress (the failed batch is still published).
    bool RunThread();

    // Queues `num_blocks` blocks starting at `buffer` for compression. Only the
    // pointer is stored, so the memory must remain valid until the batch has
    // been processed by RunThread().
    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);

    // Waits for at least one finished batch (or for Finalize()) and appends the
    // compressed blocks of every finished batch to `compressed_buf`.
    // Returns false if a finished batch reported a compression failure.
    bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);

    // Marks the worker as stopped and wakes every waiter on cv_.
    void Finalize();

  private:
    // One unit of work: the input range plus, once processed, its result.
    struct CompressWork {
        // Initialized so that a default-constructed CompressWork (as created in
        // RunThread() before a job is dequeued) never holds indeterminate values.
        const void* buffer = nullptr;
        size_t num_blocks = 0;
        bool compression_status = false;
        std::vector<std::basic_string<uint8_t>> compressed_data;
    };

    CowCompressionAlgorithm compression_;
    uint32_t block_size_;

    // Batches waiting to be compressed.
    std::queue<CompressWork> work_queue_;
    // Batches that have been processed and are waiting to be collected.
    std::queue<CompressWork> compressed_queue_;
    // Guards work_queue_, compressed_queue_ and stopped_.
    std::mutex lock_;
    std::condition_variable cv_;
    bool stopped_ = false;

    std::basic_string<uint8_t> Compress(const void* data, size_t length);
    bool CompressBlocks(const void* buffer, size_t num_blocks,
                        std::vector<std::basic_string<uint8_t>>* compressed_data);
};

class CowWriter : public ICowWriter {
  public:
    explicit CowWriter(const CowOptions& options);
    ~CowWriter();

    // Set up the writer.
    // The file starts from the beginning.
@@ -138,6 +179,7 @@ class CowWriter : public ICowWriter {
    bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                    uint16_t offset, uint8_t type);
    void SetupHeaders();
    void SetupWriteOptions();
    bool ParseOptions();
    bool OpenForWrite();
    bool OpenForAppend(uint64_t label);
@@ -145,9 +187,10 @@ class CowWriter : public ICowWriter {
    bool WriteRawData(const void* data, size_t size);
    bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
    void AddOperation(const CowOperation& op);
    std::basic_string<uint8_t> Compress(const void* data, size_t length);
    void InitPos();
    void InitWorkers();

    bool CompressBlocks(size_t num_blocks, const void* data);
    bool SetFd(android::base::borrowed_fd fd);
    bool Sync();
    bool Truncate(off_t length);
@@ -168,6 +211,12 @@ class CowWriter : public ICowWriter {
    bool merge_in_progress_ = false;
    bool is_block_device_ = false;
    uint64_t cow_image_size_ = INT64_MAX;

    int num_compress_threads_ = 1;
    std::vector<std::unique_ptr<CompressWorker>> compress_threads_;
    std::vector<std::future<bool>> threads_;
    std::vector<std::basic_string<uint8_t>> compressed_buf_;
    std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;
};

}  // namespace snapshot
+115 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@
namespace android {
namespace snapshot {

std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length) {
std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
    switch (compression_) {
        case kCowCompressGz: {
            const auto bound = compressBound(length);
@@ -100,5 +100,119 @@ std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length)
    return {};
}

bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
    while (num_blocks) {
        auto data = Compress(iter, block_size_);
        if (data.empty()) {
            PLOG(ERROR) << "CompressBlocks: Compression failed";
            return false;
        }
        if (data.size() > std::numeric_limits<uint16_t>::max()) {
            LOG(ERROR) << "Compressed block is too large: " << data.size();
            return false;
        }

        compressed_data->emplace_back(std::move(data));
        num_blocks -= 1;
        iter += block_size_;
    }
    return true;
}

bool CompressWorker::RunThread() {
    while (true) {
        // Wait for work
        CompressWork blocks;
        {
            std::unique_lock<std::mutex> lock(lock_);
            while (work_queue_.empty() && !stopped_) {
                cv_.wait(lock);
            }

            if (stopped_) {
                return true;
            }

            blocks = std::move(work_queue_.front());
            work_queue_.pop();
        }

        // Compress blocks
        bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
        blocks.compression_status = ret;
        {
            std::lock_guard<std::mutex> lock(lock_);
            compressed_queue_.push(std::move(blocks));
        }

        // Notify completion
        cv_.notify_all();

        if (!ret) {
            LOG(ERROR) << "CompressBlocks failed";
            return false;
        }
    }

    return true;
}

void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
    {
        std::lock_guard<std::mutex> lock(lock_);

        CompressWork blocks = {};
        blocks.buffer = buffer;
        blocks.num_blocks = num_blocks;
        work_queue_.push(std::move(blocks));
    }
    cv_.notify_all();
}

bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
    {
        std::unique_lock<std::mutex> lock(lock_);
        while (compressed_queue_.empty() && !stopped_) {
            cv_.wait(lock);
        }

        if (stopped_) {
            return true;
        }
    }

    {
        std::lock_guard<std::mutex> lock(lock_);
        while (compressed_queue_.size() > 0) {
            CompressWork blocks = std::move(compressed_queue_.front());
            compressed_queue_.pop();

            if (blocks.compression_status) {
                compressed_buf->insert(compressed_buf->end(),
                                       std::make_move_iterator(blocks.compressed_data.begin()),
                                       std::make_move_iterator(blocks.compressed_data.end()));
            } else {
                LOG(ERROR) << "Block compression failed";
                return false;
            }
        }
    }

    return true;
}

void CompressWorker::Finalize() {
    {
        std::unique_lock<std::mutex> lock(lock_);
        stopped_ = true;
    }
    cv_.notify_all();
}

CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size)
    : compression_(compression), block_size_(block_size) {}

}  // namespace snapshot
}  // namespace android
+137 −31
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@

#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/properties.h>
#include <android-base/unique_fd.h>
#include <brotli/encode.h>
#include <libsnapshot/cow_format.h>
@@ -132,6 +133,39 @@ bool ICowWriter::ValidateNewBlock(uint64_t new_block) {

// Constructs a writer from `options`. The file descriptor starts out as -1
// (no file attached yet).
CowWriter::CowWriter(const CowOptions& options) : ICowWriter(options), fd_(-1) {
    // Presumably fills in header_ from options_ (definition not shown here).
    SetupHeaders();
    // Resolves the number of compression worker threads to use.
    SetupWriteOptions();
}

// Shuts down the compression workers: signals each one to stop, waits for
// every worker thread to finish, logs if any of them reported a failure, and
// only then destroys the worker objects.
CowWriter::~CowWriter() {
    // Ask every worker to stop so that the RunThread() loops return.
    for (const auto& worker : compress_threads_) {
        if (worker) {
            worker->Finalize();
        }
    }

    // Join all threads. get() is always called, even after a failure has been
    // seen, so that no thread is still running when the workers are destroyed.
    bool all_succeeded = true;
    for (auto& result : threads_) {
        const bool thread_ok = result.get();
        all_succeeded = thread_ok && all_succeeded;
    }

    if (!all_succeeded) {
        LOG(ERROR) << "Compression failed";
    }
    compress_threads_.clear();
}

// Resolves how many compression worker threads this writer will use.
//
// A positive options_.num_compress_threads is taken as-is. Otherwise the
// count defaults to 1, or 2 when the boolean system property
// "ro.virtual_ab.compression.threads" is set to true.
void CowWriter::SetupWriteOptions() {
    num_compress_threads_ = options_.num_compress_threads;

    // Zero means "not specified". A negative value is treated the same way:
    // left as-is it would make InitWorkers() create no workers while
    // CompressBlocks() converts the count to a huge size_t and indexes
    // compress_threads_ out of range.
    if (num_compress_threads_ <= 0) {
        num_compress_threads_ = 1;
        // We prefer not to have more than two threads as the overhead of additional
        // threads is far greater than cutting down compression time.
        if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
            num_compress_threads_ = 2;
        }
    }
}

void CowWriter::SetupHeaders() {
@@ -206,6 +240,14 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) {
    return true;
}

void CowWriter::InitWorkers() {
    for (int i = 0; i < num_compress_threads_; i++) {
        auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
        compress_threads_.push_back(std::move(wt));
    }
}

bool CowWriter::Initialize(unique_fd&& fd) {
    owned_fd_ = std::move(fd);
    return Initialize(borrowed_fd{owned_fd_});
@@ -216,7 +258,13 @@ bool CowWriter::Initialize(borrowed_fd fd) {
        return false;
    }

    return OpenForWrite();
    bool ret = OpenForWrite();

    if (ret) {
        InitWorkers();
    }

    return ret;
}

bool CowWriter::InitializeAppend(android::base::unique_fd&& fd, uint64_t label) {
@@ -229,7 +277,13 @@ bool CowWriter::InitializeAppend(android::base::borrowed_fd fd, uint64_t label)
        return false;
    }

    return OpenForAppend(label);
    bool ret = OpenForAppend(label);

    if (ret && !compress_threads_.size()) {
        InitWorkers();
    }

    return ret;
}

void CowWriter::InitPos() {
@@ -348,11 +402,66 @@ bool CowWriter::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t
    return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
}

// Compresses `num_blocks` blocks starting at `data` using the worker threads
// and leaves the results, in block order, in compressed_buf_.
//
// A single block goes to the first worker only. Otherwise the blocks are split
// evenly across num_compress_threads_ workers, with the last worker also
// taking the remainder. Returns false as soon as a worker reports a failure.
bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) {
    const size_t worker_count = (num_blocks == 1) ? 1 : num_compress_threads_;
    const size_t even_share = num_blocks / worker_count;
    const uint8_t* cursor = reinterpret_cast<const uint8_t*>(data);
    size_t remaining = num_blocks;
    compressed_buf_.clear();

    // Hand each worker a contiguous slice. Results must later be collected in
    // this same worker order, because the position of a compressed buffer in
    // compressed_buf_ is what ties it to its block.
    for (size_t idx = 0; idx < worker_count; ++idx) {
        const bool is_last = (idx == worker_count - 1);
        const size_t batch = is_last ? remaining : even_share;

        compress_threads_[idx]->EnqueueCompressBlocks(cursor, batch);

        cursor += (batch * header_.block_size);
        remaining -= batch;
    }

    // Collect in submission order; do not poll workers out of order.
    for (size_t idx = 0; idx < worker_count; ++idx) {
        const bool collected = compress_threads_[idx]->GetCompressedBuffers(&compressed_buf_);
        if (!collected) {
            return false;
        }
    }

    return true;
}

bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                           uint64_t old_block, uint16_t offset, uint8_t type) {
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    CHECK(!merge_in_progress_);
    for (size_t i = 0; i < size / header_.block_size; i++) {
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);

    // Update engine can potentially send 100MB of blocks at a time. We
    // don't want to process all those blocks in one shot as it can
    // stress the memory. Hence, process the blocks in chunks.
    //
    // 1024 blocks is reasonable given we will end up using max
    // memory of ~4MB.
    const size_t kProcessingBlocks = 1024;
    size_t num_blocks = (size / header_.block_size);
    size_t i = 0;

    while (num_blocks) {
        size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));

        if (compression_) {
            if (!CompressBlocks(pending_blocks, iter)) {
                return false;
            }
            buf_iter_ = compressed_buf_.begin();
            CHECK(pending_blocks == compressed_buf_.size());
            iter += (pending_blocks * header_.block_size);
        }

        num_blocks -= pending_blocks;

        while (i < size / header_.block_size && pending_blocks) {
            CowOperation op = {};
            op.new_block = new_block_start + i;
            op.type = type;
@@ -363,32 +472,29 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si
            }

            if (compression_) {
            auto data = Compress(iter, header_.block_size);
            if (data.empty()) {
                PLOG(ERROR) << "AddRawBlocks: compression failed";
                return false;
            }
            if (data.size() > std::numeric_limits<uint16_t>::max()) {
                LOG(ERROR) << "Compressed block is too large: " << data.size() << " bytes";
                return false;
            }
                auto data = std::move(*buf_iter_);
                op.compression = compression_;
                op.data_length = static_cast<uint16_t>(data.size());

                if (!WriteOperation(op, data.data(), data.size())) {
                PLOG(ERROR) << "AddRawBlocks: write failed, bytes requested: " << size
                            << ", bytes written: " << i * header_.block_size;
                    PLOG(ERROR) << "AddRawBlocks: write failed";
                    return false;
                }
                buf_iter_++;
            } else {
                op.data_length = static_cast<uint16_t>(header_.block_size);
                if (!WriteOperation(op, iter, header_.block_size)) {
                    PLOG(ERROR) << "AddRawBlocks: write failed";
                    return false;
                }
                iter += header_.block_size;
            }

            i += 1;
            pending_blocks -= 1;
        }

        iter += header_.block_size;
        CHECK(pending_blocks == 0);
    }
    return true;
}