Loading fs_mgr/libsnapshot/libsnapshot_cow/create_cow.cpp +169 −21 Original line number Diff line number Diff line Loading @@ -71,6 +71,8 @@ class CreateSnapshot { const int kNumThreads = 6; const size_t kBlockSizeToRead = 1_MiB; const size_t compression_factor_ = 64_KiB; size_t replace_ops_ = 0, copy_ops_ = 0, zero_ops_ = 0, in_place_ops_ = 0; std::unordered_map<std::string, int> source_block_hash_; std::mutex source_block_hash_lock_; Loading @@ -81,7 +83,12 @@ class CreateSnapshot { std::unique_ptr<uint8_t[]> zblock_; std::string compression_ = "lz4"; unique_fd fd_; unique_fd cow_fd_; unique_fd target_fd_; std::vector<uint64_t> zero_blocks_; std::vector<uint64_t> replace_blocks_; std::unordered_map<uint64_t, uint64_t> copy_blocks_; const int BLOCK_SZ = 4_KiB; void SHA256(const void* data, size_t length, uint8_t out[32]); Loading @@ -93,7 +100,14 @@ class CreateSnapshot { bool FindSourceBlockHash(); bool PrepareParse(std::string& parsing_file, const bool createSnapshot); bool ParsePartition(); bool WriteSnapshot(const void* buffer, uint64_t block, std::string& block_hash); void PrepareMergeBlock(const void* buffer, uint64_t block, std::string& block_hash); bool WriteV3Snapshots(); size_t PrepareWrite(size_t* pending_ops, size_t start_index); bool CreateSnapshotWriter(); bool WriteOrderedSnapshots(); bool WriteNonOrderedSnapshots(); bool VerifyMergeOrder(); }; void CreateSnapshotLogger(android::base::LogId, android::base::LogSeverity severity, const char*, Loading @@ -118,21 +132,19 @@ bool CreateSnapshot::PrepareParse(std::string& parsing_file, const bool createSn create_snapshot_patch_ = createSnapshot; if (createSnapshot) { fd_.reset(open(patch_file_.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666)); if (fd_ < 0) { cow_fd_.reset(open(patch_file_.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666)); if (cow_fd_ < 0) { PLOG(ERROR) << "Failed to open the snapshot-patch file: " << patch_file_; return false; } target_fd_.reset((open(parsing_file_.c_str(), O_RDONLY))); if (target_fd_ < 0) { LOG(ERROR) << "open failed: " << parsing_file_; return false; } zblock_ = std::make_unique<uint8_t[]>(BLOCK_SZ); std::memset(zblock_.get(), 0, BLOCK_SZ); CowOptions options; options.compression = compression_; options.num_compress_threads = 2; options.batch_write = true; options.cluster_ops = 600; writer_ = CreateCowWriter(2, options, std::move(fd_)); } return true; } Loading Loading @@ -187,19 +199,158 @@ std::string CreateSnapshot::ToHexString(const uint8_t* buf, size_t len) { return out; } bool CreateSnapshot::WriteSnapshot(const void* buffer, uint64_t block, std::string& block_hash) { void CreateSnapshot::PrepareMergeBlock(const void* buffer, uint64_t block, std::string& block_hash) { if (std::memcmp(zblock_.get(), buffer, BLOCK_SZ) == 0) { std::lock_guard<std::mutex> lock(write_lock_); return writer_->AddZeroBlocks(block, 1); zero_blocks_.push_back(block); return; } auto iter = source_block_hash_.find(block_hash); if (iter != source_block_hash_.end()) { std::lock_guard<std::mutex> lock(write_lock_); return writer_->AddCopy(block, iter->second, 1); // In-place copy is skipped if (block != iter->second) { copy_blocks_[block] = iter->second; } else { in_place_ops_ += 1; } return; } std::lock_guard<std::mutex> lock(write_lock_); return writer_->AddRawBlocks(block, buffer, BLOCK_SZ); replace_blocks_.push_back(block); } size_t CreateSnapshot::PrepareWrite(size_t* pending_ops, size_t start_index) { size_t num_ops = *pending_ops; uint64_t start_block = replace_blocks_[start_index]; size_t nr_consecutive = 1; num_ops -= 1; while (num_ops) { uint64_t next_block = replace_blocks_[start_index + nr_consecutive]; if (next_block != start_block + nr_consecutive) { break; } nr_consecutive += 1; num_ops -= 1; } return nr_consecutive; } bool CreateSnapshot::CreateSnapshotWriter() { uint64_t dev_sz = lseek(target_fd_.get(), 0, SEEK_END); CowOptions options; options.compression = compression_; options.num_compress_threads = 2; options.batch_write = true; options.cluster_ops = 600; options.compression_factor = compression_factor_; options.max_blocks = {dev_sz / options.block_size}; writer_ = CreateCowWriter(3, options, std::move(cow_fd_)); return true; } bool CreateSnapshot::WriteNonOrderedSnapshots() { zero_ops_ = zero_blocks_.size(); for (auto it = zero_blocks_.begin(); it != zero_blocks_.end(); it++) { if (!writer_->AddZeroBlocks(*it, 1)) { return false; } } std::string buffer(compression_factor_, '\0'); replace_ops_ = replace_blocks_.size(); size_t blocks_to_compress = replace_blocks_.size(); size_t num_ops = 0; size_t block_index = 0; while (blocks_to_compress) { num_ops = std::min((compression_factor_ / BLOCK_SZ), blocks_to_compress); auto linear_blocks = PrepareWrite(&num_ops, block_index); if (!android::base::ReadFullyAtOffset(target_fd_.get(), buffer.data(), (linear_blocks * BLOCK_SZ), replace_blocks_[block_index] * BLOCK_SZ)) { LOG(ERROR) << "Failed to read at offset: " << replace_blocks_[block_index] * BLOCK_SZ << " size: " << linear_blocks * BLOCK_SZ; return false; } if (!writer_->AddRawBlocks(replace_blocks_[block_index], buffer.data(), linear_blocks * BLOCK_SZ)) { LOG(ERROR) << "AddRawBlocks failed"; return false; } block_index += linear_blocks; blocks_to_compress -= linear_blocks; } if (!writer_->Finalize()) { return false; } return true; } bool CreateSnapshot::WriteOrderedSnapshots() { std::unordered_map<uint64_t, uint64_t> overwritten_blocks; std::vector<std::pair<uint64_t, uint64_t>> merge_sequence; for (auto it = copy_blocks_.begin(); it != copy_blocks_.end(); it++) { if (overwritten_blocks.count(it->second)) { replace_blocks_.push_back(it->first); continue; } overwritten_blocks[it->first] = it->second; merge_sequence.emplace_back(std::make_pair(it->first, it->second)); } // Sort the blocks so that if the blocks are contiguous, it would help // compress multiple blocks in one shot based on the compression factor. std::sort(replace_blocks_.begin(), replace_blocks_.end()); copy_ops_ = merge_sequence.size(); for (auto it = merge_sequence.begin(); it != merge_sequence.end(); it++) { if (!writer_->AddCopy(it->first, it->second, 1)) { return false; } } return true; } bool CreateSnapshot::VerifyMergeOrder() { unique_fd read_fd; read_fd.reset(open(patch_file_.c_str(), O_RDONLY)); if (read_fd < 0) { PLOG(ERROR) << "Failed to open the snapshot-patch file: " << patch_file_; return false; } CowReader reader; if (!reader.Parse(read_fd)) { LOG(ERROR) << "Parse failed"; return false; } if (!reader.VerifyMergeOps()) { LOG(ERROR) << "MergeOps Order is wrong"; return false; } return true; } bool CreateSnapshot::WriteV3Snapshots() { if (!CreateSnapshotWriter()) { return false; } if (!WriteOrderedSnapshots()) { return false; } if (!WriteNonOrderedSnapshots()) { return false; } if (!VerifyMergeOrder()) { return false; } LOG(INFO) << "In-place: " << in_place_ops_ << " Zero: " << zero_ops_ << " Replace: " << replace_ops_ << " copy: " << copy_ops_; return true; } bool CreateSnapshot::ReadBlocks(off_t offset, const int skip_blocks, const uint64_t dev_sz) { Loading Loading @@ -241,10 +392,7 @@ bool CreateSnapshot::ReadBlocks(off_t offset, const int skip_blocks, const uint6 std::string hash = ToHexString(checksum, sizeof(checksum)); if (create_snapshot_patch_) { if (!WriteSnapshot(bufptr, blkindex, hash)) { LOG(ERROR) << "WriteSnapshot failed for block: " << blkindex; return false; } PrepareMergeBlock(bufptr, blkindex, hash); } else { std::lock_guard<std::mutex> lock(source_block_hash_lock_); { Loading Loading @@ -306,8 +454,8 @@ bool CreateSnapshot::ParsePartition() { ret = t.get() && ret; } if (ret && create_snapshot_patch_ && !writer_->Finalize()) { LOG(ERROR) << "Finzalize failed"; if (ret && create_snapshot_patch_ && !WriteV3Snapshots()) { LOG(ERROR) << "Snapshot Write failed"; return false; } Loading Loading
fs_mgr/libsnapshot/libsnapshot_cow/create_cow.cpp +169 −21 Original line number Diff line number Diff line Loading @@ -71,6 +71,8 @@ class CreateSnapshot { const int kNumThreads = 6; const size_t kBlockSizeToRead = 1_MiB; const size_t compression_factor_ = 64_KiB; size_t replace_ops_ = 0, copy_ops_ = 0, zero_ops_ = 0, in_place_ops_ = 0; std::unordered_map<std::string, int> source_block_hash_; std::mutex source_block_hash_lock_; Loading @@ -81,7 +83,12 @@ class CreateSnapshot { std::unique_ptr<uint8_t[]> zblock_; std::string compression_ = "lz4"; unique_fd fd_; unique_fd cow_fd_; unique_fd target_fd_; std::vector<uint64_t> zero_blocks_; std::vector<uint64_t> replace_blocks_; std::unordered_map<uint64_t, uint64_t> copy_blocks_; const int BLOCK_SZ = 4_KiB; void SHA256(const void* data, size_t length, uint8_t out[32]); Loading @@ -93,7 +100,14 @@ class CreateSnapshot { bool FindSourceBlockHash(); bool PrepareParse(std::string& parsing_file, const bool createSnapshot); bool ParsePartition(); bool WriteSnapshot(const void* buffer, uint64_t block, std::string& block_hash); void PrepareMergeBlock(const void* buffer, uint64_t block, std::string& block_hash); bool WriteV3Snapshots(); size_t PrepareWrite(size_t* pending_ops, size_t start_index); bool CreateSnapshotWriter(); bool WriteOrderedSnapshots(); bool WriteNonOrderedSnapshots(); bool VerifyMergeOrder(); }; void CreateSnapshotLogger(android::base::LogId, android::base::LogSeverity severity, const char*, Loading @@ -118,21 +132,19 @@ bool CreateSnapshot::PrepareParse(std::string& parsing_file, const bool createSn create_snapshot_patch_ = createSnapshot; if (createSnapshot) { fd_.reset(open(patch_file_.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666)); if (fd_ < 0) { cow_fd_.reset(open(patch_file_.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666)); if (cow_fd_ < 0) { PLOG(ERROR) << "Failed to open the snapshot-patch file: " << patch_file_; return false; } target_fd_.reset((open(parsing_file_.c_str(), O_RDONLY))); if (target_fd_ < 0) { LOG(ERROR) << "open failed: " << parsing_file_; return false; } zblock_ = std::make_unique<uint8_t[]>(BLOCK_SZ); std::memset(zblock_.get(), 0, BLOCK_SZ); CowOptions options; options.compression = compression_; options.num_compress_threads = 2; options.batch_write = true; options.cluster_ops = 600; writer_ = CreateCowWriter(2, options, std::move(fd_)); } return true; } Loading Loading @@ -187,19 +199,158 @@ std::string CreateSnapshot::ToHexString(const uint8_t* buf, size_t len) { return out; } bool CreateSnapshot::WriteSnapshot(const void* buffer, uint64_t block, std::string& block_hash) { void CreateSnapshot::PrepareMergeBlock(const void* buffer, uint64_t block, std::string& block_hash) { if (std::memcmp(zblock_.get(), buffer, BLOCK_SZ) == 0) { std::lock_guard<std::mutex> lock(write_lock_); return writer_->AddZeroBlocks(block, 1); zero_blocks_.push_back(block); return; } auto iter = source_block_hash_.find(block_hash); if (iter != source_block_hash_.end()) { std::lock_guard<std::mutex> lock(write_lock_); return writer_->AddCopy(block, iter->second, 1); // In-place copy is skipped if (block != iter->second) { copy_blocks_[block] = iter->second; } else { in_place_ops_ += 1; } return; } std::lock_guard<std::mutex> lock(write_lock_); return writer_->AddRawBlocks(block, buffer, BLOCK_SZ); replace_blocks_.push_back(block); } size_t CreateSnapshot::PrepareWrite(size_t* pending_ops, size_t start_index) { size_t num_ops = *pending_ops; uint64_t start_block = replace_blocks_[start_index]; size_t nr_consecutive = 1; num_ops -= 1; while (num_ops) { uint64_t next_block = replace_blocks_[start_index + nr_consecutive]; if (next_block != start_block + nr_consecutive) { break; } nr_consecutive += 1; num_ops -= 1; } return nr_consecutive; } bool CreateSnapshot::CreateSnapshotWriter() { uint64_t dev_sz = lseek(target_fd_.get(), 0, SEEK_END); CowOptions options; options.compression = compression_; options.num_compress_threads = 2; options.batch_write = true; options.cluster_ops = 600; options.compression_factor = compression_factor_; options.max_blocks = {dev_sz / options.block_size}; writer_ = CreateCowWriter(3, options, std::move(cow_fd_)); return true; } bool CreateSnapshot::WriteNonOrderedSnapshots() { zero_ops_ = zero_blocks_.size(); for (auto it = zero_blocks_.begin(); it != zero_blocks_.end(); it++) { if (!writer_->AddZeroBlocks(*it, 1)) { return false; } } std::string buffer(compression_factor_, '\0'); replace_ops_ = replace_blocks_.size(); size_t blocks_to_compress = replace_blocks_.size(); size_t num_ops = 0; size_t block_index = 0; while (blocks_to_compress) { num_ops = std::min((compression_factor_ / BLOCK_SZ), blocks_to_compress); auto linear_blocks = PrepareWrite(&num_ops, block_index); if (!android::base::ReadFullyAtOffset(target_fd_.get(), buffer.data(), (linear_blocks * BLOCK_SZ), replace_blocks_[block_index] * BLOCK_SZ)) { LOG(ERROR) << "Failed to read at offset: " << replace_blocks_[block_index] * BLOCK_SZ << " size: " << linear_blocks * BLOCK_SZ; return false; } if (!writer_->AddRawBlocks(replace_blocks_[block_index], buffer.data(), linear_blocks * BLOCK_SZ)) { LOG(ERROR) << "AddRawBlocks failed"; return false; } block_index += linear_blocks; blocks_to_compress -= linear_blocks; } if (!writer_->Finalize()) { return false; } return true; } bool CreateSnapshot::WriteOrderedSnapshots() { std::unordered_map<uint64_t, uint64_t> overwritten_blocks; std::vector<std::pair<uint64_t, uint64_t>> merge_sequence; for (auto it = copy_blocks_.begin(); it != copy_blocks_.end(); it++) { if (overwritten_blocks.count(it->second)) { replace_blocks_.push_back(it->first); continue; } overwritten_blocks[it->first] = it->second; merge_sequence.emplace_back(std::make_pair(it->first, it->second)); } // Sort the blocks so that if the blocks are contiguous, it would help // compress multiple blocks in one shot based on the compression factor. std::sort(replace_blocks_.begin(), replace_blocks_.end()); copy_ops_ = merge_sequence.size(); for (auto it = merge_sequence.begin(); it != merge_sequence.end(); it++) { if (!writer_->AddCopy(it->first, it->second, 1)) { return false; } } return true; } bool CreateSnapshot::VerifyMergeOrder() { unique_fd read_fd; read_fd.reset(open(patch_file_.c_str(), O_RDONLY)); if (read_fd < 0) { PLOG(ERROR) << "Failed to open the snapshot-patch file: " << patch_file_; return false; } CowReader reader; if (!reader.Parse(read_fd)) { LOG(ERROR) << "Parse failed"; return false; } if (!reader.VerifyMergeOps()) { LOG(ERROR) << "MergeOps Order is wrong"; return false; } return true; } bool CreateSnapshot::WriteV3Snapshots() { if (!CreateSnapshotWriter()) { return false; } if (!WriteOrderedSnapshots()) { return false; } if (!WriteNonOrderedSnapshots()) { return false; } if (!VerifyMergeOrder()) { return false; } LOG(INFO) << "In-place: " << in_place_ops_ << " Zero: " << zero_ops_ << " Replace: " << replace_ops_ << " copy: " << copy_ops_; return true; } bool CreateSnapshot::ReadBlocks(off_t offset, const int skip_blocks, const uint64_t dev_sz) { Loading Loading @@ -241,10 +392,7 @@ bool CreateSnapshot::ReadBlocks(off_t offset, const int skip_blocks, const uint6 std::string hash = ToHexString(checksum, sizeof(checksum)); if (create_snapshot_patch_) { if (!WriteSnapshot(bufptr, blkindex, hash)) { LOG(ERROR) << "WriteSnapshot failed for block: " << blkindex; return false; } PrepareMergeBlock(bufptr, blkindex, hash); } else { std::lock_guard<std::mutex> lock(source_block_hash_lock_); { Loading Loading @@ -306,8 +454,8 @@ bool CreateSnapshot::ParsePartition() { ret = t.get() && ret; } if (ret && create_snapshot_patch_ && !writer_->Finalize()) { LOG(ERROR) << "Finzalize failed"; if (ret && create_snapshot_patch_ && !WriteV3Snapshots()) { LOG(ERROR) << "Snapshot Write failed"; return false; } Loading