Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 35960465 authored by Akilesh Kailash's avatar Akilesh Kailash
Browse files

snapuserd: Use io_uring api's for snapshot merge



using io_uring READ/WRITE opcodes for snapshot merge.
Specifically, it is used only for readahead and ordered ops
code path.

Snapshot merge perf:

===========================================================

Incremental OTA of 300M between two git_master branches on Pixel 6:

===========================================================

On Android S (with dm-snapshot): ~15 minutes:

update_engine: [INFO:cleanup_previous_update_action.cc(330)] Merge finished with state MergeCompleted.
update_engine: [INFO:cleanup_previous_update_action.cc(130)] Stopping/suspending/completing CleanupPreviousUpdateAction
update_engine: [INFO:cleanup_previous_update_action.cc(501)] Reporting merge stats: MergeCompleted in 926508ms (resumed 0 times), using 0 bytes of COW image.

===========================================================

On Android T (with io_uring: ~38 seconds):

update_engine: [INFO:cleanup_previous_update_action.cc(330)] Merge finished with state MergeCompleted.
update_engine: [INFO:cleanup_previous_update_action.cc(130)] Stopping/suspending/completing CleanupPreviousUpdateAction
update_engine: [INFO:cleanup_previous_update_action.cc(501)] Reporting merge stats: MergeCompleted in 38868ms (resumed 0 times), using 0 bytes of COW image.

===========================================================

Bug: 202784286
Test: Full/Incremental OTA
Signed-off-by: default avatarAkilesh Kailash <akailash@google.com>
Change-Id: I24ed3ae16730d0a18be0350c162dc67e1a7b74e1
parent e060580c
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -86,7 +86,9 @@ cc_defaults {
        "libsnapshot_cow",
        "libz",
        "libext4_utils",
        "liburing",
    ],
    include_dirs: ["bionic/libc/kernel"],
}

cc_binary {
@@ -182,7 +184,9 @@ cc_test {
        "libfs_mgr",
        "libdm",
        "libext4_utils",
        "liburing",
    ],
    include_dirs: ["bionic/libc/kernel"],
    header_libs: [
        "libstorage_literals_headers",
        "libfiemap_headers",
+26 −0
Original line number Diff line number Diff line
@@ -16,6 +16,9 @@

#include "snapuserd_core.h"

#include <sys/utsname.h>

#include <android-base/properties.h>
#include <android-base/strings.h>

namespace android {
@@ -513,5 +516,28 @@ struct BufferState* SnapshotHandler::GetBufferState() {
    return ra_state;
}

bool SnapshotHandler::IsIouringSupported() {
    struct utsname uts;
    unsigned int major, minor;

    if ((uname(&uts) != 0) || (sscanf(uts.release, "%u.%u", &major, &minor) != 2)) {
        SNAP_LOG(ERROR) << "Could not parse the kernel version from uname. "
                        << " io_uring not supported";
        return false;
    }

    // We will only support kernels from 5.6 onwards as IOSQE_ASYNC flag and
    // IO_URING_OP_READ/WRITE opcodes were introduced only on 5.6 kernel
    if (major >= 5) {
        if (major == 5 && minor < 6) {
            return false;
        }
    } else {
        return false;
    }

    return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
}

}  // namespace snapshot
}  // namespace android
+40 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@
#include <libdm/dm.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <liburing.h>
#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_kernel.h>

@@ -113,6 +114,19 @@ class ReadAhead {
    bool ReconstructDataFromCow();
    void CheckOverlap(const CowOperation* cow_op);

    bool ReadAheadAsyncIO();
    bool ReapIoCompletions(int pending_ios_to_complete);
    bool ReadXorData(size_t block_index, size_t xor_op_index,
                     std::vector<const CowOperation*>& xor_op_vec);
    void ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                        std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                        loff_t& buffer_offset);
    void UpdateScratchMetadata();

    bool ReadAheadSyncIO();
    bool InitializeIouring();
    void FinalizeIouring();

    void* read_ahead_buffer_;
    void* metadata_buffer_;

@@ -131,7 +145,19 @@ class ReadAhead {
    std::unordered_set<uint64_t> dest_blocks_;
    std::unordered_set<uint64_t> source_blocks_;
    bool overlap_;
    std::vector<uint64_t> blocks_;
    int total_blocks_merged_ = 0;
    std::unique_ptr<uint8_t[]> ra_temp_buffer_;
    std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
    BufferSink bufsink_;

    bool read_ahead_async_ = false;
    // Queue depth of 32 seems optimal. We don't want
    // to have a huge depth as it may put more memory pressure
    // on the kernel worker threads given that we use
    // IOSQE_ASYNC flag.
    int queue_depth_ = 32;
    std::unique_ptr<struct io_uring> ring_;
};

class Worker {
@@ -185,6 +211,7 @@ class Worker {
    // Merge related ops
    bool Merge();
    bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
    bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
    int PrepareMerge(uint64_t* source_offset, int* pending_ops,
                     const std::unique_ptr<ICowOpIter>& cowop_iter,
@@ -193,6 +220,9 @@ class Worker {
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }

    bool InitializeIouring();
    void FinalizeIouring();

    std::unique_ptr<CowReader> reader_;
    BufferSink bufsink_;
    XorSink xorsink_;
@@ -208,6 +238,14 @@ class Worker {
    unique_fd base_path_merge_fd_;
    unique_fd ctrl_fd_;

    bool merge_async_ = false;
    // Queue depth of 32 seems optimal. We don't want
    // to have a huge depth as it may put more memory pressure
    // on the kernel worker threads given that we use
    // IOSQE_ASYNC flag.
    int queue_depth_ = 32;
    std::unique_ptr<struct io_uring> ring_;

    std::shared_ptr<SnapshotHandler> snapuserd_;
};

@@ -292,6 +330,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
    bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
    MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);

    bool IsIouringSupported();

  private:
    bool ReadMetadata();
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+217 −14
Original line number Diff line number Diff line
@@ -72,16 +72,16 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
}

bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
    // Flush every 2048 ops. Since all ops are independent and there is no
    // Flush every 8192 ops. Since all ops are independent and there is no
    // dependency between COW ops, we will flush the data and the number
    // of ops merged in COW file for every 2048 ops. If there is a crash,
    // of ops merged in COW file for every 8192 ops. If there is a crash,
    // we will end up replaying some of the COW ops which were already merged.
    // That is ok.
    //
    // Why 2048 ops ? We can probably increase this to bigger value but just
    // need to ensure that merge makes forward progress if there are
    // crashes repeatedly which is highly unlikely.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 8;
    // Why 8192 ops ? Increasing this may improve merge time 3-4 seconds but
    // we need to make sure that we checkpoint; 8k ops seems optimal. In-case
    // if there is a crash merge should always make forward progress.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
    int num_ops_merged = 0;

    while (!cowop_iter->Done()) {
@@ -128,7 +128,7 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter)

        num_ops_merged += linear_blocks;

        if (num_ops_merged == total_ops_merged_per_commit) {
        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
@@ -172,6 +172,173 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter)
    return true;
}

bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
    size_t block_index = 0;

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter->Done()) {
        const CowOperation* cow_op = &cowop_iter->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        snapuserd_->SetMergeInProgress(block_index);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        bool flush_required = false;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    snapuserd_->SetMergeFailed(block_index);
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    (char*)read_ahead_buffer + offset, io_size, source_offset);

                offset += io_size;
                num_ops -= linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                sqe->flags |= IOSQE_ASYNC;
            }

            // Ring is full or no more COW ops to be merged in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is a last set of COW ops to be merged in this batch, we need
                // to sync the merged data. We will try to grab an SQE entry
                // and set the FSYNC command; additionally, make sure that
                // the fsync is done after all the I/O operations queued
                // in the ring is completed by setting IOSQE_IO_DRAIN.
                //
                // If there is no space in the ring, we will flush it later
                // by explicitly calling fsync() system call.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // very unlikely but let's continue and not fail the
                            // merge - we will flush it later
                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= IOSQE_ASYNC;
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the IO for all the COW ops in a single syscall
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_PLOG(ERROR)
                            << "io_uring_submit failed for read-ahead: "
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    snapuserd_->SetMergeFailed(block_index);
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
                        snapuserd_->SetMergeFailed(block_index);
                        return false;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR)
                                << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
                        snapuserd_->SetMergeFailed(block_index);
                        return false;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(block_index);

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        block_index += 1;
    }

    return true;
}

bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
@@ -260,14 +427,22 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
bool Worker::Merge() {
    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();

    if (merge_async_) {
        if (!MergeOrderedOpsAsync(cowop_iter)) {
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
    } else {
        // Start with Copy and Xor ops
        if (!MergeOrderedOps(cowop_iter)) {
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }

    SNAP_LOG(INFO) << "MergeOrderedOps completed...";
        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
    }

    // Replace and Zero ops
    if (!MergeReplaceZeroOps(cowop_iter)) {
@@ -281,6 +456,31 @@ bool Worker::Merge() {
    return true;
}

bool Worker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void Worker::FinalizeIouring() {
    if (merge_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

bool Worker::RunMergeThread() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";
    if (!snapuserd_->WaitForMergeBegin()) {
@@ -296,10 +496,13 @@ bool Worker::RunMergeThread() {
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

+375 −34

File changed.

Preview size limit exceeded, changes collapsed.