
Commit 743c4cdb authored by Akilesh Kailash, committed by Gerrit Code Review

Merge "snapuserd: I/O request on overlapping blocks during snapshot-merge." into main

parents 7cf712ab cffa413d
+12 −1
@@ -173,6 +173,10 @@ bool SnapshotHandler::ReadMetadata() {
     }
 
     SNAP_LOG(INFO) << "Merge-ops: " << header.num_merge_ops;
+    if (header.num_merge_ops) {
+        resume_merge_ = true;
+        SNAP_LOG(INFO) << "Resume Snapshot-merge";
+    }
 
     if (!MmapMetadata()) {
         SNAP_LOG(ERROR) << "mmap failed";
@@ -295,6 +299,11 @@ bool SnapshotHandler::Start() {
     if (ra_thread_) {
         ra_thread_status =
                 std::async(std::launch::async, &ReadAhead::RunThread, read_ahead_thread_.get());
+        // If this is a merge-resume path, wait until RA thread is fully up as
+        // the data has to be re-constructed from the scratch space.
+        if (resume_merge_ && ShouldReconstructDataFromCow()) {
+            WaitForRaThreadToStart();
+        }
     }
 
     // Launch worker threads
@@ -307,7 +316,9 @@ bool SnapshotHandler::Start() {
             std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
 
     // Now that the worker threads are up, scan the partitions.
-    if (perform_verification_) {
+    // If the snapshot-merge is being resumed, there is no need to scan as the
+    // current slot is already marked as boot complete.
+    if (perform_verification_ && !resume_merge_) {
         update_verify_->VerifyUpdatePartition();
     }
 
+4 −0
@@ -147,6 +147,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
     void WakeupMonitorMergeThread();
     void WaitForMergeComplete();
     bool WaitForMergeBegin();
+    void RaThreadStarted();
+    void WaitForRaThreadToStart();
     void NotifyRAForMergeReady();
     bool WaitForMergeReady();
     void MergeFailed();
@@ -221,6 +223,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
     // Read-ahead related
     bool populate_data_from_cow_ = false;
     bool ra_thread_ = false;
+    bool ra_thread_started_ = false;
    int total_ra_blocks_merged_ = 0;
     MERGE_IO_TRANSITION io_state_ = MERGE_IO_TRANSITION::INVALID;
     std::unique_ptr<ReadAhead> read_ahead_thread_;
@@ -242,6 +245,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
     bool scratch_space_ = false;
     int num_worker_threads_ = kNumWorkerThreads;
     bool perform_verification_ = true;
+    bool resume_merge_ = false;
 
     std::unique_ptr<struct io_uring> ring_;
     std::unique_ptr<UpdateVerify> update_verify_;
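
The two methods declared above, RaThreadStarted() and WaitForRaThreadToStart(), form a small start-up handshake: Start() blocks until the read-ahead thread reports that it is running before serving the resumed merge. A minimal standalone sketch of that pattern, with illustrative names only (not the snapuserd implementation):

#include <chrono>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>

using namespace std::chrono_literals;

std::mutex m;
std::condition_variable cv;
bool worker_started = false;  // plays the role of ra_thread_started_

void RunWorker() {
    // ... reconstruct state before serving requests ...
    {
        std::lock_guard<std::mutex> lock(m);
        worker_started = true;  // analogous to RaThreadStarted()
    }
    cv.notify_all();
}

int main() {
    auto status = std::async(std::launch::async, RunWorker);

    // Analogous to WaitForRaThreadToStart(): wait with a deadline so the
    // caller cannot hang forever if the worker never comes up.
    std::unique_lock<std::mutex> lock(m);
    if (!cv.wait_for(lock, 3s, [] { return worker_started; })) {
        std::cerr << "worker did not start in time\n";
        return 1;
    }
    std::cout << "worker is up; safe to continue\n";
    status.wait();
    return 0;
}
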
+7 −2
@@ -206,6 +206,7 @@ bool ReadAhead::ReconstructDataFromCow() {
         return false;
     }
 
+    snapuserd_->RaThreadStarted();
     SNAP_LOG(INFO) << "ReconstructDataFromCow success";
     notify_read_ahead_failed.Cancel();
     return true;
@@ -716,9 +717,13 @@ bool ReadAhead::ReadAheadIOStart() {
     total_ra_blocks_completed_ += total_blocks_merged_;
     snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
 
-    // Flush the data only if we have a overlapping blocks in the region
+    // Flush the scratch data - Technically, we should flush only for overlapping
+    // blocks; However, since this region is mmap'ed, the dirty pages can still
+    // get flushed to disk at any random point in time. Instead, make sure
+    // the data in scratch is in the correct state before merge thread resumes.
+    //
     // Notify the Merge thread to resume merging this window
-    if (!snapuserd_->ReadAheadIOCompleted(overlap_)) {
+    if (!snapuserd_->ReadAheadIOCompleted(true)) {
         SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
         snapuserd_->ReadAheadIOFailed();
         return false;
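
The rewritten comment above is the key point of this hunk: the scratch area is an mmap'ed region, so its dirty pages can reach disk at arbitrary times, and the safe choice is to always flush before the merge thread resumes. As a generic illustration of forcing an mmap'ed region into a consistent on-disk state (plain POSIX, not the snapuserd code):

#include <sys/mman.h>
#include <cstring>

// Copy `size` bytes into a MAP_SHARED mapping backed by `fd` and force the
// data to stable storage before a dependent reader is allowed to consume it.
// Assumes size <= length and that `fd` refers to a file of at least `length` bytes.
bool WriteAndFlushShared(int fd, size_t length, const void* data, size_t size) {
    void* addr = mmap(nullptr, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (addr == MAP_FAILED) {
        return false;
    }

    std::memcpy(addr, data, size);

    // Without an explicit msync(), the kernel may write these dirty pages
    // back whenever it likes; MS_SYNC returns only once the data is durable,
    // so readers that start afterwards see a consistent state.
    bool ok = (msync(addr, length, MS_SYNC) == 0);

    munmap(addr, length);
    return ok;
}
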
+95 −0
@@ -235,9 +235,11 @@ class SnapuserdTest : public SnapuserdTestBase {
     bool Merge();
     void ValidateMerge();
     void ReadSnapshotDeviceAndValidate();
+    void ReadSnapshotAndValidateOverlappingBlocks();
     void Shutdown();
     void MergeInterrupt();
     void MergeInterruptFixed(int duration);
+    void MergeInterruptAndValidate(int duration);
     void MergeInterruptRandomly(int max_duration);
     bool StartMerge();
     void CheckMergeCompletion();
@@ -358,6 +360,76 @@ void SnapuserdTest::ReadSnapshotDeviceAndValidate() {
     ASSERT_EQ(memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + (size_ * 4), size_), 0);
 }
 
+void SnapuserdTest::ReadSnapshotAndValidateOverlappingBlocks() {
+    // Open COW device
+    unique_fd fd(open(cow_system_->path, O_RDONLY));
+    ASSERT_GE(fd, 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(fd));
+
+    const auto& header = reader.GetHeader();
+    size_t total_mapped_addr_length = header.prefix.header_size + BUFFER_REGION_DEFAULT_SIZE;
+
+    ASSERT_GE(header.prefix.major_version, 2);
+
+    void* mapped_addr = mmap(NULL, total_mapped_addr_length, PROT_READ, MAP_SHARED, fd.get(), 0);
+    ASSERT_NE(mapped_addr, MAP_FAILED);
+
+    bool populate_data_from_scratch = false;
+    struct BufferState* ra_state =
+            reinterpret_cast<struct BufferState*>((char*)mapped_addr + header.prefix.header_size);
+    if (ra_state->read_ahead_state == kCowReadAheadDone) {
+        populate_data_from_scratch = true;
+    }
+
+    size_t num_merge_ops = header.num_merge_ops;
+    // We have some partial merge operations completed.
+    // To test the merge-resume path, forcefully corrupt the data of the base
+    // device for the offsets where the merge is still pending.
+    if (num_merge_ops && populate_data_from_scratch) {
+        std::string corrupt_buffer(4096, 0);
+        // Corrupt two blocks from the point where the merge has to be resumed by
+        // writing down zeros.
+        //
+        // Now, since this is a merge-resume path, the "correct" data should be
+        // in the scratch space of the COW device. When there is an I/O request
+        // from the snapshot device, the data has to be retrieved from the
+        // scratch space. If not, and the I/O is routed to the base device, we
+        // may end up with corruption.
+        off_t corrupt_offset = (num_merge_ops + 2) * 4096;
+
+        if (corrupt_offset < size_) {
+            ASSERT_EQ(android::base::WriteFullyAtOffset(base_fd_, (void*)corrupt_buffer.c_str(),
+                                                        4096, corrupt_offset),
+                      true);
+            corrupt_offset -= 4096;
+            ASSERT_EQ(android::base::WriteFullyAtOffset(base_fd_, (void*)corrupt_buffer.c_str(),
+                                                        4096, corrupt_offset),
+                      true);
+            fsync(base_fd_.get());
+        }
+    }
+
+    // Time to read the snapshot device.
+    unique_fd snapshot_fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY | O_DIRECT | O_SYNC));
+    ASSERT_GE(snapshot_fd, 0);
+
+    void* buff_addr;
+    ASSERT_EQ(posix_memalign(&buff_addr, 4096, size_), 0);
+
+    std::unique_ptr<void, decltype(&::free)> snapshot_buffer(buff_addr, ::free);
+
+    // Scan the entire snapshot device, read the data and verify data
+    // integrity. Since the base device was forcefully corrupted, the data from
+    // this scan should be retrieved from the scratch space of the COW partition.
+    //
+    // Furthermore, after the merge is complete, base device data is again
+    // verified as the aforementioned corrupted blocks aren't persisted.
+    ASSERT_EQ(ReadFullyAtOffset(snapshot_fd, snapshot_buffer.get(), size_, 0), true);
+    ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
+}
+
 void SnapuserdTest::CreateCowDeviceWithCopyOverlap_2() {
     auto writer = CreateCowDeviceInternal();
     ASSERT_NE(writer, nullptr);
@@ -665,6 +737,20 @@ void SnapuserdTest::MergeInterruptFixed(int duration) {
     ASSERT_TRUE(Merge());
 }
 
+void SnapuserdTest::MergeInterruptAndValidate(int duration) {
+    ASSERT_TRUE(StartMerge());
+
+    for (int i = 0; i < 15; i++) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
+        ASSERT_NO_FATAL_FAILURE(SimulateDaemonRestart());
+        ReadSnapshotAndValidateOverlappingBlocks();
+        ASSERT_TRUE(StartMerge());
+    }
+
+    ASSERT_NO_FATAL_FAILURE(SimulateDaemonRestart());
+    ASSERT_TRUE(Merge());
+}
+
 void SnapuserdTest::MergeInterrupt() {
     // Interrupt merge at various intervals
     ASSERT_TRUE(StartMerge());
@@ -761,6 +847,15 @@ TEST_F(SnapuserdTest, Snapshot_COPY_Overlap_Merge_Resume_TEST) {
     ValidateMerge();
 }
 
+TEST_F(SnapuserdTest, Snapshot_COPY_Overlap_Merge_Resume_IO_Validate_TEST) {
+    if (!harness_->HasUserDevice()) {
+        GTEST_SKIP() << "Skipping snapshot read; not supported";
+    }
+    ASSERT_NO_FATAL_FAILURE(SetupCopyOverlap_2());
+    ASSERT_NO_FATAL_FAILURE(MergeInterruptAndValidate(2));
+    ValidateMerge();
+}
+
 TEST_F(SnapuserdTest, Snapshot_Merge_Crash_Fixed_Ordered) {
     ASSERT_NO_FATAL_FAILURE(SetupOrderedOps());
     ASSERT_NO_FATAL_FAILURE(MergeInterruptFixed(300));
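
One detail of the new test worth noting: the snapshot device is opened with O_DIRECT, which on Linux requires block-aligned buffers, offsets and lengths; that is why the read buffer comes from posix_memalign rather than new or malloc. A reduced sketch of the same read pattern outside the test harness (illustrative only, not part of this commit):

#include <fcntl.h>
#include <unistd.h>
#include <cstdlib>
#include <memory>

// Read `size` bytes from offset 0 of a device opened with O_DIRECT.
// `size` is assumed to be a multiple of the 4 KiB block size.
bool ReadWithODirect(const char* path, size_t size) {
    int fd = open(path, O_RDONLY | O_DIRECT | O_SYNC);
    if (fd < 0) {
        return false;
    }

    // O_DIRECT bypasses the page cache; the user buffer must be aligned to
    // the logical block size, so allocate it with posix_memalign.
    void* buf = nullptr;
    if (posix_memalign(&buf, 4096, size) != 0) {
        close(fd);
        return false;
    }
    std::unique_ptr<void, decltype(&::free)> buffer(buf, ::free);

    ssize_t n = pread(fd, buffer.get(), size, 0);
    close(fd);
    return n == static_cast<ssize_t>(size);
}
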
+27 −1
@@ -366,6 +366,26 @@ void SnapshotHandler::WaitForMergeComplete() {
     }
 }
 
+void SnapshotHandler::RaThreadStarted() {
+    std::unique_lock<std::mutex> lock(lock_);
+    ra_thread_started_ = true;
+}
+
+void SnapshotHandler::WaitForRaThreadToStart() {
+    auto now = std::chrono::system_clock::now();
+    auto deadline = now + 3s;
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        while (!ra_thread_started_) {
+            auto status = cv.wait_until(lock, deadline);
+            if (status == std::cv_status::timeout) {
+                SNAP_LOG(ERROR) << "Read-ahead thread did not start";
+                return;
+            }
+        }
+    }
+}
+
 std::string SnapshotHandler::GetMergeStatus() {
     bool merge_not_initiated = false;
     bool merge_monitored = false;
@@ -618,7 +638,6 @@ bool SnapshotHandler::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t b
     std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
 
     if (it == read_ahead_buffer_map_.end()) {
-        SNAP_LOG(ERROR) << "Block: " << block << " not found in RA buffer";
         return false;
     }
 
@@ -642,6 +661,13 @@ MERGE_GROUP_STATE SnapshotHandler::ProcessMergingBlock(uint64_t new_block, void*
         MERGE_GROUP_STATE state = blk_state->merge_state_;
         switch (state) {
             case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
+                // If this is a merge-resume path, check if the data is
+                // available from scratch space. Data from scratch space takes
+                // higher precedence than from source device for overlapping
+                // blocks.
+                if (resume_merge_ && GetRABuffer(&lock, new_block, buffer)) {
+                    return (MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS);
+                }
                 blk_state->num_ios_in_progress += 1;  // ref count
                 [[fallthrough]];
             }
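
The hunk above is the core of the fix: on a resumed merge, an I/O request for a block whose merge is still pending is first looked up in the scratch-space read-ahead buffer, and only falls back to the base device when the block is not staged there. A simplified model of that precedence rule (not the actual ProcessMergingBlock logic; class and method names here are illustrative):

#include <cstdint>
#include <unordered_map>

// Simplified precedence rule for overlapping blocks during a resumed merge:
// data staged in the COW scratch space wins over the base/source device.
class OverlapResolver {
  public:
    explicit OverlapResolver(bool resume_merge) : resume_merge_(resume_merge) {}

    void Stage(uint64_t block, void* scratch_data) { ra_buffer_[block] = scratch_data; }

    // Returns the scratch copy when resuming and the block is staged;
    // nullptr means the caller should read the block from the base device.
    void* Resolve(uint64_t block) const {
        if (resume_merge_) {
            auto it = ra_buffer_.find(block);
            if (it != ra_buffer_.end()) {
                return it->second;  // serve from scratch space
            }
        }
        return nullptr;  // fall back to the base device
    }

  private:
    std::unordered_map<uint64_t, void*> ra_buffer_;  // block -> scratch data
    bool resume_merge_ = false;
};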