Loading fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +7 −7 Original line number Original line Diff line number Diff line Loading @@ -110,16 +110,17 @@ class ICowWriter { class CompressWorker { class CompressWorker { public: public: CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size); CompressWorker(CowCompression compression, uint32_t block_size); bool RunThread(); bool RunThread(); void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf); bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf); void Finalize(); void Finalize(); static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression, static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression); const void* data, size_t length); static std::basic_string<uint8_t> Compress(CowCompression compression, const void* data, size_t length); static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, static bool CompressBlocks(CowCompression compression, size_t block_size, const void* buffer, const void* buffer, size_t num_blocks, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data); std::vector<std::basic_string<uint8_t>>* compressed_data); private: private: Loading @@ -130,7 +131,7 @@ class CompressWorker { std::vector<std::basic_string<uint8_t>> compressed_data; std::vector<std::basic_string<uint8_t>> compressed_data; }; }; CowCompressionAlgorithm compression_; CowCompression compression_; uint32_t block_size_; uint32_t block_size_; std::queue<CompressWork> work_queue_; std::queue<CompressWork> work_queue_; Loading @@ -139,7 +140,6 @@ class CompressWorker { std::condition_variable cv_; std::condition_variable cv_; bool stopped_ = false; bool stopped_ = false; std::basic_string<uint8_t> Compress(const void* data, size_t length); bool CompressBlocks(const void* buffer, size_t num_blocks, bool CompressBlocks(const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data); std::vector<std::basic_string<uint8_t>>* compressed_data); }; }; Loading fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp +36 −13 Original line number Original line Diff line number Diff line Loading @@ -46,24 +46,47 @@ std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::strin } else if (name == "none" || name.empty()) { } else if (name == "none" || name.empty()) { return {kCowCompressNone}; return {kCowCompressNone}; } else { } else { LOG(ERROR) << "unable to determine default compression algorithm for: " << name; return {}; return {}; } } } } std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) { // 1. Default compression level is determined by compression algorithm return Compress(compression_, data, length); // 2. There might be compatibility issues if a value is changed here, as some older versions of // Android will assume a different compression level, causing cow_size estimation differences that // will lead to OTA failure. Ensure that the device and OTA package use the same compression level // for OTA to succeed. uint32_t CompressWorker::GetDefaultCompressionLevel(CowCompressionAlgorithm compression) { switch (compression) { case kCowCompressGz: { return Z_BEST_COMPRESSION; } case kCowCompressBrotli: { return BROTLI_DEFAULT_QUALITY; } case kCowCompressLz4: { break; } case kCowCompressZstd: { return ZSTD_defaultCLevel(); } case kCowCompressNone: { break; } } return 0; } } std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression, std::basic_string<uint8_t> CompressWorker::Compress(CowCompression compression, const void* data, const void* data, size_t length) { size_t length) { switch (compression) { switch (compression.algorithm) { case kCowCompressGz: { case kCowCompressGz: { const auto bound = compressBound(length); const auto bound = compressBound(length); std::basic_string<uint8_t> buffer(bound, '\0'); std::basic_string<uint8_t> buffer(bound, '\0'); uLongf dest_len = bound; uLongf dest_len = bound; auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data), auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data), length, Z_BEST_COMPRESSION); length, compression.compression_level); if (rv != Z_OK) { if (rv != Z_OK) { LOG(ERROR) << "compress2 returned: " << rv; LOG(ERROR) << "compress2 returned: " << rv; return {}; return {}; Loading @@ -81,8 +104,8 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm comp size_t encoded_size = bound; size_t encoded_size = bound; auto rv = BrotliEncoderCompress( auto rv = BrotliEncoderCompress( BROTLI_DEFAULT_QUALITY, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, length, compression.compression_level, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data()); length, reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data()); if (!rv) { if (!rv) { LOG(ERROR) << "BrotliEncoderCompress failed"; LOG(ERROR) << "BrotliEncoderCompress failed"; return {}; return {}; Loading Loading @@ -117,8 +140,8 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm comp } } case kCowCompressZstd: { case kCowCompressZstd: { std::basic_string<uint8_t> buffer(ZSTD_compressBound(length), '\0'); std::basic_string<uint8_t> buffer(ZSTD_compressBound(length), '\0'); const auto compressed_size = const auto compressed_size = ZSTD_compress(buffer.data(), buffer.size(), data, length, ZSTD_compress(buffer.data(), buffer.size(), data, length, 0); compression.compression_level); if (compressed_size <= 0) { if (compressed_size <= 0) { LOG(ERROR) << "ZSTD compression failed " << compressed_size; LOG(ERROR) << "ZSTD compression failed " << compressed_size; return {}; return {}; Loading @@ -133,7 +156,7 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm comp return buffer; return buffer; } } default: default: LOG(ERROR) << "unhandled compression type: " << compression; LOG(ERROR) << "unhandled compression type: " << compression.algorithm; break; break; } } return {}; return {}; Loading @@ -143,7 +166,7 @@ bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data); return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data); } } bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, bool CompressWorker::CompressBlocks(CowCompression compression, size_t block_size, const void* buffer, size_t num_blocks, const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data) { std::vector<std::basic_string<uint8_t>>* compressed_data) { const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer); const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer); Loading Loading @@ -255,7 +278,7 @@ void CompressWorker::Finalize() { cv_.notify_all(); cv_.notify_all(); } } CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size) CompressWorker::CompressWorker(CowCompression compression, uint32_t block_size) : compression_(compression), block_size_(block_size) {} : compression_(compression), block_size_(block_size) {} } // namespace snapshot } // namespace snapshot Loading fs_mgr/libsnapshot/libsnapshot_cow/test_v2.cpp +13 −1 Original line number Original line Diff line number Diff line Loading @@ -480,7 +480,7 @@ TEST_P(CompressionTest, HorribleStream) { std::string expected = "The quick brown fox jumps over the lazy dog."; std::string expected = "The quick brown fox jumps over the lazy dog."; expected.resize(4096, '\0'); expected.resize(4096, '\0'); auto result = CompressWorker::Compress(*algorithm, expected.data(), expected.size()); auto result = CompressWorker::Compress(compression, expected.data(), expected.size()); ASSERT_FALSE(result.empty()); ASSERT_FALSE(result.empty()); HorribleStream<uint8_t> stream(result); HorribleStream<uint8_t> stream(result); Loading Loading @@ -1409,6 +1409,18 @@ TEST_F(CowTest, RevMergeOpItrTest) { ASSERT_TRUE(iter->AtEnd()); ASSERT_TRUE(iter->AtEnd()); } } TEST_F(CowTest, ParseOptionsTest) { CowOptions options; std::vector<std::pair<std::string, bool>> testcases = { {"gz,4", true}, {"gz,4,4", false}, {"lz4,4", true}, {"brotli,4", true}, {"zstd,4", true}, {"zstd,x", false}, {"zs,4", false}, {"zstd.4", false}}; for (size_t i = 0; i < testcases.size(); i++) { options.compression = testcases[i].first; CowWriterV2 writer(options, GetCowFd()); ASSERT_EQ(writer.Initialize(), testcases[i].second); } } TEST_F(CowTest, LegacyRevMergeOpItrTest) { TEST_F(CowTest, LegacyRevMergeOpItrTest) { CowOptions options; CowOptions options; options.cluster_ops = 5; options.cluster_ops = 5; Loading fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp +27 −8 Original line number Original line Diff line number Diff line Loading @@ -39,6 +39,8 @@ #include <sys/ioctl.h> #include <sys/ioctl.h> #include <unistd.h> #include <unistd.h> #include "android-base/parseint.h" #include "android-base/strings.h" #include "parser_v2.h" #include "parser_v2.h" // The info messages here are spammy, but as useful for update_engine. Disable // The info messages here are spammy, but as useful for update_engine. Disable Loading Loading @@ -119,11 +121,28 @@ void CowWriterV2::SetupHeaders() { } } bool CowWriterV2::ParseOptions() { bool CowWriterV2::ParseOptions() { auto algorithm = CompressionAlgorithmFromString(options_.compression); auto parts = android::base::Split(options_.compression, ","); if (parts.size() > 2) { LOG(ERROR) << "failed to parse compression parameters: invalid argument count: " << parts.size() << " " << options_.compression; return false; } auto algorithm = CompressionAlgorithmFromString(parts[0]); if (!algorithm) { if (!algorithm) { LOG(ERROR) << "unrecognized compression: " << options_.compression; LOG(ERROR) << "unrecognized compression: " << options_.compression; return false; return false; } } if (parts.size() > 1) { if (!android::base::ParseUint(parts[1], &compression_.compression_level)) { LOG(ERROR) << "failed to parse compression level invalid type: " << parts[1]; return false; } } else { compression_.compression_level = CompressWorker::GetDefaultCompressionLevel(algorithm.value()); } compression_.algorithm = *algorithm; compression_.algorithm = *algorithm; if (options_.cluster_ops == 1) { if (options_.cluster_ops == 1) { Loading Loading @@ -165,7 +184,7 @@ void CowWriterV2::InitWorkers() { return; return; } } for (int i = 0; i < num_compress_threads_; i++) { for (int i = 0; i < num_compress_threads_; i++) { auto wt = std::make_unique<CompressWorker>(compression_.algorithm, header_.block_size); auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); compress_threads_.push_back(std::move(wt)); compress_threads_.push_back(std::move(wt)); } } Loading Loading @@ -320,8 +339,8 @@ bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) { const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); compressed_buf_.clear(); compressed_buf_.clear(); if (num_threads <= 1) { if (num_threads <= 1) { return CompressWorker::CompressBlocks(compression_.algorithm, options_.block_size, data, return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks, num_blocks, &compressed_buf_); &compressed_buf_); } } // Submit the blocks per thread. The retrieval of // Submit the blocks per thread. The retrieval of Loading Loading @@ -393,8 +412,8 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t buf_iter_++; buf_iter_++; return data; return data; } else { } else { auto data = CompressWorker::Compress(compression_.algorithm, iter, auto data = header_.block_size); CompressWorker::Compress(compression_, iter, header_.block_size); return data; return data; } } }(); }(); Loading Loading @@ -507,8 +526,8 @@ bool CowWriterV2::Finalize() { } } } } // Footer should be at the end of a file, so if there is data after the current block, end it // Footer should be at the end of a file, so if there is data after the current block, end // and start a new cluster. // it and start a new cluster. if (cluster_size_ && current_data_size_ > 0) { if (cluster_size_ && current_data_size_ > 0) { EmitCluster(); EmitCluster(); extra_cluster = true; extra_cluster = true; Loading Loading
fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +7 −7 Original line number Original line Diff line number Diff line Loading @@ -110,16 +110,17 @@ class ICowWriter { class CompressWorker { class CompressWorker { public: public: CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size); CompressWorker(CowCompression compression, uint32_t block_size); bool RunThread(); bool RunThread(); void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf); bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf); void Finalize(); void Finalize(); static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression, static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression); const void* data, size_t length); static std::basic_string<uint8_t> Compress(CowCompression compression, const void* data, size_t length); static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, static bool CompressBlocks(CowCompression compression, size_t block_size, const void* buffer, const void* buffer, size_t num_blocks, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data); std::vector<std::basic_string<uint8_t>>* compressed_data); private: private: Loading @@ -130,7 +131,7 @@ class CompressWorker { std::vector<std::basic_string<uint8_t>> compressed_data; std::vector<std::basic_string<uint8_t>> compressed_data; }; }; CowCompressionAlgorithm compression_; CowCompression compression_; uint32_t block_size_; uint32_t block_size_; std::queue<CompressWork> work_queue_; std::queue<CompressWork> work_queue_; Loading @@ -139,7 +140,6 @@ class CompressWorker { std::condition_variable cv_; std::condition_variable cv_; bool stopped_ = false; bool stopped_ = false; std::basic_string<uint8_t> Compress(const void* data, size_t length); bool CompressBlocks(const void* buffer, size_t num_blocks, bool CompressBlocks(const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data); std::vector<std::basic_string<uint8_t>>* compressed_data); }; }; Loading
fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp +36 −13 Original line number Original line Diff line number Diff line Loading @@ -46,24 +46,47 @@ std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::strin } else if (name == "none" || name.empty()) { } else if (name == "none" || name.empty()) { return {kCowCompressNone}; return {kCowCompressNone}; } else { } else { LOG(ERROR) << "unable to determine default compression algorithm for: " << name; return {}; return {}; } } } } std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) { // 1. Default compression level is determined by compression algorithm return Compress(compression_, data, length); // 2. There might be compatibility issues if a value is changed here, as some older versions of // Android will assume a different compression level, causing cow_size estimation differences that // will lead to OTA failure. Ensure that the device and OTA package use the same compression level // for OTA to succeed. uint32_t CompressWorker::GetDefaultCompressionLevel(CowCompressionAlgorithm compression) { switch (compression) { case kCowCompressGz: { return Z_BEST_COMPRESSION; } case kCowCompressBrotli: { return BROTLI_DEFAULT_QUALITY; } case kCowCompressLz4: { break; } case kCowCompressZstd: { return ZSTD_defaultCLevel(); } case kCowCompressNone: { break; } } return 0; } } std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression, std::basic_string<uint8_t> CompressWorker::Compress(CowCompression compression, const void* data, const void* data, size_t length) { size_t length) { switch (compression) { switch (compression.algorithm) { case kCowCompressGz: { case kCowCompressGz: { const auto bound = compressBound(length); const auto bound = compressBound(length); std::basic_string<uint8_t> buffer(bound, '\0'); std::basic_string<uint8_t> buffer(bound, '\0'); uLongf dest_len = bound; uLongf dest_len = bound; auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data), auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data), length, Z_BEST_COMPRESSION); length, compression.compression_level); if (rv != Z_OK) { if (rv != Z_OK) { LOG(ERROR) << "compress2 returned: " << rv; LOG(ERROR) << "compress2 returned: " << rv; return {}; return {}; Loading @@ -81,8 +104,8 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm comp size_t encoded_size = bound; size_t encoded_size = bound; auto rv = BrotliEncoderCompress( auto rv = BrotliEncoderCompress( BROTLI_DEFAULT_QUALITY, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, length, compression.compression_level, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data()); length, reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data()); if (!rv) { if (!rv) { LOG(ERROR) << "BrotliEncoderCompress failed"; LOG(ERROR) << "BrotliEncoderCompress failed"; return {}; return {}; Loading Loading @@ -117,8 +140,8 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm comp } } case kCowCompressZstd: { case kCowCompressZstd: { std::basic_string<uint8_t> buffer(ZSTD_compressBound(length), '\0'); std::basic_string<uint8_t> buffer(ZSTD_compressBound(length), '\0'); const auto compressed_size = const auto compressed_size = ZSTD_compress(buffer.data(), buffer.size(), data, length, ZSTD_compress(buffer.data(), buffer.size(), data, length, 0); compression.compression_level); if (compressed_size <= 0) { if (compressed_size <= 0) { LOG(ERROR) << "ZSTD compression failed " << compressed_size; LOG(ERROR) << "ZSTD compression failed " << compressed_size; return {}; return {}; Loading @@ -133,7 +156,7 @@ std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm comp return buffer; return buffer; } } default: default: LOG(ERROR) << "unhandled compression type: " << compression; LOG(ERROR) << "unhandled compression type: " << compression.algorithm; break; break; } } return {}; return {}; Loading @@ -143,7 +166,7 @@ bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data); return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data); } } bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size, bool CompressWorker::CompressBlocks(CowCompression compression, size_t block_size, const void* buffer, size_t num_blocks, const void* buffer, size_t num_blocks, std::vector<std::basic_string<uint8_t>>* compressed_data) { std::vector<std::basic_string<uint8_t>>* compressed_data) { const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer); const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer); Loading Loading @@ -255,7 +278,7 @@ void CompressWorker::Finalize() { cv_.notify_all(); cv_.notify_all(); } } CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size) CompressWorker::CompressWorker(CowCompression compression, uint32_t block_size) : compression_(compression), block_size_(block_size) {} : compression_(compression), block_size_(block_size) {} } // namespace snapshot } // namespace snapshot Loading
fs_mgr/libsnapshot/libsnapshot_cow/test_v2.cpp +13 −1 Original line number Original line Diff line number Diff line Loading @@ -480,7 +480,7 @@ TEST_P(CompressionTest, HorribleStream) { std::string expected = "The quick brown fox jumps over the lazy dog."; std::string expected = "The quick brown fox jumps over the lazy dog."; expected.resize(4096, '\0'); expected.resize(4096, '\0'); auto result = CompressWorker::Compress(*algorithm, expected.data(), expected.size()); auto result = CompressWorker::Compress(compression, expected.data(), expected.size()); ASSERT_FALSE(result.empty()); ASSERT_FALSE(result.empty()); HorribleStream<uint8_t> stream(result); HorribleStream<uint8_t> stream(result); Loading Loading @@ -1409,6 +1409,18 @@ TEST_F(CowTest, RevMergeOpItrTest) { ASSERT_TRUE(iter->AtEnd()); ASSERT_TRUE(iter->AtEnd()); } } TEST_F(CowTest, ParseOptionsTest) { CowOptions options; std::vector<std::pair<std::string, bool>> testcases = { {"gz,4", true}, {"gz,4,4", false}, {"lz4,4", true}, {"brotli,4", true}, {"zstd,4", true}, {"zstd,x", false}, {"zs,4", false}, {"zstd.4", false}}; for (size_t i = 0; i < testcases.size(); i++) { options.compression = testcases[i].first; CowWriterV2 writer(options, GetCowFd()); ASSERT_EQ(writer.Initialize(), testcases[i].second); } } TEST_F(CowTest, LegacyRevMergeOpItrTest) { TEST_F(CowTest, LegacyRevMergeOpItrTest) { CowOptions options; CowOptions options; options.cluster_ops = 5; options.cluster_ops = 5; Loading
fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp +27 −8 Original line number Original line Diff line number Diff line Loading @@ -39,6 +39,8 @@ #include <sys/ioctl.h> #include <sys/ioctl.h> #include <unistd.h> #include <unistd.h> #include "android-base/parseint.h" #include "android-base/strings.h" #include "parser_v2.h" #include "parser_v2.h" // The info messages here are spammy, but as useful for update_engine. Disable // The info messages here are spammy, but as useful for update_engine. Disable Loading Loading @@ -119,11 +121,28 @@ void CowWriterV2::SetupHeaders() { } } bool CowWriterV2::ParseOptions() { bool CowWriterV2::ParseOptions() { auto algorithm = CompressionAlgorithmFromString(options_.compression); auto parts = android::base::Split(options_.compression, ","); if (parts.size() > 2) { LOG(ERROR) << "failed to parse compression parameters: invalid argument count: " << parts.size() << " " << options_.compression; return false; } auto algorithm = CompressionAlgorithmFromString(parts[0]); if (!algorithm) { if (!algorithm) { LOG(ERROR) << "unrecognized compression: " << options_.compression; LOG(ERROR) << "unrecognized compression: " << options_.compression; return false; return false; } } if (parts.size() > 1) { if (!android::base::ParseUint(parts[1], &compression_.compression_level)) { LOG(ERROR) << "failed to parse compression level invalid type: " << parts[1]; return false; } } else { compression_.compression_level = CompressWorker::GetDefaultCompressionLevel(algorithm.value()); } compression_.algorithm = *algorithm; compression_.algorithm = *algorithm; if (options_.cluster_ops == 1) { if (options_.cluster_ops == 1) { Loading Loading @@ -165,7 +184,7 @@ void CowWriterV2::InitWorkers() { return; return; } } for (int i = 0; i < num_compress_threads_; i++) { for (int i = 0; i < num_compress_threads_; i++) { auto wt = std::make_unique<CompressWorker>(compression_.algorithm, header_.block_size); auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); compress_threads_.push_back(std::move(wt)); compress_threads_.push_back(std::move(wt)); } } Loading Loading @@ -320,8 +339,8 @@ bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) { const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); compressed_buf_.clear(); compressed_buf_.clear(); if (num_threads <= 1) { if (num_threads <= 1) { return CompressWorker::CompressBlocks(compression_.algorithm, options_.block_size, data, return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks, num_blocks, &compressed_buf_); &compressed_buf_); } } // Submit the blocks per thread. The retrieval of // Submit the blocks per thread. The retrieval of Loading Loading @@ -393,8 +412,8 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t buf_iter_++; buf_iter_++; return data; return data; } else { } else { auto data = CompressWorker::Compress(compression_.algorithm, iter, auto data = header_.block_size); CompressWorker::Compress(compression_, iter, header_.block_size); return data; return data; } } }(); }(); Loading Loading @@ -507,8 +526,8 @@ bool CowWriterV2::Finalize() { } } } } // Footer should be at the end of a file, so if there is data after the current block, end it // Footer should be at the end of a file, so if there is data after the current block, end // and start a new cluster. // it and start a new cluster. if (cluster_size_ && current_data_size_ > 0) { if (cluster_size_ && current_data_size_ > 0) { EmitCluster(); EmitCluster(); extra_cluster = true; extra_cluster = true; Loading