Loading fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +6 −0 Original line number Diff line number Diff line Loading @@ -120,6 +120,12 @@ class CompressWorker { void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf); void Finalize(); static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression, const void* data, size_t length); static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data); private: struct CompressWork { Loading fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp +15 −6 Original line number Diff line number Diff line Loading @@ -32,9 +32,13 @@ namespace android { namespace snapshot { std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) { switch (compression_) { return Compress(compression_, data, length); } std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression, const void* data, size_t length) { switch (compression) { case kCowCompressGz: { const auto bound = compressBound(length); std::basic_string<uint8_t> buffer(bound, '\0'); Loading Loading @@ -94,17 +98,22 @@ std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t len return buffer; } default: LOG(ERROR) << "unhandled compression type: " << compression_; LOG(ERROR) << "unhandled compression type: " << compression; break; } return {}; } bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data) { return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data); } bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data) { const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer); while (num_blocks) { auto data = Compress(iter, block_size_); auto data = Compress(compression, iter, block_size); if (data.empty()) { PLOG(ERROR) << "CompressBlocks: Compression failed"; return false; Loading @@ -116,7 +125,7 @@ bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, compressed_data->emplace_back(std::move(data)); num_blocks -= 1; iter += block_size_; iter += block_size; } return true; } Loading fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp +21 −5 Original line number Diff line number Diff line Loading @@ -275,6 +275,10 @@ void CowWriter::InitBatchWrites() { } void CowWriter::InitWorkers() { if (num_compress_threads_ <= 1) { LOG(INFO) << "Not creating new threads for compression."; return; } for (int i = 0; i < num_compress_threads_; i++) { auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); Loading Loading @@ -447,6 +451,10 @@ bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) { size_t num_blocks_per_thread = num_blocks / num_threads; const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); compressed_buf_.clear(); if (num_threads <= 1) { return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks, &compressed_buf_); } // Submit the blocks per thread. The retrieval of // compressed buffers has to be done in the same order. Loading Loading @@ -490,13 +498,12 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si while (num_blocks) { size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks)); if (compression_) { if (compression_ && num_compress_threads_ > 1) { if (!CompressBlocks(pending_blocks, iter)) { return false; } buf_iter_ = compressed_buf_.begin(); CHECK(pending_blocks == compressed_buf_.size()); iter += (pending_blocks * header_.block_size); } num_blocks -= pending_blocks; Loading @@ -512,7 +519,17 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si } if (compression_) { auto data = [&, this]() { if (num_compress_threads_ > 1) { auto data = std::move(*buf_iter_); buf_iter_++; return data; } else { auto data = CompressWorker::Compress(compression_, iter, header_.block_size); return data; } }(); op.compression = compression_; op.data_length = static_cast<uint16_t>(data.size()); Loading @@ -520,15 +537,14 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si PLOG(ERROR) << "AddRawBlocks: write failed"; return false; } buf_iter_++; } else { op.data_length = static_cast<uint16_t>(header_.block_size); if (!WriteOperation(op, iter, header_.block_size)) { PLOG(ERROR) << "AddRawBlocks: write failed"; return false; } iter += header_.block_size; } iter += header_.block_size; i += 1; pending_blocks -= 1; Loading Loading
fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +6 −0 Original line number Diff line number Diff line Loading @@ -120,6 +120,12 @@ class CompressWorker { void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf); void Finalize(); static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression, const void* data, size_t length); static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data); private: struct CompressWork { Loading
fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp +15 −6 Original line number Diff line number Diff line Loading @@ -32,9 +32,13 @@ namespace android { namespace snapshot { std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) { switch (compression_) { return Compress(compression_, data, length); } std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression, const void* data, size_t length) { switch (compression) { case kCowCompressGz: { const auto bound = compressBound(length); std::basic_string<uint8_t> buffer(bound, '\0'); Loading Loading @@ -94,17 +98,22 @@ std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t len return buffer; } default: LOG(ERROR) << "unhandled compression type: " << compression_; LOG(ERROR) << "unhandled compression type: " << compression; break; } return {}; } bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data) { return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data); } bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data) { const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer); while (num_blocks) { auto data = Compress(iter, block_size_); auto data = Compress(compression, iter, block_size); if (data.empty()) { PLOG(ERROR) << "CompressBlocks: Compression failed"; return false; Loading @@ -116,7 +125,7 @@ bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, compressed_data->emplace_back(std::move(data)); num_blocks -= 1; iter += block_size_; iter += block_size; } return true; } Loading
fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp +21 −5 Original line number Diff line number Diff line Loading @@ -275,6 +275,10 @@ void CowWriter::InitBatchWrites() { } void CowWriter::InitWorkers() { if (num_compress_threads_ <= 1) { LOG(INFO) << "Not creating new threads for compression."; return; } for (int i = 0; i < num_compress_threads_; i++) { auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); Loading Loading @@ -447,6 +451,10 @@ bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) { size_t num_blocks_per_thread = num_blocks / num_threads; const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); compressed_buf_.clear(); if (num_threads <= 1) { return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks, &compressed_buf_); } // Submit the blocks per thread. The retrieval of // compressed buffers has to be done in the same order. Loading Loading @@ -490,13 +498,12 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si while (num_blocks) { size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks)); if (compression_) { if (compression_ && num_compress_threads_ > 1) { if (!CompressBlocks(pending_blocks, iter)) { return false; } buf_iter_ = compressed_buf_.begin(); CHECK(pending_blocks == compressed_buf_.size()); iter += (pending_blocks * header_.block_size); } num_blocks -= pending_blocks; Loading @@ -512,7 +519,17 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si } if (compression_) { auto data = [&, this]() { if (num_compress_threads_ > 1) { auto data = std::move(*buf_iter_); buf_iter_++; return data; } else { auto data = CompressWorker::Compress(compression_, iter, header_.block_size); return data; } }(); op.compression = compression_; op.data_length = static_cast<uint16_t>(data.size()); Loading @@ -520,15 +537,14 @@ bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t si PLOG(ERROR) << "AddRawBlocks: write failed"; return false; } buf_iter_++; } else { op.data_length = static_cast<uint16_t>(header_.block_size); if (!WriteOperation(op, iter, header_.block_size)) { PLOG(ERROR) << "AddRawBlocks: write failed"; return false; } iter += header_.block_size; } iter += header_.block_size; i += 1; pending_blocks -= 1; Loading