Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4bc81b40 authored by Akilesh Kailash
Browse files

libsnapshot: Batch write COW operations in a cluster



COW operations are written in cluster. All the COW ops
and the COW data in this cluster are contiguous. Hence,
batch these writes and write them in one syscall.

Writes are done when the cluster is full or when
label ops are written.

OTA install time (without post-install) on Pixel 6 Pro:

          Without-this-patch        With-this-patch

Full OTA: 17 Minutes               13 Minutes

Following are the OTA install times with both optimizations applied,
viz. batch writes + 2 threads for compression.

OTA install (without post-install) on Pixel 6 Pro.

All numbers are in minutes.

Full ota - 2.2G

Compression   Without-this-patch     With-this-patch
=========================================================
gz            23                      13
lz4           13                       7
none          13                       7

Incremental OTA - 376M

Compression   Without-this-patch     With-this-patch
=========================================================

gz            22                     16
lz4           14                     11
none          15                     11

Bug: 254188450
Test: Full / Incremental OTA on Pixel
Change-Id: Ie3aba1ff28a6569d25a766377efab6cbe78d9277
Signed-off-by: Akilesh Kailash <akailash@google.com>
parent b1a05003
Loading
Loading
Loading
Loading
+17 −0
Original line number Diff line number Diff line
@@ -52,6 +52,9 @@ struct CowOptions {

    // Number of threads for compression
    int num_compress_threads = 0;

    // Batch write cluster ops
    bool batch_write = false;
};

// Interface for writing to a snapuserd COW. All operations are ordered; merges
@@ -188,7 +191,9 @@ class CowWriter : public ICowWriter {
    bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
    void AddOperation(const CowOperation& op);
    void InitPos();
    void InitBatchWrites();
    void InitWorkers();
    bool FlushCluster();

    bool CompressBlocks(size_t num_blocks, const void* data);
    bool SetFd(android::base::borrowed_fd fd);
@@ -202,8 +207,11 @@ class CowWriter : public ICowWriter {
    CowHeader header_{};
    CowFooter footer_{};
    CowCompressionAlgorithm compression_ = kCowCompressNone;
    uint64_t current_op_pos_ = 0;
    uint64_t next_op_pos_ = 0;
    uint64_t next_data_pos_ = 0;
    uint64_t current_data_pos_ = 0;
    ssize_t total_data_written_ = 0;
    uint32_t cluster_size_ = 0;
    uint32_t current_cluster_size_ = 0;
    uint64_t current_data_size_ = 0;
@@ -217,6 +225,15 @@ class CowWriter : public ICowWriter {
    std::vector<std::future<bool>> threads_;
    std::vector<std::basic_string<uint8_t>> compressed_buf_;
    std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;

    std::vector<std::unique_ptr<CowOperation>> opbuffer_vec_;
    std::vector<std::unique_ptr<uint8_t[]>> databuffer_vec_;
    std::unique_ptr<struct iovec[]> cowop_vec_;
    int op_vec_index_ = 0;

    std::unique_ptr<struct iovec[]> data_vec_;
    int data_vec_index_ = 0;
    bool batch_write_ = false;
};

}  // namespace snapshot
+114 −8
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
//

#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <limits>
@@ -162,10 +163,17 @@ void CowWriter::SetupWriteOptions() {
        num_compress_threads_ = 1;
        // We prefer not to have more than two threads as the overhead of additional
        // threads is far greater than cutting down compression time.
        if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
        if (header_.cluster_ops &&
            android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
            num_compress_threads_ = 2;
        }
    }

    if (header_.cluster_ops &&
        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
         options_.batch_write)) {
        batch_write_ = true;
    }
}

void CowWriter::SetupHeaders() {
@@ -240,12 +248,40 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) {
    return true;
}

void CowWriter::InitBatchWrites() {
    if (batch_write_) {
        cowop_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
        data_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
        struct iovec* cowop_ptr = cowop_vec_.get();
        struct iovec* data_ptr = data_vec_.get();
        for (size_t i = 0; i < header_.cluster_ops; i++) {
            std::unique_ptr<CowOperation> op = std::make_unique<CowOperation>();
            cowop_ptr[i].iov_base = op.get();
            cowop_ptr[i].iov_len = sizeof(CowOperation);
            opbuffer_vec_.push_back(std::move(op));

            std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(header_.block_size * 2);
            data_ptr[i].iov_base = buffer.get();
            data_ptr[i].iov_len = header_.block_size * 2;
            databuffer_vec_.push_back(std::move(buffer));
        }

        current_op_pos_ = next_op_pos_;
        current_data_pos_ = next_data_pos_;
    }

    std::string batch_write = batch_write_ ? "enabled" : "disabled";
    LOG(INFO) << "Batch writes: " << batch_write;
}

void CowWriter::InitWorkers() {
    for (int i = 0; i < num_compress_threads_; i++) {
        auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
        compress_threads_.push_back(std::move(wt));
    }

    LOG(INFO) << num_compress_threads_ << " thread used for compression";
}

bool CowWriter::Initialize(unique_fd&& fd) {
@@ -341,6 +377,7 @@ bool CowWriter::OpenForWrite() {
    }

    InitPos();
    InitBatchWrites();

    return true;
}
@@ -374,6 +411,9 @@ bool CowWriter::OpenForAppend(uint64_t label) {
        PLOG(ERROR) << "lseek failed";
        return false;
    }

    InitBatchWrites();

    return EmitClusterIfNeeded();
}

@@ -522,7 +562,7 @@ bool CowWriter::EmitLabel(uint64_t label) {
bool CowWriter::EmitSequenceData(size_t num_ops, const uint32_t* data) {
    CHECK(!merge_in_progress_);
    size_t to_add = 0;
    size_t max_ops = std::numeric_limits<uint16_t>::max() / sizeof(uint32_t);
    size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t);
    while (num_ops > 0) {
        CowOperation op = {};
        op.type = kCowSequenceOp;
@@ -567,6 +607,11 @@ static void SHA256(const void*, size_t, uint8_t[]) {
}

bool CowWriter::Finalize() {
    if (!FlushCluster()) {
        LOG(ERROR) << "Finalize: FlushCluster() failed";
        return false;
    }

    auto continue_cluster_size = current_cluster_size_;
    auto continue_data_size = current_data_size_;
    auto continue_data_pos = next_data_pos_;
@@ -631,6 +676,9 @@ bool CowWriter::Finalize() {
        next_op_pos_ = continue_op_pos;
        footer_.op.num_ops = continue_num_ops;
    }

    FlushCluster();

    return Sync();
}

@@ -662,6 +710,35 @@ bool CowWriter::EnsureSpaceAvailable(const uint64_t bytes_needed) const {
    return true;
}

// Flush the batched cluster to disk: one pwritev() for the accumulated
// CowOperation structs at current_op_pos_, and one for the accumulated data
// buffers at current_data_pos_. On success, reset the batching counters and
// advance the batch start positions to the next op/data positions.
//
// Returns false if either vectored write fails or is short; a partial write
// is treated as an error (not retried), leaving the batch state untouched.
bool CowWriter::FlushCluster() {
    ssize_t ret;

    if (op_vec_index_) {
        ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_);
        // Compare as ssize_t so a -1 error return is handled sign-correctly
        // instead of being implicitly converted to a huge unsigned value.
        if (ret != static_cast<ssize_t>(op_vec_index_ * sizeof(CowOperation))) {
            PLOG(ERROR) << "pwritev failed for CowOperation. Expected: "
                        << (op_vec_index_ * sizeof(CowOperation));
            return false;
        }
    }

    if (data_vec_index_) {
        ret = pwritev(fd_.get(), data_vec_.get(), data_vec_index_, current_data_pos_);
        if (ret != total_data_written_) {
            PLOG(ERROR) << "pwritev failed for data. Expected: " << total_data_written_;
            return false;
        }
    }

    // Start a fresh batch at the positions the next writes will occupy.
    total_data_written_ = 0;
    op_vec_index_ = 0;
    data_vec_index_ = 0;
    current_op_pos_ = next_op_pos_;
    current_data_pos_ = next_data_pos_;

    return true;
}

bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t size) {
    if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op))) {
        return false;
@@ -670,14 +747,43 @@ bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t
        return false;
    }

    if (!android::base::WriteFullyAtOffset(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op),
                                           next_op_pos_)) {
    if (batch_write_) {
        CowOperation* cow_op = reinterpret_cast<CowOperation*>(cowop_vec_[op_vec_index_].iov_base);
        std::memcpy(cow_op, &op, sizeof(CowOperation));
        op_vec_index_ += 1;

        if (data != nullptr && size > 0) {
            struct iovec* data_ptr = data_vec_.get();
            std::memcpy(data_ptr[data_vec_index_].iov_base, data, size);
            data_ptr[data_vec_index_].iov_len = size;
            data_vec_index_ += 1;
            total_data_written_ += size;
        }
    } else {
        if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
            PLOG(ERROR) << "lseek failed for writing operation.";
            return false;
        }
        if (!android::base::WriteFully(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op))) {
            return false;
        }
        if (data != nullptr && size > 0) {
            if (!WriteRawData(data, size)) return false;
        }
    }

    AddOperation(op);

    if (batch_write_) {
        if (op_vec_index_ == header_.cluster_ops || data_vec_index_ == header_.cluster_ops ||
            op.type == kCowLabelOp || op.type == kCowClusterOp) {
            if (!FlushCluster()) {
                LOG(ERROR) << "Failed to flush cluster data";
                return false;
            }
        }
    }

    return EmitClusterIfNeeded();
}