Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3991ce6c authored by Akilesh Kailash's avatar Akilesh Kailash Committed by Gerrit Code Review
Browse files

Merge changes from topic "snapuserd-async-merge"

* changes:
  Add a new property to disable io_uring and run vts and snapuserd_test
  snapuserd: Async I/O for block verification
  snapuserd: Use io_uring APIs for snapshot merge
parents 91ff0285 401a46a5
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -54,6 +54,8 @@
#include <libsnapshot/mock_snapshot.h>

DEFINE_string(force_config, "", "Force testing mode (dmsnap, vab, vabc) ignoring device config.");
DEFINE_string(force_iouring_disable, "",
              "Force testing mode (iouring_disabled) - disable io_uring");

namespace android {
namespace snapshot {
@@ -2769,10 +2771,22 @@ int main(int argc, char** argv) {
        }
    }

    if (FLAGS_force_iouring_disable == "iouring_disabled") {
        if (!android::base::SetProperty("snapuserd.test.io_uring.force_disable", "1")) {
            return testing::AssertionFailure()
                   << "Failed to disable property: snapuserd.test.io_uring.disabled";
        }
    }

    int ret = RUN_ALL_TESTS();

    if (FLAGS_force_config == "dmsnap") {
        android::base::SetProperty("snapuserd.test.dm.snapshots", "0");
    }

    if (FLAGS_force_iouring_disable == "iouring_disabled") {
        android::base::SetProperty("snapuserd.test.io_uring.force_disable", "0");
    }

    return ret;
}
+5 −0
Original line number Diff line number Diff line
@@ -86,7 +86,9 @@ cc_defaults {
        "libsnapshot_cow",
        "libz",
        "libext4_utils",
        "liburing",
    ],
    include_dirs: ["bionic/libc/kernel"],
}

cc_binary {
@@ -182,7 +184,10 @@ cc_test {
        "libfs_mgr",
        "libdm",
        "libext4_utils",
        "liburing",
        "libgflags",
    ],
    include_dirs: ["bionic/libc/kernel"],
    header_libs: [
        "libstorage_literals_headers",
        "libfiemap_headers",
+176 −9
Original line number Diff line number Diff line
@@ -16,6 +16,10 @@

#include "snapuserd_core.h"

#include <sys/utsname.h>

#include <android-base/properties.h>
#include <android-base/scopeguard.h>
#include <android-base/strings.h>

namespace android {
@@ -288,6 +292,136 @@ bool SnapshotHandler::InitCowDevice() {
    return ReadMetadata();
}

// Releases the io_uring instance created by InitializeIouring().
// NOTE(review): ring_ is dereferenced unconditionally — callers must only
// invoke this after a successful InitializeIouring() (the scope guard in
// ReadBlocksAsync is installed only after init succeeds, which satisfies this).
void SnapshotHandler::FinalizeIouring() {
    io_uring_queue_exit(ring_.get());
}

bool SnapshotHandler::InitializeIouring(int io_depth) {
    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(io_depth, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    LOG(INFO) << "io_uring_queue_init success with io_depth: " << io_depth;
    return true;
}

bool SnapshotHandler::ReadBlocksAsync(const std::string& dm_block_device,
                                      const std::string& partition_name, size_t size) {
    // 64k block size with io_depth of 64 is optimal
    // for a single thread. We just need a single thread
    // to read all the blocks from all dynamic partitions.
    size_t io_depth = 64;
    size_t bs = (64 * 1024);

    if (!InitializeIouring(io_depth)) {
        return false;
    }

    LOG(INFO) << "ReadBlockAsync start "
              << " Block-device: " << dm_block_device << " Partition-name: " << partition_name
              << " Size: " << size;

    auto scope_guard = android::base::make_scope_guard([this]() -> void { FinalizeIouring(); });

    std::vector<std::unique_ptr<struct iovec>> vecs;
    using AlignedBuf = std::unique_ptr<void, decltype(free)*>;
    std::vector<AlignedBuf> alignedBufVector;

    /*
     * TODO: We need aligned memory for DIRECT-IO. However, if we do
     * a DIRECT-IO and verify the blocks then we need to inform
     * update-verifier that block verification has been done and
     * there is no need to repeat the same. We are not there yet
     * as we need to see if there are any boot time improvements doing
     * a DIRECT-IO.
     *
     * Also, we could you the same function post merge for block verification;
     * again, we can do a DIRECT-IO instead of thrashing page-cache and
     * hurting other applications.
     *
     * For now, we will just create aligned buffers but rely on buffered
     * I/O until we have perf numbers to justify DIRECT-IO.
     */
    for (int i = 0; i < io_depth; i++) {
        auto iovec = std::make_unique<struct iovec>();
        vecs.push_back(std::move(iovec));

        struct iovec* iovec_ptr = vecs[i].get();

        if (posix_memalign(&iovec_ptr->iov_base, BLOCK_SZ, bs)) {
            LOG(ERROR) << "posix_memalign failed";
            return false;
        }

        iovec_ptr->iov_len = bs;
        alignedBufVector.push_back(
                std::unique_ptr<void, decltype(free)*>(iovec_ptr->iov_base, free));
    }

    android::base::unique_fd fd(TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY)));
    if (fd.get() == -1) {
        SNAP_PLOG(ERROR) << "File open failed - block-device " << dm_block_device
                         << " partition-name: " << partition_name;
        return false;
    }

    loff_t offset = 0;
    size_t remain = size;
    size_t read_sz = io_depth * bs;

    while (remain > 0) {
        size_t to_read = std::min(remain, read_sz);
        size_t queue_size = to_read / bs;

        for (int i = 0; i < queue_size; i++) {
            struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
            if (!sqe) {
                SNAP_LOG(ERROR) << "io_uring_get_sqe() failed";
                return false;
            }

            struct iovec* iovec_ptr = vecs[i].get();

            io_uring_prep_read(sqe, fd.get(), iovec_ptr->iov_base, iovec_ptr->iov_len, offset);
            sqe->flags |= IOSQE_ASYNC;
            offset += bs;
        }

        int ret = io_uring_submit(ring_.get());
        if (ret != queue_size) {
            SNAP_LOG(ERROR) << "submit got: " << ret << " wanted: " << queue_size;
            return false;
        }

        for (int i = 0; i < queue_size; i++) {
            struct io_uring_cqe* cqe;

            int ret = io_uring_wait_cqe(ring_.get(), &cqe);
            if (ret) {
                SNAP_PLOG(ERROR) << "wait_cqe failed" << ret;
                return false;
            }

            if (cqe->res < 0) {
                SNAP_LOG(ERROR) << "io failed with res: " << cqe->res;
                return false;
            }
            io_uring_cqe_seen(ring_.get(), cqe);
        }

        remain -= to_read;
    }

    LOG(INFO) << "ReadBlockAsync complete: "
              << " Block-device: " << dm_block_device << " Partition-name: " << partition_name
              << " Size: " << size;
    return true;
}

void SnapshotHandler::ReadBlocksToCache(const std::string& dm_block_device,
                                        const std::string& partition_name, off_t offset,
                                        size_t size) {
@@ -344,6 +478,10 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name,
        return;
    }

    if (IsIouringSupported()) {
        std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device,
                   partition_name, dev_sz);
    } else {
        int num_threads = 2;
        size_t num_blocks = dev_sz >> BLOCK_SHIFT;
        size_t num_blocks_per_thread = num_blocks / num_threads;
@@ -351,12 +489,13 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name,
        off_t offset = 0;

        for (int i = 0; i < num_threads; i++) {
        std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device,
                   partition_name, offset, read_sz_per_thread);
            std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this,
                       dm_block_device, partition_name, offset, read_sz_per_thread);

            offset += read_sz_per_thread;
        }
    }
}

/*
 * Entry point to launch threads
@@ -513,5 +652,33 @@ struct BufferState* SnapshotHandler::GetBufferState() {
    return ra_state;
}

// Decides whether the io_uring fast paths may be used:
//   1. The test override property can force it off.
//   2. The running kernel must be 5.6 or newer.
//   3. Finally, the feature must be opted in via ro.virtual_ab.io_uring.enabled.
bool SnapshotHandler::IsIouringSupported() {
    struct utsname uts;
    unsigned int major, minor;

    if (android::base::GetBoolProperty("snapuserd.test.io_uring.force_disable", false)) {
        SNAP_LOG(INFO) << "io_uring disabled for testing";
        return false;
    }

    if ((uname(&uts) != 0) || (sscanf(uts.release, "%u.%u", &major, &minor) != 2)) {
        SNAP_LOG(ERROR) << "Could not parse the kernel version from uname. "
                        << " io_uring not supported";
        return false;
    }

    // We will only support kernels from 5.6 onwards as IOSQE_ASYNC flag and
    // IO_URING_OP_READ/WRITE opcodes were introduced only on 5.6 kernel
    const bool kernel_new_enough = (major > 5) || (major == 5 && minor >= 6);
    if (!kernel_new_enough) {
        return false;
    }

    return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
}

}  // namespace snapshot
}  // namespace android
+47 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@
#include <libdm/dm.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <liburing.h>
#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_kernel.h>

@@ -113,6 +114,19 @@ class ReadAhead {
    bool ReconstructDataFromCow();
    void CheckOverlap(const CowOperation* cow_op);

    bool ReadAheadAsyncIO();
    bool ReapIoCompletions(int pending_ios_to_complete);
    bool ReadXorData(size_t block_index, size_t xor_op_index,
                     std::vector<const CowOperation*>& xor_op_vec);
    void ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                        std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                        loff_t& buffer_offset);
    void UpdateScratchMetadata();

    bool ReadAheadSyncIO();
    bool InitializeIouring();
    void FinalizeIouring();

    void* read_ahead_buffer_;
    void* metadata_buffer_;

@@ -131,7 +145,19 @@ class ReadAhead {
    std::unordered_set<uint64_t> dest_blocks_;
    std::unordered_set<uint64_t> source_blocks_;
    bool overlap_;
    std::vector<uint64_t> blocks_;
    int total_blocks_merged_ = 0;
    std::unique_ptr<uint8_t[]> ra_temp_buffer_;
    std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
    BufferSink bufsink_;

    bool read_ahead_async_ = false;
    // Queue depth of 32 seems optimal. We don't want
    // to have a huge depth as it may put more memory pressure
    // on the kernel worker threads given that we use
    // IOSQE_ASYNC flag.
    int queue_depth_ = 32;
    std::unique_ptr<struct io_uring> ring_;
};

class Worker {
@@ -185,6 +211,7 @@ class Worker {
    // Merge related ops
    bool Merge();
    bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
    bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
    int PrepareMerge(uint64_t* source_offset, int* pending_ops,
                     const std::unique_ptr<ICowOpIter>& cowop_iter,
@@ -193,6 +220,9 @@ class Worker {
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }

    bool InitializeIouring();
    void FinalizeIouring();

    std::unique_ptr<CowReader> reader_;
    BufferSink bufsink_;
    XorSink xorsink_;
@@ -208,6 +238,14 @@ class Worker {
    unique_fd base_path_merge_fd_;
    unique_fd ctrl_fd_;

    bool merge_async_ = false;
    // Queue depth of 32 seems optimal. We don't want
    // to have a huge depth as it may put more memory pressure
    // on the kernel worker threads given that we use
    // IOSQE_ASYNC flag.
    int queue_depth_ = 32;
    std::unique_ptr<struct io_uring> ring_;

    std::shared_ptr<SnapshotHandler> snapuserd_;
};

@@ -292,6 +330,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
    bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
    MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);

    bool IsIouringSupported();

  private:
    bool ReadMetadata();
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@@ -304,6 +344,11 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
    void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
                           off_t offset, size_t size);

    bool InitializeIouring(int io_depth);
    void FinalizeIouring();
    bool ReadBlocksAsync(const std::string& dm_block_device, const std::string& partition_name,
                         size_t size);

    // COW device
    std::string cow_device_;
    // Source device
@@ -352,6 +397,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
    bool attached_ = false;
    bool is_socket_present_;
    bool scratch_space_ = false;

    std::unique_ptr<struct io_uring> ring_;
};

}  // namespace snapshot
+217 −14
Original line number Diff line number Diff line
@@ -72,16 +72,16 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
}

bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
    // Flush every 2048 ops. Since all ops are independent and there is no
    // Flush every 8192 ops. Since all ops are independent and there is no
    // dependency between COW ops, we will flush the data and the number
    // of ops merged in COW file for every 2048 ops. If there is a crash,
    // of ops merged in COW file for every 8192 ops. If there is a crash,
    // we will end up replaying some of the COW ops which were already merged.
    // That is ok.
    //
    // Why 2048 ops ? We can probably increase this to bigger value but just
    // need to ensure that merge makes forward progress if there are
    // crashes repeatedly which is highly unlikely.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 8;
    // Why 8192 ops ? Increasing this may improve merge time 3-4 seconds but
    // we need to make sure that we checkpoint; 8k ops seems optimal. In-case
    // if there is a crash merge should always make forward progress.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
    int num_ops_merged = 0;

    while (!cowop_iter->Done()) {
@@ -128,7 +128,7 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter)

        num_ops_merged += linear_blocks;

        if (num_ops_merged == total_ops_merged_per_commit) {
        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
@@ -172,6 +172,173 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter)
    return true;
}

// Asynchronously merges the ordered (copy/xor) COW ops using io_uring.
// One "merge window" (block_index) at a time: wait for the read-ahead thread
// to signal the window is ready, write the buffered data back to the base
// device in batches of up to queue_depth_ sqes, fsync (in-ring when possible),
// commit the merge in the COW header, then notify the RA thread and advance.
// Returns false — after marking the merge failed — on any ring or I/O error.
bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
    size_t block_index = 0;

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter->Done()) {
        const CowOperation* cow_op = &cowop_iter->Get();
        // Ordered ops are grouped at the front of the iterator; stop at the
        // first non-ordered op and let the caller handle the rest.
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        snapuserd_->SetMergeInProgress(block_index);

        // offset walks the read-ahead buffer; source_offset (below) is the
        // on-device destination of each contiguous run of blocks.
        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        // Set when an fsync could not be queued in the ring and must be done
        // with a plain fsync() after the batch completes.
        bool flush_required = false;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            // PrepareMerge coalesces consecutive blocks into one linear write;
            // linear_blocks == 0 means nothing more can be batched now.
            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    snapuserd_->SetMergeFailed(block_index);
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    (char*)read_ahead_buffer + offset, io_size, source_offset);

                offset += io_size;
                num_ops -= linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                sqe->flags |= IOSQE_ASYNC;
            }

            // Ring is full or no more COW ops to be merged in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is a last set of COW ops to be merged in this batch, we need
                // to sync the merged data. We will try to grab an SQE entry
                // and set the FSYNC command; additionally, make sure that
                // the fsync is done after all the I/O operations queued
                // in the ring is completed by setting IOSQE_IO_DRAIN.
                //
                // If there is no space in the ring, we will flush it later
                // by explicitly calling fsync() system call.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // very unlikely but let's continue and not fail the
                            // merge - we will flush it later
                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= IOSQE_ASYNC;
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the IO for all the COW ops in a single syscall
                // NOTE(review): the log strings below say "read-ahead" but this
                // is the merge path — presumably copy-paste; verify upstream.
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_PLOG(ERROR)
                            << "io_uring_submit failed for read-ahead: "
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    snapuserd_->SetMergeFailed(block_index);
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
                        snapuserd_->SetMergeFailed(block_index);
                        return false;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR)
                                << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
                        snapuserd_->SetMergeFailed(block_index);
                        return false;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(block_index);

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        block_index += 1;
    }

    return true;
}

bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
@@ -260,14 +427,22 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
bool Worker::Merge() {
    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();

    if (merge_async_) {
        if (!MergeOrderedOpsAsync(cowop_iter)) {
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
    } else {
        // Start with Copy and Xor ops
        if (!MergeOrderedOps(cowop_iter)) {
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }

    SNAP_LOG(INFO) << "MergeOrderedOps completed...";
        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
    }

    // Replace and Zero ops
    if (!MergeReplaceZeroOps(cowop_iter)) {
@@ -281,6 +456,31 @@ bool Worker::Merge() {
    return true;
}

bool Worker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

// Tears down the merge io_uring. A no-op unless InitializeIouring()
// succeeded (merge_async_ is only set after io_uring_queue_init succeeds).
void Worker::FinalizeIouring() {
    if (!merge_async_) {
        return;
    }
    io_uring_queue_exit(ring_.get());
}

bool Worker::RunMergeThread() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";
    if (!snapuserd_->WaitForMergeBegin()) {
@@ -296,10 +496,13 @@ bool Worker::RunMergeThread() {
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

Loading