Loading fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp +29 −23 Original line number Diff line number Diff line Loading @@ -603,41 +603,47 @@ std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompres std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression( const size_t num_blocks, const void* data, CowOperationType type) { const size_t num_threads = num_compress_threads_; const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads); const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); // We will alternate which thread to send compress work to. E.g. alternate between T1 and T2 // until all blocks are processed std::vector<CompressedBuffer> compressed_vec; // Submit the blocks per thread. The retrieval of // compressed buffers has to be done in the same order. // We should not poll for completed buffers in a different order as the // buffers are tightly coupled with block ordering. for (size_t i = 0; i < num_threads; i++) { CompressWorker* worker = compress_threads_[i].get(); auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread); // Enqueue the blocks to be compressed for each thread. while (blocks_in_batch) { int iteration = 0; int blocks_to_compress = static_cast<int>(num_blocks); while (blocks_to_compress) { CompressedBuffer buffer; CompressWorker* worker = compress_threads_[iteration % num_threads].get(); const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type); const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type); size_t num_blocks = compression_factor / header_.block_size; buffer.compression_factor = compression_factor; worker->EnqueueCompressBlocks(iter, compression_factor, 1); buffer.compression_factor = compression_factor; compressed_vec.push_back(std::move(buffer)); blocks_in_batch -= num_blocks; iteration++; iter += compression_factor; } blocks_to_compress -= num_blocks; } // Fetch compressed buffers from the threads std::vector<std::vector<uint8_t>> compressed_buf; std::vector<std::vector<std::vector<uint8_t>>> worker_buffers(num_threads); compressed_buf.clear(); for (size_t i = 0; i < num_threads; i++) { CompressWorker* worker = compress_threads_[i].get(); if (!worker->GetCompressedBuffers(&compressed_buf)) { if (!worker->GetCompressedBuffers(&worker_buffers[i])) { return {}; } } // compressed_vec | CB 1 | CB 2 | CB 3 | CB 4 | <-compressed buffers // t1 t2 t1 t2 <- processed by these threads // Ordering is important here. We need to retrieve the compressed data in the same order we // processed it and assume that that we submit data beginning with the first thread and then // round robin the consecutive data calls. We need to Fetch compressed buffers from the threads // via the same ordering for (size_t i = 0; i < compressed_vec.size(); i++) { compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]); } if (compressed_vec.size() != compressed_buf.size()) { LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size() Loading Loading
fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp +29 −23 Original line number Diff line number Diff line Loading @@ -603,41 +603,47 @@ std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompres std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression( const size_t num_blocks, const void* data, CowOperationType type) { const size_t num_threads = num_compress_threads_; const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads); const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); // We will alternate which thread to send compress work to. E.g. alternate between T1 and T2 // until all blocks are processed std::vector<CompressedBuffer> compressed_vec; // Submit the blocks per thread. The retrieval of // compressed buffers has to be done in the same order. // We should not poll for completed buffers in a different order as the // buffers are tightly coupled with block ordering. for (size_t i = 0; i < num_threads; i++) { CompressWorker* worker = compress_threads_[i].get(); auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread); // Enqueue the blocks to be compressed for each thread. while (blocks_in_batch) { int iteration = 0; int blocks_to_compress = static_cast<int>(num_blocks); while (blocks_to_compress) { CompressedBuffer buffer; CompressWorker* worker = compress_threads_[iteration % num_threads].get(); const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type); const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type); size_t num_blocks = compression_factor / header_.block_size; buffer.compression_factor = compression_factor; worker->EnqueueCompressBlocks(iter, compression_factor, 1); buffer.compression_factor = compression_factor; compressed_vec.push_back(std::move(buffer)); blocks_in_batch -= num_blocks; iteration++; iter += compression_factor; } blocks_to_compress -= num_blocks; } // Fetch compressed buffers from the threads std::vector<std::vector<uint8_t>> compressed_buf; std::vector<std::vector<std::vector<uint8_t>>> worker_buffers(num_threads); compressed_buf.clear(); for (size_t i = 0; i < num_threads; i++) { CompressWorker* worker = compress_threads_[i].get(); if (!worker->GetCompressedBuffers(&compressed_buf)) { if (!worker->GetCompressedBuffers(&worker_buffers[i])) { return {}; } } // compressed_vec | CB 1 | CB 2 | CB 3 | CB 4 | <-compressed buffers // t1 t2 t1 t2 <- processed by these threads // Ordering is important here. We need to retrieve the compressed data in the same order we // processed it and assume that that we submit data beginning with the first thread and then // round robin the consecutive data calls. We need to Fetch compressed buffers from the threads // via the same ordering for (size_t i = 0; i < compressed_vec.size(); i++) { compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]); } if (compressed_vec.size() != compressed_buf.size()) { LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size() Loading