fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp  +1 −1

@@ -468,7 +468,7 @@ bool CowReader::VerifyMergeOps() {
         if (overwritten_blocks.count(block)) {
             overwrite = overwritten_blocks[block];
             LOG(ERROR) << "Invalid Sequence! Block needed for op:\n" << op
-                       << "\noverwritten by previously merged op:\n" << *op;
+                       << "\noverwritten by previously merged op:\n" << *overwrite;
         }
         if (misaligned && overwritten_blocks.count(block + 1)) {

fs_mgr/libsnapshot/libsnapshot_cow/create_cow.cpp  +64 −16

@@ -376,29 +376,77 @@ bool CreateSnapshot::WriteNonOrderedSnapshots() {
     }
     return true;
 }

 bool CreateSnapshot::WriteOrderedSnapshots() {
-    std::unordered_map<uint64_t, uint64_t> overwritten_blocks;
-    std::vector<std::pair<uint64_t, uint64_t>> merge_sequence;
-    for (auto it = copy_blocks_.begin(); it != copy_blocks_.end(); it++) {
-        if (overwritten_blocks.count(it->second)) {
-            replace_blocks_.push_back(it->first);
-            continue;
-        }
-        overwritten_blocks[it->first] = it->second;
-        merge_sequence.emplace_back(std::make_pair(it->first, it->second));
-    }
-    // Sort the blocks so that if the blocks are contiguous, it would help
-    // compress multiple blocks in one shot based on the compression factor.
-    std::sort(replace_blocks_.begin(), replace_blocks_.end());
-
-    copy_ops_ = merge_sequence.size();
-    for (auto it = merge_sequence.begin(); it != merge_sequence.end(); it++) {
-        if (!writer_->AddCopy(it->first, it->second, 1)) {
-            return false;
-        }
-    }
+    // Sort copy_blocks_ by target block index so consecutive target blocks
+    // end up next to each other.
+    std::vector<std::pair<uint64_t, uint64_t>> sorted_copy_blocks_(copy_blocks_.begin(),
+                                                                   copy_blocks_.end());
+    std::sort(sorted_copy_blocks_.begin(), sorted_copy_blocks_.end());
+
+    std::unordered_map<uint64_t, std::vector<uint64_t>> dependency_graph;
+    std::unordered_map<uint64_t, int> in_degree;
+
+    // Initialize in-degrees and build the dependency graph.
+    for (const auto& [target, source] : sorted_copy_blocks_) {
+        in_degree[target] = 0;
+        if (copy_blocks_.count(source)) {
+            // This source block itself gets modified by another copy op.
+            dependency_graph[source].push_back(target);
+            in_degree[target]++;
+        }
+    }
+
+    std::vector<uint64_t> ordered_copy_ops_;
+    std::deque<uint64_t> queue;
+
+    // Seed the queue with nodes of in-degree 0 (no dependency).
+    for (const auto& [target, degree] : in_degree) {
+        if (degree == 0) {
+            queue.push_back(target);
+        }
+    }
+
+    // Kahn's topological sort.
+    while (!queue.empty()) {
+        uint64_t current_target = queue.front();
+        queue.pop_front();
+        ordered_copy_ops_.push_back(current_target);
+
+        if (dependency_graph.count(current_target)) {
+            for (uint64_t neighbor : dependency_graph[current_target]) {
+                in_degree[neighbor]--;
+                if (in_degree[neighbor] == 0) {
+                    queue.push_back(neighbor);
+                }
+            }
+        }
+    }
+
+    // Detect cycles and change those blocks to replace blocks.
+    if (ordered_copy_ops_.size() != copy_blocks_.size()) {
+        LOG(INFO) << "Cycle detected in copy operations! Converting some to replace.";
+        std::unordered_set<uint64_t> safe_targets_(ordered_copy_ops_.begin(),
+                                                   ordered_copy_ops_.end());
+        // Erase through an explicit iterator so removing entries from
+        // copy_blocks_ does not invalidate the loop.
+        for (auto it = copy_blocks_.begin(); it != copy_blocks_.end();) {
+            if (safe_targets_.find(it->first) == safe_targets_.end()) {
+                replace_blocks_.push_back(it->first);
+                it = copy_blocks_.erase(it);
+            } else {
+                ++it;
+            }
+        }
+    }
+
+    std::reverse(ordered_copy_ops_.begin(), ordered_copy_ops_.end());
+
+    // Add the copy ops.
+    copy_ops_ = 0;
+    for (uint64_t target : ordered_copy_ops_) {
+        LOG(DEBUG) << "copy target: " << target << " source: " << copy_blocks_[target];
+        if (!writer_->AddCopy(target, copy_blocks_[target], 1)) {
+            return false;
+        }
+        copy_ops_++;
+    }
+
+    // Sort the blocks so that if the blocks are contiguous, it would help
+    // compress multiple blocks in one shot based on the compression factor.
+    std::sort(replace_blocks_.begin(), replace_blocks_.end());
+
+    LOG(DEBUG) << "Total copy ops: " << copy_ops_;
     return true;
 }
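The rewritten WriteOrderedSnapshots() treats each copy as a node, adds an edge from a copy's source block to that copy whenever the source is itself the target of another copy, orders the nodes with Kahn's topological sort, demotes any targets left unordered (they sit on or behind a cycle) to replace ops, and emits the remaining copies in reverse topological order so every source block is read before a later copy overwrites it. Below is a minimal standalone sketch of that scheme on a handful of made-up (target, source) pairs; the block numbers, variable names, and printed output are illustrative assumptions, not part of this patch, and it does not touch the libsnapshot writer at all.

    // Standalone sketch of the copy-ordering scheme (hypothetical block numbers).
    #include <algorithm>
    #include <cstdint>
    #include <deque>
    #include <iostream>
    #include <map>
    #include <unordered_map>
    #include <unordered_set>
    #include <vector>

    int main() {
        // target -> source. 10<-20, 20<-30, 30<-40 form a chain; 50<-60 and 60<-50 form a cycle.
        std::map<uint64_t, uint64_t> copy_blocks = {{10, 20}, {20, 30}, {30, 40}, {50, 60}, {60, 50}};

        // Edge source -> target: the copy overwriting `source` sorts before the copy reading it;
        // the std::reverse at the end flips this, matching the convention in the patch.
        std::unordered_map<uint64_t, std::vector<uint64_t>> graph;
        std::unordered_map<uint64_t, int> in_degree;
        for (const auto& [target, source] : copy_blocks) {
            in_degree[target] = 0;  // each target appears exactly once as a key
            if (copy_blocks.count(source)) {
                graph[source].push_back(target);
                in_degree[target]++;
            }
        }

        // Kahn's algorithm: start from nodes with no dependency, peel edges as nodes are emitted.
        std::deque<uint64_t> queue;
        for (const auto& [target, degree] : in_degree) {
            if (degree == 0) queue.push_back(target);
        }
        std::vector<uint64_t> ordered;
        while (!queue.empty()) {
            uint64_t t = queue.front();
            queue.pop_front();
            ordered.push_back(t);
            for (uint64_t next : graph[t]) {
                if (--in_degree[next] == 0) queue.push_back(next);
            }
        }

        // Targets never emitted sit on (or behind) a cycle: demote them to replace ops.
        std::unordered_set<uint64_t> emitted(ordered.begin(), ordered.end());
        std::vector<uint64_t> replace_blocks;
        for (const auto& [target, source] : copy_blocks) {
            if (!emitted.count(target)) replace_blocks.push_back(target);
        }

        // Reverse so each source block is consumed before any copy overwrites it.
        std::reverse(ordered.begin(), ordered.end());

        std::cout << "copy order (target<-source):";
        for (uint64_t t : ordered) std::cout << " " << t << "<-" << copy_blocks[t];
        std::cout << "\nreplace ops:";
        for (uint64_t t : replace_blocks) std::cout << " " << t;
        std::cout << "\n";
        return 0;
    }

With these pairs the chain comes out as 10<-20, 20<-30, 30<-40, so every source is read before it is overwritten, while the 50/60 pair forms a cycle and falls back to replace ops, mirroring the fallback path in the diff above.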