Loading fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -45,7 +45,7 @@ struct CowOptions { std::optional<uint64_t> max_blocks; std::optional<uint64_t> max_blocks; // Number of CowOperations in a cluster. 0 for no clustering. Cannot be 1. // Number of CowOperations in a cluster. 0 for no clustering. Cannot be 1. uint32_t cluster_ops = 200; uint32_t cluster_ops = 1024; bool scratch_space = true; bool scratch_space = true; Loading fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp +23 −14 Original line number Original line Diff line number Diff line Loading @@ -301,6 +301,14 @@ bool CowWriterV3::CheckOpCount(size_t op_count) { return true; return true; } } size_t CowWriterV3::CachedDataSize() const { size_t size = 0; for (const auto& i : cached_data_) { size += i.size(); } return size; } bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) { bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) { if (!CheckOpCount(num_blocks)) { if (!CheckOpCount(num_blocks)) { return false; return false; Loading Loading @@ -333,7 +341,7 @@ bool CowWriterV3::NeedsFlush() const { // Allow bigger batch sizes for ops without data. A single CowOperationV3 // Allow bigger batch sizes for ops without data. A single CowOperationV3 // struct uses 14 bytes of memory, even if we cache 200 * 16 ops in memory, // struct uses 14 bytes of memory, even if we cache 200 * 16 ops in memory, // it's only ~44K. // it's only ~44K. return cached_data_.size() >= batch_size_ || return CachedDataSize() >= batch_size_ * header_.block_size || cached_ops_.size() >= batch_size_ * kNonDataOpBufferSize; cached_ops_.size() >= batch_size_ * kNonDataOpBufferSize; } } Loading Loading @@ -388,13 +396,13 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t return false; return false; } } const auto bytes = reinterpret_cast<const uint8_t*>(data); const auto bytes = reinterpret_cast<const uint8_t*>(data); const size_t num_blocks = (size / header_.block_size); size_t num_blocks = (size / header_.block_size); for (size_t i = 0; i < num_blocks;) { size_t total_written = 0; const size_t blocks_to_write = while (total_written < num_blocks) { std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i); size_t chunk = std::min(num_blocks - total_written, batch_size_); if (!ConstructCowOpCompressedBuffers(new_block_start + total_written, if (!ConstructCowOpCompressedBuffers(new_block_start + i, bytes + header_.block_size * i, bytes + header_.block_size * total_written, old_block + i, offset, type, blocks_to_write)) { old_block + total_written, offset, type, chunk)) { return false; return false; } } Loading @@ -404,8 +412,7 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t << ", op type: " << type; << ", op type: " << type; return false; return false; } } total_written += chunk; i += blocks_to_write; } } return true; return true; Loading Loading @@ -473,7 +480,8 @@ bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) { header_.sequence_data_count = num_ops; header_.sequence_data_count = num_ops; // Ensure next_data_pos_ is updated as previously initialized + the newly added sequence buffer. // Ensure next_data_pos_ is updated as previously initialized + the newly added sequence // buffer. CHECK_EQ(next_data_pos_ + header_.sequence_data_count * sizeof(uint32_t), CHECK_EQ(next_data_pos_ + header_.sequence_data_count * sizeof(uint32_t), GetDataOffset(header_)); GetDataOffset(header_)); next_data_pos_ = GetDataOffset(header_); next_data_pos_ = GetDataOffset(header_); Loading Loading @@ -631,8 +639,8 @@ std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreade // t1 t2 t1 t2 <- processed by these threads // t1 t2 t1 t2 <- processed by these threads // Ordering is important here. We need to retrieve the compressed data in the same order we // Ordering is important here. We need to retrieve the compressed data in the same order we // processed it and assume that that we submit data beginning with the first thread and then // processed it and assume that that we submit data beginning with the first thread and then // round robin the consecutive data calls. We need to Fetch compressed buffers from the threads // round robin the consecutive data calls. We need to Fetch compressed buffers from the // via the same ordering // threads via the same ordering for (size_t i = 0; i < compressed_vec.size(); i++) { for (size_t i = 0; i < compressed_vec.size(); i++) { compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]); compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]); } } Loading Loading @@ -724,7 +732,8 @@ bool CowWriterV3::WriteOperation(std::span<const CowOperationV3> ops, } } if (total_written != total_data_size) { if (total_written != total_data_size) { PLOG(ERROR) << "write failed for data of size: " << data.size() PLOG(ERROR) << "write failed for data of size: " << data.size() << " at offset: " << next_data_pos_ << " " << errno; << " at offset: " << next_data_pos_ << " " << errno << ", only wrote: " << total_written; return false; return false; } } } } Loading fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -94,7 +94,7 @@ class CowWriterV3 : public CowWriterBase { } } return false; return false; } } size_t CachedDataSize() const; bool ReadBackVerification(); bool ReadBackVerification(); bool FlushCacheOps(); bool FlushCacheOps(); void InitWorkers(); void InitWorkers(); Loading Loading
fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -45,7 +45,7 @@ struct CowOptions { std::optional<uint64_t> max_blocks; std::optional<uint64_t> max_blocks; // Number of CowOperations in a cluster. 0 for no clustering. Cannot be 1. // Number of CowOperations in a cluster. 0 for no clustering. Cannot be 1. uint32_t cluster_ops = 200; uint32_t cluster_ops = 1024; bool scratch_space = true; bool scratch_space = true; Loading
fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp +23 −14 Original line number Original line Diff line number Diff line Loading @@ -301,6 +301,14 @@ bool CowWriterV3::CheckOpCount(size_t op_count) { return true; return true; } } size_t CowWriterV3::CachedDataSize() const { size_t size = 0; for (const auto& i : cached_data_) { size += i.size(); } return size; } bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) { bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) { if (!CheckOpCount(num_blocks)) { if (!CheckOpCount(num_blocks)) { return false; return false; Loading Loading @@ -333,7 +341,7 @@ bool CowWriterV3::NeedsFlush() const { // Allow bigger batch sizes for ops without data. A single CowOperationV3 // Allow bigger batch sizes for ops without data. A single CowOperationV3 // struct uses 14 bytes of memory, even if we cache 200 * 16 ops in memory, // struct uses 14 bytes of memory, even if we cache 200 * 16 ops in memory, // it's only ~44K. // it's only ~44K. return cached_data_.size() >= batch_size_ || return CachedDataSize() >= batch_size_ * header_.block_size || cached_ops_.size() >= batch_size_ * kNonDataOpBufferSize; cached_ops_.size() >= batch_size_ * kNonDataOpBufferSize; } } Loading Loading @@ -388,13 +396,13 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t return false; return false; } } const auto bytes = reinterpret_cast<const uint8_t*>(data); const auto bytes = reinterpret_cast<const uint8_t*>(data); const size_t num_blocks = (size / header_.block_size); size_t num_blocks = (size / header_.block_size); for (size_t i = 0; i < num_blocks;) { size_t total_written = 0; const size_t blocks_to_write = while (total_written < num_blocks) { std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i); size_t chunk = std::min(num_blocks - total_written, batch_size_); if (!ConstructCowOpCompressedBuffers(new_block_start + total_written, if (!ConstructCowOpCompressedBuffers(new_block_start + i, bytes + header_.block_size * i, bytes + header_.block_size * total_written, old_block + i, offset, type, blocks_to_write)) { old_block + total_written, offset, type, chunk)) { return false; return false; } } Loading @@ -404,8 +412,7 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t << ", op type: " << type; << ", op type: " << type; return false; return false; } } total_written += chunk; i += blocks_to_write; } } return true; return true; Loading Loading @@ -473,7 +480,8 @@ bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) { header_.sequence_data_count = num_ops; header_.sequence_data_count = num_ops; // Ensure next_data_pos_ is updated as previously initialized + the newly added sequence buffer. // Ensure next_data_pos_ is updated as previously initialized + the newly added sequence // buffer. CHECK_EQ(next_data_pos_ + header_.sequence_data_count * sizeof(uint32_t), CHECK_EQ(next_data_pos_ + header_.sequence_data_count * sizeof(uint32_t), GetDataOffset(header_)); GetDataOffset(header_)); next_data_pos_ = GetDataOffset(header_); next_data_pos_ = GetDataOffset(header_); Loading Loading @@ -631,8 +639,8 @@ std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreade // t1 t2 t1 t2 <- processed by these threads // t1 t2 t1 t2 <- processed by these threads // Ordering is important here. We need to retrieve the compressed data in the same order we // Ordering is important here. We need to retrieve the compressed data in the same order we // processed it and assume that that we submit data beginning with the first thread and then // processed it and assume that that we submit data beginning with the first thread and then // round robin the consecutive data calls. We need to Fetch compressed buffers from the threads // round robin the consecutive data calls. We need to Fetch compressed buffers from the // via the same ordering // threads via the same ordering for (size_t i = 0; i < compressed_vec.size(); i++) { for (size_t i = 0; i < compressed_vec.size(); i++) { compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]); compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]); } } Loading Loading @@ -724,7 +732,8 @@ bool CowWriterV3::WriteOperation(std::span<const CowOperationV3> ops, } } if (total_written != total_data_size) { if (total_written != total_data_size) { PLOG(ERROR) << "write failed for data of size: " << data.size() PLOG(ERROR) << "write failed for data of size: " << data.size() << " at offset: " << next_data_pos_ << " " << errno; << " at offset: " << next_data_pos_ << " " << errno << ", only wrote: " << total_written; return false; return false; } } } } Loading
fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -94,7 +94,7 @@ class CowWriterV3 : public CowWriterBase { } } return false; return false; } } size_t CachedDataSize() const; bool ReadBackVerification(); bool ReadBackVerification(); bool FlushCacheOps(); bool FlushCacheOps(); void InitWorkers(); void InitWorkers(); Loading