Loading fs_mgr/libsnapshot/cow_reader.cpp +36 −0 Original line number Diff line number Diff line Loading @@ -550,6 +550,9 @@ class CowOpIter final : public ICowOpIter { const CowOperation& Get() override; void Next() override; void Prev() override; bool RDone() override; private: std::shared_ptr<std::vector<CowOperation>> ops_; std::vector<CowOperation>::iterator op_iter_; Loading @@ -560,6 +563,15 @@ CowOpIter::CowOpIter(std::shared_ptr<std::vector<CowOperation>>& ops) { op_iter_ = ops_->begin(); } bool CowOpIter::RDone() { return op_iter_ == ops_->begin(); } void CowOpIter::Prev() { CHECK(!RDone()); op_iter_--; } bool CowOpIter::Done() { return op_iter_ == ops_->end(); } Loading @@ -585,6 +597,9 @@ class CowRevMergeOpIter final : public ICowOpIter { const CowOperation& Get() override; void Next() override; void Prev() override; bool RDone() override; private: std::shared_ptr<std::vector<CowOperation>> ops_; std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_; Loading @@ -603,6 +618,9 @@ class CowMergeOpIter final : public ICowOpIter { const CowOperation& Get() override; void Next() override; void Prev() override; bool RDone() override; private: std::shared_ptr<std::vector<CowOperation>> ops_; std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_; Loading @@ -623,6 +641,15 @@ CowMergeOpIter::CowMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops, block_iter_ = merge_op_blocks->begin() + start; } bool CowMergeOpIter::RDone() { return block_iter_ == merge_op_blocks_->begin(); } void CowMergeOpIter::Prev() { CHECK(!RDone()); block_iter_--; } bool CowMergeOpIter::Done() { return block_iter_ == merge_op_blocks_->end(); } Loading @@ -649,6 +676,15 @@ CowRevMergeOpIter::CowRevMergeOpIter(std::shared_ptr<std::vector<CowOperation>> block_riter_ = merge_op_blocks->rbegin(); } bool CowRevMergeOpIter::RDone() { return block_riter_ == merge_op_blocks_->rbegin(); } void CowRevMergeOpIter::Prev() { CHECK(!RDone()); block_riter_--; } bool CowRevMergeOpIter::Done() { return block_riter_ == merge_op_blocks_->rend() - start_; } Loading fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h +7 −1 Original line number Diff line number Diff line Loading @@ -92,7 +92,7 @@ class ICowOpIter { public: virtual ~ICowOpIter() {} // True if there are more items to read, false otherwise. // True if there are no more items to read forward, false otherwise. virtual bool Done() = 0; // Read the current operation. Loading @@ -100,6 +100,12 @@ class ICowOpIter { // Advance to the next item. virtual void Next() = 0; // Advance to the previous item. virtual void Prev() = 0; // True if there are no more items to read backwards, false otherwise virtual bool RDone() = 0; }; class CowReader final : public ICowReader { Loading fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +10 −17 Original line number Diff line number Diff line Loading @@ -492,10 +492,6 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name, return; } if (IsIouringSupported()) { std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device, partition_name, dev_sz); } else { int num_threads = 2; size_t num_blocks = dev_sz >> BLOCK_SHIFT; size_t num_blocks_per_thread = num_blocks / num_threads; Loading @@ -503,13 +499,12 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name, off_t offset = 0; for (int i = 0; i < num_threads; i++) { std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device, partition_name, offset, read_sz_per_thread); std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device, partition_name, offset, read_sz_per_thread); offset += read_sz_per_thread; } } } /* * Entry point to launch threads Loading Loading @@ -694,10 +689,8 @@ bool SnapshotHandler::IsIouringSupported() { // During selinux init transition, libsnapshot will propagate the // status of io_uring enablement. As properties are not initialized, // we cannot query system property. // // TODO: b/219642530: Intermittent I/O failures observed if (is_io_uring_enabled_) { return false; return true; } // Finally check the system property Loading fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +22 −10 Original line number Diff line number Diff line Loading @@ -99,6 +99,7 @@ class ReadAhead { void InitializeRAIter(); bool RAIterDone(); void RAIterNext(); void RAResetIter(uint64_t num_blocks); const CowOperation* GetRAOpIter(); void InitializeBuffer(); Loading Loading @@ -151,12 +152,16 @@ class ReadAhead { std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_; BufferSink bufsink_; uint64_t total_ra_blocks_completed_ = 0; bool read_ahead_async_ = false; // Queue depth of 32 seems optimal. We don't want // Queue depth of 8 seems optimal. We don't want // to have a huge depth as it may put more memory pressure // on the kernel worker threads given that we use // IOSQE_ASYNC flag. int queue_depth_ = 32; // IOSQE_ASYNC flag - ASYNC flags can potentially // result in EINTR; Since we don't restart // syscalls and fallback to synchronous I/O, we // don't want huge queue depth int queue_depth_ = 8; std::unique_ptr<struct io_uring> ring_; }; Loading Loading @@ -210,11 +215,12 @@ class Worker { // Merge related ops bool Merge(); bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter); bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter); bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter); bool AsyncMerge(); bool SyncMerge(); bool MergeOrderedOps(); bool MergeOrderedOpsAsync(); bool MergeReplaceZeroOps(); int PrepareMerge(uint64_t* source_offset, int* pending_ops, const std::unique_ptr<ICowOpIter>& cowop_iter, std::vector<const CowOperation*>* replace_zero_vec = nullptr); sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } Loading @@ -238,12 +244,18 @@ class Worker { unique_fd base_path_merge_fd_; unique_fd ctrl_fd_; std::unique_ptr<ICowOpIter> cowop_iter_; size_t ra_block_index_ = 0; uint64_t blocks_merged_in_group_ = 0; bool merge_async_ = false; // Queue depth of 32 seems optimal. We don't want // Queue depth of 8 seems optimal. We don't want // to have a huge depth as it may put more memory pressure // on the kernel worker threads given that we use // IOSQE_ASYNC flag. int queue_depth_ = 32; // IOSQE_ASYNC flag - ASYNC flags can potentially // result in EINTR; Since we don't restart // syscalls and fallback to synchronous I/O, we // don't want huge queue depth int queue_depth_ = 8; std::unique_ptr<struct io_uring> ring_; std::shared_ptr<SnapshotHandler> snapuserd_; Loading fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp +117 −69 Original line number Diff line number Diff line Loading @@ -24,15 +24,14 @@ using namespace android::dm; using android::base::unique_fd; int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, const std::unique_ptr<ICowOpIter>& cowop_iter, std::vector<const CowOperation*>* replace_zero_vec) { int num_ops = *pending_ops; int nr_consecutive = 0; bool checkOrderedOp = (replace_zero_vec == nullptr); do { if (!cowop_iter->Done() && num_ops) { const CowOperation* cow_op = &cowop_iter->Get(); if (!cowop_iter_->Done() && num_ops) { const CowOperation* cow_op = &cowop_iter_->Get(); if (checkOrderedOp && !IsOrderedOp(*cow_op)) { break; } Loading @@ -42,12 +41,12 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, replace_zero_vec->push_back(cow_op); } cowop_iter->Next(); cowop_iter_->Next(); num_ops -= 1; nr_consecutive = 1; while (!cowop_iter->Done() && num_ops) { const CowOperation* op = &cowop_iter->Get(); while (!cowop_iter_->Done() && num_ops) { const CowOperation* op = &cowop_iter_->Get(); if (checkOrderedOp && !IsOrderedOp(*op)) { break; } Loading @@ -63,7 +62,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, nr_consecutive += 1; num_ops -= 1; cowop_iter->Next(); cowop_iter_->Next(); } } } while (0); Loading @@ -71,7 +70,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, return nr_consecutive; } bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { bool Worker::MergeReplaceZeroOps() { // Flush every 8192 ops. Since all ops are independent and there is no // dependency between COW ops, we will flush the data and the number // of ops merged in COW file for every 8192 ops. If there is a crash, Loading @@ -84,15 +83,17 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32; int num_ops_merged = 0; while (!cowop_iter->Done()) { SNAP_LOG(INFO) << "MergeReplaceZeroOps started...."; while (!cowop_iter_->Done()) { int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ; std::vector<const CowOperation*> replace_zero_vec; uint64_t source_offset; int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec); int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec); if (linear_blocks == 0) { // Merge complete CHECK(cowop_iter->Done()); CHECK(cowop_iter_->Done()); break; } Loading @@ -117,8 +118,8 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) size_t io_size = linear_blocks * BLOCK_SZ; // Merge - Write the contents back to base device int ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size, source_offset); int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size, source_offset)); if (ret < 0 || ret != io_size) { SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to write to backing device while merging " Loading Loading @@ -172,16 +173,15 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) return true; } bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) { bool Worker::MergeOrderedOpsAsync() { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); size_t block_index = 0; SNAP_LOG(INFO) << "MergeOrderedOpsAsync started...."; while (!cowop_iter->Done()) { const CowOperation* cow_op = &cowop_iter->Get(); while (!cowop_iter_->Done()) { const CowOperation* cow_op = &cowop_iter_->Get(); if (!IsOrderedOp(*cow_op)) { break; } Loading @@ -190,11 +190,10 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) // Wait for RA thread to notify that the merge window // is ready for merging. if (!snapuserd_->WaitForMergeBegin()) { snapuserd_->SetMergeFailed(block_index); return false; } snapuserd_->SetMergeInProgress(block_index); snapuserd_->SetMergeInProgress(ra_block_index_); loff_t offset = 0; int num_ops = snapuserd_->GetTotalBlocksToMerge(); Loading @@ -202,12 +201,13 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) int pending_sqe = queue_depth_; int pending_ios_to_submit = 0; bool flush_required = false; blocks_merged_in_group_ = 0; SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops; while (num_ops) { uint64_t source_offset; int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter); int linear_blocks = PrepareMerge(&source_offset, &num_ops); if (linear_blocks != 0) { size_t io_size = (linear_blocks * BLOCK_SZ); Loading @@ -216,7 +216,6 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get()); if (!sqe) { SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops"; snapuserd_->SetMergeFailed(block_index); return false; } Loading @@ -225,10 +224,18 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) offset += io_size; num_ops -= linear_blocks; blocks_merged_in_group_ += linear_blocks; pending_sqe -= 1; pending_ios_to_submit += 1; sqe->flags |= IOSQE_ASYNC; // These flags are important - We need to make sure that the // blocks are linked and are written in the same order as // populated. This is because of overlapping block writes. // // If there are no dependency, we can optimize this further by // allowing parallel writes; but for now, just link all the SQ // entries. sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC); } // Ring is full or no more COW ops to be merged in this batch Loading Loading @@ -256,7 +263,7 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) pending_sqe -= 1; flush_required = false; pending_ios_to_submit += 1; sqe->flags |= IOSQE_ASYNC; sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC); } } else { flush_required = true; Loading @@ -269,35 +276,45 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: " << " io submit: " << ret << " expected: " << pending_ios_to_submit; snapuserd_->SetMergeFailed(block_index); return false; } int pending_ios_to_complete = pending_ios_to_submit; pending_ios_to_submit = 0; bool status = true; // Reap I/O completions while (pending_ios_to_complete) { struct io_uring_cqe* cqe; // We need to make sure to reap all the I/O's submitted // even if there are any errors observed. // // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR; // these error codes are not truly I/O errors; we can retry them // by re-populating the SQE entries and submitting the I/O // request back. However, we don't do that now; instead we // will fallback to synchronous I/O. ret = io_uring_wait_cqe(ring_.get(), &cqe); if (ret) { SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret; snapuserd_->SetMergeFailed(block_index); return false; SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret; status = false; } if (cqe->res < 0) { SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res; snapuserd_->SetMergeFailed(block_index); return false; SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res; status = false; } io_uring_cqe_seen(ring_.get(), cqe); pending_ios_to_complete -= 1; } if (!status) { return false; } pending_sqe = queue_depth_; } Loading @@ -312,7 +329,6 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) // Flush the data if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) { SNAP_LOG(ERROR) << " Failed to fsync merged data"; snapuserd_->SetMergeFailed(block_index); return false; } Loading @@ -320,35 +336,34 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) // the merge completion if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; snapuserd_->SetMergeFailed(block_index); return false; } SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); // Mark the block as merge complete snapuserd_->SetMergeCompleted(block_index); snapuserd_->SetMergeCompleted(ra_block_index_); // Notify RA thread that the merge thread is ready to merge the next // window snapuserd_->NotifyRAForMergeReady(); // Get the next block block_index += 1; ra_block_index_ += 1; } return true; } bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { bool Worker::MergeOrderedOps() { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); size_t block_index = 0; SNAP_LOG(INFO) << "MergeOrderedOps started...."; while (!cowop_iter->Done()) { const CowOperation* cow_op = &cowop_iter->Get(); while (!cowop_iter_->Done()) { const CowOperation* cow_op = &cowop_iter_->Get(); if (!IsOrderedOp(*cow_op)) { break; } Loading @@ -357,11 +372,11 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // Wait for RA thread to notify that the merge window // is ready for merging. if (!snapuserd_->WaitForMergeBegin()) { snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } snapuserd_->SetMergeInProgress(block_index); snapuserd_->SetMergeInProgress(ra_block_index_); loff_t offset = 0; int num_ops = snapuserd_->GetTotalBlocksToMerge(); Loading @@ -369,7 +384,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { while (num_ops) { uint64_t source_offset; int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter); int linear_blocks = PrepareMerge(&source_offset, &num_ops); if (linear_blocks == 0) { break; } Loading @@ -378,12 +393,13 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // Write to the base device. Data is already in the RA buffer. Note // that XOR ops is already handled by the RA thread. We just write // the contents out. int ret = pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size, source_offset); int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size, source_offset)); if (ret < 0 || ret != io_size) { SNAP_LOG(ERROR) << "Failed to write to backing device while merging " << " at offset: " << source_offset << " io_size: " << io_size; snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } Loading @@ -397,7 +413,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // Flush the data if (fsync(base_path_merge_fd_.get()) < 0) { SNAP_LOG(ERROR) << " Failed to fsync merged data"; snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } Loading @@ -405,47 +421,87 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // the merge completion if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); // Mark the block as merge complete snapuserd_->SetMergeCompleted(block_index); snapuserd_->SetMergeCompleted(ra_block_index_); // Notify RA thread that the merge thread is ready to merge the next // window snapuserd_->NotifyRAForMergeReady(); // Get the next block block_index += 1; ra_block_index_ += 1; } return true; } bool Worker::Merge() { std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter(); bool Worker::AsyncMerge() { if (!MergeOrderedOpsAsync()) { SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O"; // Reset the iter so that we retry the merge while (blocks_merged_in_group_ && !cowop_iter_->RDone()) { cowop_iter_->Prev(); blocks_merged_in_group_ -= 1; } if (merge_async_) { if (!MergeOrderedOpsAsync(cowop_iter)) { return false; } SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed"; return true; } bool Worker::SyncMerge() { if (!MergeOrderedOps()) { SNAP_LOG(ERROR) << "Merge failed for ordered ops"; snapuserd_->MergeFailed(); return false; } SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed....."; } else { // Start with Copy and Xor ops if (!MergeOrderedOps(cowop_iter)) { SNAP_LOG(INFO) << "MergeOrderedOps completed"; return true; } bool Worker::Merge() { cowop_iter_ = reader_->GetMergeOpIter(); bool retry = false; bool ordered_ops_merge_status; // Start Async Merge if (merge_async_) { ordered_ops_merge_status = AsyncMerge(); if (!ordered_ops_merge_status) { FinalizeIouring(); retry = true; merge_async_ = false; } } // Check if we need to fallback and retry the merge // // If the device doesn't support async merge, we // will directly enter here (aka devices with 4.x kernels) const bool sync_merge_required = (retry || !merge_async_); if (sync_merge_required) { ordered_ops_merge_status = SyncMerge(); if (!ordered_ops_merge_status) { // Merge failed. Device will continue to be mounted // off snapshots; merge will be retried during // next reboot SNAP_LOG(ERROR) << "Merge failed for ordered ops"; snapuserd_->MergeFailed(); return false; } SNAP_LOG(INFO) << "MergeOrderedOps completed....."; } // Replace and Zero ops if (!MergeReplaceZeroOps(cowop_iter)) { if (!MergeReplaceZeroOps()) { SNAP_LOG(ERROR) << "Merge failed for replace/zero ops"; snapuserd_->MergeFailed(); return false; Loading @@ -461,14 +517,6 @@ bool Worker::InitializeIouring() { return false; } { // TODO: b/219642530 - Disable io_uring for merge // until we figure out the cause of intermittent // IO failures. merge_async_ = false; return true; } ring_ = std::make_unique<struct io_uring>(); int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0); Loading Loading @@ -514,7 +562,7 @@ bool Worker::RunMergeThread() { CloseFds(); reader_->CloseCowFd(); SNAP_LOG(INFO) << "Merge finish"; SNAP_LOG(INFO) << "Snapshot-Merge completed"; return true; } Loading Loading
fs_mgr/libsnapshot/cow_reader.cpp +36 −0 Original line number Diff line number Diff line Loading @@ -550,6 +550,9 @@ class CowOpIter final : public ICowOpIter { const CowOperation& Get() override; void Next() override; void Prev() override; bool RDone() override; private: std::shared_ptr<std::vector<CowOperation>> ops_; std::vector<CowOperation>::iterator op_iter_; Loading @@ -560,6 +563,15 @@ CowOpIter::CowOpIter(std::shared_ptr<std::vector<CowOperation>>& ops) { op_iter_ = ops_->begin(); } bool CowOpIter::RDone() { return op_iter_ == ops_->begin(); } void CowOpIter::Prev() { CHECK(!RDone()); op_iter_--; } bool CowOpIter::Done() { return op_iter_ == ops_->end(); } Loading @@ -585,6 +597,9 @@ class CowRevMergeOpIter final : public ICowOpIter { const CowOperation& Get() override; void Next() override; void Prev() override; bool RDone() override; private: std::shared_ptr<std::vector<CowOperation>> ops_; std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_; Loading @@ -603,6 +618,9 @@ class CowMergeOpIter final : public ICowOpIter { const CowOperation& Get() override; void Next() override; void Prev() override; bool RDone() override; private: std::shared_ptr<std::vector<CowOperation>> ops_; std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_; Loading @@ -623,6 +641,15 @@ CowMergeOpIter::CowMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops, block_iter_ = merge_op_blocks->begin() + start; } bool CowMergeOpIter::RDone() { return block_iter_ == merge_op_blocks_->begin(); } void CowMergeOpIter::Prev() { CHECK(!RDone()); block_iter_--; } bool CowMergeOpIter::Done() { return block_iter_ == merge_op_blocks_->end(); } Loading @@ -649,6 +676,15 @@ CowRevMergeOpIter::CowRevMergeOpIter(std::shared_ptr<std::vector<CowOperation>> block_riter_ = merge_op_blocks->rbegin(); } bool CowRevMergeOpIter::RDone() { return block_riter_ == merge_op_blocks_->rbegin(); } void CowRevMergeOpIter::Prev() { CHECK(!RDone()); block_riter_--; } bool CowRevMergeOpIter::Done() { return block_riter_ == merge_op_blocks_->rend() - start_; } Loading
fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h +7 −1 Original line number Diff line number Diff line Loading @@ -92,7 +92,7 @@ class ICowOpIter { public: virtual ~ICowOpIter() {} // True if there are more items to read, false otherwise. // True if there are no more items to read forward, false otherwise. virtual bool Done() = 0; // Read the current operation. Loading @@ -100,6 +100,12 @@ class ICowOpIter { // Advance to the next item. virtual void Next() = 0; // Advance to the previous item. virtual void Prev() = 0; // True if there are no more items to read backwards, false otherwise virtual bool RDone() = 0; }; class CowReader final : public ICowReader { Loading
fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +10 −17 Original line number Diff line number Diff line Loading @@ -492,10 +492,6 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name, return; } if (IsIouringSupported()) { std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device, partition_name, dev_sz); } else { int num_threads = 2; size_t num_blocks = dev_sz >> BLOCK_SHIFT; size_t num_blocks_per_thread = num_blocks / num_threads; Loading @@ -503,13 +499,12 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name, off_t offset = 0; for (int i = 0; i < num_threads; i++) { std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device, partition_name, offset, read_sz_per_thread); std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device, partition_name, offset, read_sz_per_thread); offset += read_sz_per_thread; } } } /* * Entry point to launch threads Loading Loading @@ -694,10 +689,8 @@ bool SnapshotHandler::IsIouringSupported() { // During selinux init transition, libsnapshot will propagate the // status of io_uring enablement. As properties are not initialized, // we cannot query system property. // // TODO: b/219642530: Intermittent I/O failures observed if (is_io_uring_enabled_) { return false; return true; } // Finally check the system property Loading
fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +22 −10 Original line number Diff line number Diff line Loading @@ -99,6 +99,7 @@ class ReadAhead { void InitializeRAIter(); bool RAIterDone(); void RAIterNext(); void RAResetIter(uint64_t num_blocks); const CowOperation* GetRAOpIter(); void InitializeBuffer(); Loading Loading @@ -151,12 +152,16 @@ class ReadAhead { std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_; BufferSink bufsink_; uint64_t total_ra_blocks_completed_ = 0; bool read_ahead_async_ = false; // Queue depth of 32 seems optimal. We don't want // Queue depth of 8 seems optimal. We don't want // to have a huge depth as it may put more memory pressure // on the kernel worker threads given that we use // IOSQE_ASYNC flag. int queue_depth_ = 32; // IOSQE_ASYNC flag - ASYNC flags can potentially // result in EINTR; Since we don't restart // syscalls and fallback to synchronous I/O, we // don't want huge queue depth int queue_depth_ = 8; std::unique_ptr<struct io_uring> ring_; }; Loading Loading @@ -210,11 +215,12 @@ class Worker { // Merge related ops bool Merge(); bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter); bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter); bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter); bool AsyncMerge(); bool SyncMerge(); bool MergeOrderedOps(); bool MergeOrderedOpsAsync(); bool MergeReplaceZeroOps(); int PrepareMerge(uint64_t* source_offset, int* pending_ops, const std::unique_ptr<ICowOpIter>& cowop_iter, std::vector<const CowOperation*>* replace_zero_vec = nullptr); sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } Loading @@ -238,12 +244,18 @@ class Worker { unique_fd base_path_merge_fd_; unique_fd ctrl_fd_; std::unique_ptr<ICowOpIter> cowop_iter_; size_t ra_block_index_ = 0; uint64_t blocks_merged_in_group_ = 0; bool merge_async_ = false; // Queue depth of 32 seems optimal. We don't want // Queue depth of 8 seems optimal. We don't want // to have a huge depth as it may put more memory pressure // on the kernel worker threads given that we use // IOSQE_ASYNC flag. int queue_depth_ = 32; // IOSQE_ASYNC flag - ASYNC flags can potentially // result in EINTR; Since we don't restart // syscalls and fallback to synchronous I/O, we // don't want huge queue depth int queue_depth_ = 8; std::unique_ptr<struct io_uring> ring_; std::shared_ptr<SnapshotHandler> snapuserd_; Loading
fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp +117 −69 Original line number Diff line number Diff line Loading @@ -24,15 +24,14 @@ using namespace android::dm; using android::base::unique_fd; int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, const std::unique_ptr<ICowOpIter>& cowop_iter, std::vector<const CowOperation*>* replace_zero_vec) { int num_ops = *pending_ops; int nr_consecutive = 0; bool checkOrderedOp = (replace_zero_vec == nullptr); do { if (!cowop_iter->Done() && num_ops) { const CowOperation* cow_op = &cowop_iter->Get(); if (!cowop_iter_->Done() && num_ops) { const CowOperation* cow_op = &cowop_iter_->Get(); if (checkOrderedOp && !IsOrderedOp(*cow_op)) { break; } Loading @@ -42,12 +41,12 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, replace_zero_vec->push_back(cow_op); } cowop_iter->Next(); cowop_iter_->Next(); num_ops -= 1; nr_consecutive = 1; while (!cowop_iter->Done() && num_ops) { const CowOperation* op = &cowop_iter->Get(); while (!cowop_iter_->Done() && num_ops) { const CowOperation* op = &cowop_iter_->Get(); if (checkOrderedOp && !IsOrderedOp(*op)) { break; } Loading @@ -63,7 +62,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, nr_consecutive += 1; num_ops -= 1; cowop_iter->Next(); cowop_iter_->Next(); } } } while (0); Loading @@ -71,7 +70,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, return nr_consecutive; } bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { bool Worker::MergeReplaceZeroOps() { // Flush every 8192 ops. Since all ops are independent and there is no // dependency between COW ops, we will flush the data and the number // of ops merged in COW file for every 8192 ops. If there is a crash, Loading @@ -84,15 +83,17 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32; int num_ops_merged = 0; while (!cowop_iter->Done()) { SNAP_LOG(INFO) << "MergeReplaceZeroOps started...."; while (!cowop_iter_->Done()) { int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ; std::vector<const CowOperation*> replace_zero_vec; uint64_t source_offset; int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec); int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec); if (linear_blocks == 0) { // Merge complete CHECK(cowop_iter->Done()); CHECK(cowop_iter_->Done()); break; } Loading @@ -117,8 +118,8 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) size_t io_size = linear_blocks * BLOCK_SZ; // Merge - Write the contents back to base device int ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size, source_offset); int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size, source_offset)); if (ret < 0 || ret != io_size) { SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to write to backing device while merging " Loading Loading @@ -172,16 +173,15 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) return true; } bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) { bool Worker::MergeOrderedOpsAsync() { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); size_t block_index = 0; SNAP_LOG(INFO) << "MergeOrderedOpsAsync started...."; while (!cowop_iter->Done()) { const CowOperation* cow_op = &cowop_iter->Get(); while (!cowop_iter_->Done()) { const CowOperation* cow_op = &cowop_iter_->Get(); if (!IsOrderedOp(*cow_op)) { break; } Loading @@ -190,11 +190,10 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) // Wait for RA thread to notify that the merge window // is ready for merging. if (!snapuserd_->WaitForMergeBegin()) { snapuserd_->SetMergeFailed(block_index); return false; } snapuserd_->SetMergeInProgress(block_index); snapuserd_->SetMergeInProgress(ra_block_index_); loff_t offset = 0; int num_ops = snapuserd_->GetTotalBlocksToMerge(); Loading @@ -202,12 +201,13 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) int pending_sqe = queue_depth_; int pending_ios_to_submit = 0; bool flush_required = false; blocks_merged_in_group_ = 0; SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops; while (num_ops) { uint64_t source_offset; int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter); int linear_blocks = PrepareMerge(&source_offset, &num_ops); if (linear_blocks != 0) { size_t io_size = (linear_blocks * BLOCK_SZ); Loading @@ -216,7 +216,6 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get()); if (!sqe) { SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops"; snapuserd_->SetMergeFailed(block_index); return false; } Loading @@ -225,10 +224,18 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) offset += io_size; num_ops -= linear_blocks; blocks_merged_in_group_ += linear_blocks; pending_sqe -= 1; pending_ios_to_submit += 1; sqe->flags |= IOSQE_ASYNC; // These flags are important - We need to make sure that the // blocks are linked and are written in the same order as // populated. This is because of overlapping block writes. // // If there are no dependency, we can optimize this further by // allowing parallel writes; but for now, just link all the SQ // entries. sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC); } // Ring is full or no more COW ops to be merged in this batch Loading Loading @@ -256,7 +263,7 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) pending_sqe -= 1; flush_required = false; pending_ios_to_submit += 1; sqe->flags |= IOSQE_ASYNC; sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC); } } else { flush_required = true; Loading @@ -269,35 +276,45 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: " << " io submit: " << ret << " expected: " << pending_ios_to_submit; snapuserd_->SetMergeFailed(block_index); return false; } int pending_ios_to_complete = pending_ios_to_submit; pending_ios_to_submit = 0; bool status = true; // Reap I/O completions while (pending_ios_to_complete) { struct io_uring_cqe* cqe; // We need to make sure to reap all the I/O's submitted // even if there are any errors observed. // // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR; // these error codes are not truly I/O errors; we can retry them // by re-populating the SQE entries and submitting the I/O // request back. However, we don't do that now; instead we // will fallback to synchronous I/O. ret = io_uring_wait_cqe(ring_.get(), &cqe); if (ret) { SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret; snapuserd_->SetMergeFailed(block_index); return false; SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret; status = false; } if (cqe->res < 0) { SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res; snapuserd_->SetMergeFailed(block_index); return false; SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res; status = false; } io_uring_cqe_seen(ring_.get(), cqe); pending_ios_to_complete -= 1; } if (!status) { return false; } pending_sqe = queue_depth_; } Loading @@ -312,7 +329,6 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) // Flush the data if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) { SNAP_LOG(ERROR) << " Failed to fsync merged data"; snapuserd_->SetMergeFailed(block_index); return false; } Loading @@ -320,35 +336,34 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) // the merge completion if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; snapuserd_->SetMergeFailed(block_index); return false; } SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); // Mark the block as merge complete snapuserd_->SetMergeCompleted(block_index); snapuserd_->SetMergeCompleted(ra_block_index_); // Notify RA thread that the merge thread is ready to merge the next // window snapuserd_->NotifyRAForMergeReady(); // Get the next block block_index += 1; ra_block_index_ += 1; } return true; } bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { bool Worker::MergeOrderedOps() { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); size_t block_index = 0; SNAP_LOG(INFO) << "MergeOrderedOps started...."; while (!cowop_iter->Done()) { const CowOperation* cow_op = &cowop_iter->Get(); while (!cowop_iter_->Done()) { const CowOperation* cow_op = &cowop_iter_->Get(); if (!IsOrderedOp(*cow_op)) { break; } Loading @@ -357,11 +372,11 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // Wait for RA thread to notify that the merge window // is ready for merging. if (!snapuserd_->WaitForMergeBegin()) { snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } snapuserd_->SetMergeInProgress(block_index); snapuserd_->SetMergeInProgress(ra_block_index_); loff_t offset = 0; int num_ops = snapuserd_->GetTotalBlocksToMerge(); Loading @@ -369,7 +384,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { while (num_ops) { uint64_t source_offset; int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter); int linear_blocks = PrepareMerge(&source_offset, &num_ops); if (linear_blocks == 0) { break; } Loading @@ -378,12 +393,13 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // Write to the base device. Data is already in the RA buffer. Note // that XOR ops is already handled by the RA thread. We just write // the contents out. int ret = pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size, source_offset); int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size, source_offset)); if (ret < 0 || ret != io_size) { SNAP_LOG(ERROR) << "Failed to write to backing device while merging " << " at offset: " << source_offset << " io_size: " << io_size; snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } Loading @@ -397,7 +413,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // Flush the data if (fsync(base_path_merge_fd_.get()) < 0) { SNAP_LOG(ERROR) << " Failed to fsync merged data"; snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } Loading @@ -405,47 +421,87 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) { // the merge completion if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; snapuserd_->SetMergeFailed(block_index); snapuserd_->SetMergeFailed(ra_block_index_); return false; } SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); // Mark the block as merge complete snapuserd_->SetMergeCompleted(block_index); snapuserd_->SetMergeCompleted(ra_block_index_); // Notify RA thread that the merge thread is ready to merge the next // window snapuserd_->NotifyRAForMergeReady(); // Get the next block block_index += 1; ra_block_index_ += 1; } return true; } bool Worker::Merge() { std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter(); bool Worker::AsyncMerge() { if (!MergeOrderedOpsAsync()) { SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O"; // Reset the iter so that we retry the merge while (blocks_merged_in_group_ && !cowop_iter_->RDone()) { cowop_iter_->Prev(); blocks_merged_in_group_ -= 1; } if (merge_async_) { if (!MergeOrderedOpsAsync(cowop_iter)) { return false; } SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed"; return true; } bool Worker::SyncMerge() { if (!MergeOrderedOps()) { SNAP_LOG(ERROR) << "Merge failed for ordered ops"; snapuserd_->MergeFailed(); return false; } SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed....."; } else { // Start with Copy and Xor ops if (!MergeOrderedOps(cowop_iter)) { SNAP_LOG(INFO) << "MergeOrderedOps completed"; return true; } bool Worker::Merge() { cowop_iter_ = reader_->GetMergeOpIter(); bool retry = false; bool ordered_ops_merge_status; // Start Async Merge if (merge_async_) { ordered_ops_merge_status = AsyncMerge(); if (!ordered_ops_merge_status) { FinalizeIouring(); retry = true; merge_async_ = false; } } // Check if we need to fallback and retry the merge // // If the device doesn't support async merge, we // will directly enter here (aka devices with 4.x kernels) const bool sync_merge_required = (retry || !merge_async_); if (sync_merge_required) { ordered_ops_merge_status = SyncMerge(); if (!ordered_ops_merge_status) { // Merge failed. Device will continue to be mounted // off snapshots; merge will be retried during // next reboot SNAP_LOG(ERROR) << "Merge failed for ordered ops"; snapuserd_->MergeFailed(); return false; } SNAP_LOG(INFO) << "MergeOrderedOps completed....."; } // Replace and Zero ops if (!MergeReplaceZeroOps(cowop_iter)) { if (!MergeReplaceZeroOps()) { SNAP_LOG(ERROR) << "Merge failed for replace/zero ops"; snapuserd_->MergeFailed(); return false; Loading @@ -461,14 +517,6 @@ bool Worker::InitializeIouring() { return false; } { // TODO: b/219642530 - Disable io_uring for merge // until we figure out the cause of intermittent // IO failures. merge_async_ = false; return true; } ring_ = std::make_unique<struct io_uring>(); int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0); Loading Loading @@ -514,7 +562,7 @@ bool Worker::RunMergeThread() { CloseFds(); reader_->CloseCowFd(); SNAP_LOG(INFO) << "Merge finish"; SNAP_LOG(INFO) << "Snapshot-Merge completed"; return true; } Loading