diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h index a9682a17563bcce6453ee433e3ab5a9db8d7658b..798bc7345e9bdf5627fa7aea7a20bdca311d39a8 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h @@ -16,10 +16,17 @@ #include +#include #include +#include #include +#include #include +#include #include +#include +#include +#include #include #include @@ -42,6 +49,12 @@ struct CowOptions { // Preset the number of merged ops. Only useful for testing. uint64_t num_merge_ops = 0; + + // Number of threads for compression + int num_compress_threads = 0; + + // Batch write cluster ops + bool batch_write = false; }; // Interface for writing to a snapuserd COW. All operations are ordered; merges @@ -100,9 +113,40 @@ class ICowWriter { CowOptions options_; }; +class CompressWorker { + public: + CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size); + bool RunThread(); + void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); + bool GetCompressedBuffers(std::vector>* compressed_buf); + void Finalize(); + + private: + struct CompressWork { + const void* buffer; + size_t num_blocks; + bool compression_status = false; + std::vector> compressed_data; + }; + + CowCompressionAlgorithm compression_; + uint32_t block_size_; + + std::queue work_queue_; + std::queue compressed_queue_; + std::mutex lock_; + std::condition_variable cv_; + bool stopped_ = false; + + std::basic_string Compress(const void* data, size_t length); + bool CompressBlocks(const void* buffer, size_t num_blocks, + std::vector>* compressed_data); +}; + class CowWriter : public ICowWriter { public: explicit CowWriter(const CowOptions& options); + ~CowWriter(); // Set up the writer. // The file starts from the beginning. @@ -138,6 +182,7 @@ class CowWriter : public ICowWriter { bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block, uint16_t offset, uint8_t type); void SetupHeaders(); + void SetupWriteOptions(); bool ParseOptions(); bool OpenForWrite(); bool OpenForAppend(uint64_t label); @@ -145,9 +190,12 @@ class CowWriter : public ICowWriter { bool WriteRawData(const void* data, size_t size); bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0); void AddOperation(const CowOperation& op); - std::basic_string Compress(const void* data, size_t length); void InitPos(); + void InitBatchWrites(); + void InitWorkers(); + bool FlushCluster(); + bool CompressBlocks(size_t num_blocks, const void* data); bool SetFd(android::base::borrowed_fd fd); bool Sync(); bool Truncate(off_t length); @@ -159,8 +207,11 @@ class CowWriter : public ICowWriter { CowHeader header_{}; CowFooter footer_{}; CowCompressionAlgorithm compression_ = kCowCompressNone; + uint64_t current_op_pos_ = 0; uint64_t next_op_pos_ = 0; uint64_t next_data_pos_ = 0; + uint64_t current_data_pos_ = 0; + ssize_t total_data_written_ = 0; uint32_t cluster_size_ = 0; uint32_t current_cluster_size_ = 0; uint64_t current_data_size_ = 0; @@ -168,6 +219,21 @@ class CowWriter : public ICowWriter { bool merge_in_progress_ = false; bool is_block_device_ = false; uint64_t cow_image_size_ = INT64_MAX; + + int num_compress_threads_ = 1; + std::vector> compress_threads_; + std::vector> threads_; + std::vector> compressed_buf_; + std::vector>::iterator buf_iter_; + + std::vector> opbuffer_vec_; + std::vector> databuffer_vec_; + std::unique_ptr cowop_vec_; + int op_vec_index_ = 0; + + std::unique_ptr data_vec_; + int data_vec_index_ = 0; + bool batch_write_ = false; }; } // namespace snapshot diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp index 2c1187fd2bf89f87ad518ca403749b8eac77b135..862ce5511ea71b0220a1755c6a3d3f3b075aeac4 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp @@ -298,6 +298,150 @@ TEST_F(CowTest, CompressGz) { ASSERT_TRUE(iter->Done()); } +class CompressionRWTest : public CowTest, public testing::WithParamInterface {}; + +TEST_P(CompressionRWTest, ThreadedBatchWrites) { + CowOptions options; + options.compression = GetParam(); + options.num_compress_threads = 2; + + CowWriter writer(options); + + ASSERT_TRUE(writer.Initialize(cow_->fd)); + + std::string xor_data = "This is test data-1. Testing xor"; + xor_data.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10)); + + std::string data = "This is test data-2. Testing replace ops"; + data.resize(options.block_size * 2048, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size())); + + std::string data2 = "This is test data-3. Testing replace ops"; + data2.resize(options.block_size * 259, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size())); + + std::string data3 = "This is test data-4. Testing replace ops"; + data3.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size())); + + ASSERT_TRUE(writer.Finalize()); + + int expected_blocks = (1 + 2048 + 259 + 1); + ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0); + + CowReader reader; + ASSERT_TRUE(reader.Parse(cow_->fd)); + + auto iter = reader.GetOpIter(); + ASSERT_NE(iter, nullptr); + + int total_blocks = 0; + while (!iter->Done()) { + auto op = &iter->Get(); + + if (op->type == kCowXorOp) { + total_blocks += 1; + StringSink sink; + ASSERT_EQ(op->new_block, 50); + ASSERT_EQ(op->source, 98314); // 4096 * 24 + 10 + ASSERT_TRUE(reader.ReadData(*op, &sink)); + ASSERT_EQ(sink.stream(), xor_data); + } + + if (op->type == kCowReplaceOp) { + total_blocks += 1; + if (op->new_block == 100) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data.resize(options.block_size); + ASSERT_EQ(sink.stream(), data); + } + if (op->new_block == 6000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data2.resize(options.block_size); + ASSERT_EQ(sink.stream(), data2); + } + if (op->new_block == 9000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + ASSERT_EQ(sink.stream(), data3); + } + } + + iter->Next(); + } + + ASSERT_EQ(total_blocks, expected_blocks); +} + +TEST_P(CompressionRWTest, NoBatchWrites) { + CowOptions options; + options.compression = GetParam(); + options.num_compress_threads = 1; + options.cluster_ops = 0; + + CowWriter writer(options); + + ASSERT_TRUE(writer.Initialize(cow_->fd)); + + std::string data = "Testing replace ops without batch writes"; + data.resize(options.block_size * 1024, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size())); + + std::string data2 = "Testing odd blocks without batch writes"; + data2.resize(options.block_size * 111, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(3000, data2.data(), data2.size())); + + std::string data3 = "Testing single 4k block"; + data3.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(5000, data3.data(), data3.size())); + + ASSERT_TRUE(writer.Finalize()); + + int expected_blocks = (1024 + 111 + 1); + ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0); + + CowReader reader; + ASSERT_TRUE(reader.Parse(cow_->fd)); + + auto iter = reader.GetOpIter(); + ASSERT_NE(iter, nullptr); + + int total_blocks = 0; + while (!iter->Done()) { + auto op = &iter->Get(); + + if (op->type == kCowReplaceOp) { + total_blocks += 1; + if (op->new_block == 50) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data.resize(options.block_size); + ASSERT_EQ(sink.stream(), data); + } + if (op->new_block == 3000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data2.resize(options.block_size); + ASSERT_EQ(sink.stream(), data2); + } + if (op->new_block == 5000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + ASSERT_EQ(sink.stream(), data3); + } + } + + iter->Next(); + } + + ASSERT_EQ(total_blocks, expected_blocks); +} + +INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4")); + TEST_F(CowTest, ClusterCompressGz) { CowOptions options; options.compression = "gz"; diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp index 0eb231bdf950e7972ecaec135668c6c660d29bd2..4d9b74837119fe03a1a298d6deb13bd1fc2da35c 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp @@ -33,7 +33,7 @@ namespace android { namespace snapshot { -std::basic_string CowWriter::Compress(const void* data, size_t length) { +std::basic_string CompressWorker::Compress(const void* data, size_t length) { switch (compression_) { case kCowCompressGz: { const auto bound = compressBound(length); @@ -100,5 +100,119 @@ std::basic_string CowWriter::Compress(const void* data, size_t length) return {}; } +bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, + std::vector>* compressed_data) { + const uint8_t* iter = reinterpret_cast(buffer); + while (num_blocks) { + auto data = Compress(iter, block_size_); + if (data.empty()) { + PLOG(ERROR) << "CompressBlocks: Compression failed"; + return false; + } + if (data.size() > std::numeric_limits::max()) { + LOG(ERROR) << "Compressed block is too large: " << data.size(); + return false; + } + + compressed_data->emplace_back(std::move(data)); + num_blocks -= 1; + iter += block_size_; + } + return true; +} + +bool CompressWorker::RunThread() { + while (true) { + // Wait for work + CompressWork blocks; + { + std::unique_lock lock(lock_); + while (work_queue_.empty() && !stopped_) { + cv_.wait(lock); + } + + if (stopped_) { + return true; + } + + blocks = std::move(work_queue_.front()); + work_queue_.pop(); + } + + // Compress blocks + bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data); + blocks.compression_status = ret; + { + std::lock_guard lock(lock_); + compressed_queue_.push(std::move(blocks)); + } + + // Notify completion + cv_.notify_all(); + + if (!ret) { + LOG(ERROR) << "CompressBlocks failed"; + return false; + } + } + + return true; +} + +void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) { + { + std::lock_guard lock(lock_); + + CompressWork blocks = {}; + blocks.buffer = buffer; + blocks.num_blocks = num_blocks; + work_queue_.push(std::move(blocks)); + } + cv_.notify_all(); +} + +bool CompressWorker::GetCompressedBuffers(std::vector>* compressed_buf) { + { + std::unique_lock lock(lock_); + while (compressed_queue_.empty() && !stopped_) { + cv_.wait(lock); + } + + if (stopped_) { + return true; + } + } + + { + std::lock_guard lock(lock_); + while (compressed_queue_.size() > 0) { + CompressWork blocks = std::move(compressed_queue_.front()); + compressed_queue_.pop(); + + if (blocks.compression_status) { + compressed_buf->insert(compressed_buf->end(), + std::make_move_iterator(blocks.compressed_data.begin()), + std::make_move_iterator(blocks.compressed_data.end())); + } else { + LOG(ERROR) << "Block compression failed"; + return false; + } + } + } + + return true; +} + +void CompressWorker::Finalize() { + { + std::unique_lock lock(lock_); + stopped_ = true; + } + cv_.notify_all(); +} + +CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size) + : compression_(compression), block_size_(block_size) {} + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp index 43c17a634246f80864133fcf8daef96812e1ebe5..2d5e4bc36c93330e7ba7d66f191333ad6988fbfe 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp @@ -15,6 +15,7 @@ // #include +#include #include #include @@ -22,6 +23,7 @@ #include #include +#include #include #include #include @@ -132,6 +134,46 @@ bool ICowWriter::ValidateNewBlock(uint64_t new_block) { CowWriter::CowWriter(const CowOptions& options) : ICowWriter(options), fd_(-1) { SetupHeaders(); + SetupWriteOptions(); +} + +CowWriter::~CowWriter() { + for (size_t i = 0; i < compress_threads_.size(); i++) { + CompressWorker* worker = compress_threads_[i].get(); + if (worker) { + worker->Finalize(); + } + } + + bool ret = true; + for (auto& t : threads_) { + ret = t.get() && ret; + } + + if (!ret) { + LOG(ERROR) << "Compression failed"; + } + compress_threads_.clear(); +} + +void CowWriter::SetupWriteOptions() { + num_compress_threads_ = options_.num_compress_threads; + + if (!num_compress_threads_) { + num_compress_threads_ = 1; + // We prefer not to have more than two threads as the overhead of additional + // threads is far greater than cutting down compression time. + if (header_.cluster_ops && + android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) { + num_compress_threads_ = 2; + } + } + + if (header_.cluster_ops && + (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) || + options_.batch_write)) { + batch_write_ = true; + } } void CowWriter::SetupHeaders() { @@ -206,6 +248,42 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) { return true; } +void CowWriter::InitBatchWrites() { + if (batch_write_) { + cowop_vec_ = std::make_unique(header_.cluster_ops); + data_vec_ = std::make_unique(header_.cluster_ops); + struct iovec* cowop_ptr = cowop_vec_.get(); + struct iovec* data_ptr = data_vec_.get(); + for (size_t i = 0; i < header_.cluster_ops; i++) { + std::unique_ptr op = std::make_unique(); + cowop_ptr[i].iov_base = op.get(); + cowop_ptr[i].iov_len = sizeof(CowOperation); + opbuffer_vec_.push_back(std::move(op)); + + std::unique_ptr buffer = std::make_unique(header_.block_size * 2); + data_ptr[i].iov_base = buffer.get(); + data_ptr[i].iov_len = header_.block_size * 2; + databuffer_vec_.push_back(std::move(buffer)); + } + + current_op_pos_ = next_op_pos_; + current_data_pos_ = next_data_pos_; + } + + std::string batch_write = batch_write_ ? "enabled" : "disabled"; + LOG(INFO) << "Batch writes: " << batch_write; +} + +void CowWriter::InitWorkers() { + for (int i = 0; i < num_compress_threads_; i++) { + auto wt = std::make_unique(compression_, header_.block_size); + threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); + compress_threads_.push_back(std::move(wt)); + } + + LOG(INFO) << num_compress_threads_ << " thread used for compression"; +} + bool CowWriter::Initialize(unique_fd&& fd) { owned_fd_ = std::move(fd); return Initialize(borrowed_fd{owned_fd_}); @@ -216,7 +294,13 @@ bool CowWriter::Initialize(borrowed_fd fd) { return false; } - return OpenForWrite(); + bool ret = OpenForWrite(); + + if (ret) { + InitWorkers(); + } + + return ret; } bool CowWriter::InitializeAppend(android::base::unique_fd&& fd, uint64_t label) { @@ -229,7 +313,13 @@ bool CowWriter::InitializeAppend(android::base::borrowed_fd fd, uint64_t label) return false; } - return OpenForAppend(label); + bool ret = OpenForAppend(label); + + if (ret && !compress_threads_.size()) { + InitWorkers(); + } + + return ret; } void CowWriter::InitPos() { @@ -287,6 +377,7 @@ bool CowWriter::OpenForWrite() { } InitPos(); + InitBatchWrites(); return true; } @@ -320,6 +411,9 @@ bool CowWriter::OpenForAppend(uint64_t label) { PLOG(ERROR) << "lseek failed"; return false; } + + InitBatchWrites(); + return EmitClusterIfNeeded(); } @@ -348,47 +442,99 @@ bool CowWriter::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp); } +bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) { + size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_; + size_t num_blocks_per_thread = num_blocks / num_threads; + const uint8_t* iter = reinterpret_cast(data); + compressed_buf_.clear(); + + // Submit the blocks per thread. The retrieval of + // compressed buffers has to be done in the same order. + // We should not poll for completed buffers in a different order as the + // buffers are tightly coupled with block ordering. + for (size_t i = 0; i < num_threads; i++) { + CompressWorker* worker = compress_threads_[i].get(); + if (i == num_threads - 1) { + num_blocks_per_thread = num_blocks; + } + worker->EnqueueCompressBlocks(iter, num_blocks_per_thread); + iter += (num_blocks_per_thread * header_.block_size); + num_blocks -= num_blocks_per_thread; + } + + for (size_t i = 0; i < num_threads; i++) { + CompressWorker* worker = compress_threads_[i].get(); + if (!worker->GetCompressedBuffers(&compressed_buf_)) { + return false; + } + } + + return true; +} + bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block, uint16_t offset, uint8_t type) { - const uint8_t* iter = reinterpret_cast(data); CHECK(!merge_in_progress_); - for (size_t i = 0; i < size / header_.block_size; i++) { - CowOperation op = {}; - op.new_block = new_block_start + i; - op.type = type; - if (type == kCowXorOp) { - op.source = (old_block + i) * header_.block_size + offset; - } else { - op.source = next_data_pos_; - } + const uint8_t* iter = reinterpret_cast(data); + + // Update engine can potentially send 100MB of blocks at a time. We + // don't want to process all those blocks in one shot as it can + // stress the memory. Hence, process the blocks in chunks. + // + // 1024 blocks is reasonable given we will end up using max + // memory of ~4MB. + const size_t kProcessingBlocks = 1024; + size_t num_blocks = (size / header_.block_size); + size_t i = 0; + + while (num_blocks) { + size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks)); if (compression_) { - auto data = Compress(iter, header_.block_size); - if (data.empty()) { - PLOG(ERROR) << "AddRawBlocks: compression failed"; + if (!CompressBlocks(pending_blocks, iter)) { return false; } - if (data.size() > std::numeric_limits::max()) { - LOG(ERROR) << "Compressed block is too large: " << data.size() << " bytes"; - return false; - } - op.compression = compression_; - op.data_length = static_cast(data.size()); + buf_iter_ = compressed_buf_.begin(); + CHECK(pending_blocks == compressed_buf_.size()); + iter += (pending_blocks * header_.block_size); + } - if (!WriteOperation(op, data.data(), data.size())) { - PLOG(ERROR) << "AddRawBlocks: write failed, bytes requested: " << size - << ", bytes written: " << i * header_.block_size; - return false; + num_blocks -= pending_blocks; + + while (i < size / header_.block_size && pending_blocks) { + CowOperation op = {}; + op.new_block = new_block_start + i; + op.type = type; + if (type == kCowXorOp) { + op.source = (old_block + i) * header_.block_size + offset; + } else { + op.source = next_data_pos_; } - } else { - op.data_length = static_cast(header_.block_size); - if (!WriteOperation(op, iter, header_.block_size)) { - PLOG(ERROR) << "AddRawBlocks: write failed"; - return false; + + if (compression_) { + auto data = std::move(*buf_iter_); + op.compression = compression_; + op.data_length = static_cast(data.size()); + + if (!WriteOperation(op, data.data(), data.size())) { + PLOG(ERROR) << "AddRawBlocks: write failed"; + return false; + } + buf_iter_++; + } else { + op.data_length = static_cast(header_.block_size); + if (!WriteOperation(op, iter, header_.block_size)) { + PLOG(ERROR) << "AddRawBlocks: write failed"; + return false; + } + iter += header_.block_size; } + + i += 1; + pending_blocks -= 1; } - iter += header_.block_size; + CHECK(pending_blocks == 0); } return true; } @@ -416,7 +562,7 @@ bool CowWriter::EmitLabel(uint64_t label) { bool CowWriter::EmitSequenceData(size_t num_ops, const uint32_t* data) { CHECK(!merge_in_progress_); size_t to_add = 0; - size_t max_ops = std::numeric_limits::max() / sizeof(uint32_t); + size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t); while (num_ops > 0) { CowOperation op = {}; op.type = kCowSequenceOp; @@ -461,6 +607,11 @@ static void SHA256(const void*, size_t, uint8_t[]) { } bool CowWriter::Finalize() { + if (!FlushCluster()) { + LOG(ERROR) << "Finalize: FlushCluster() failed"; + return false; + } + auto continue_cluster_size = current_cluster_size_; auto continue_data_size = current_data_size_; auto continue_data_pos = next_data_pos_; @@ -525,6 +676,9 @@ bool CowWriter::Finalize() { next_op_pos_ = continue_op_pos; footer_.op.num_ops = continue_num_ops; } + + FlushCluster(); + return Sync(); } @@ -556,6 +710,35 @@ bool CowWriter::EnsureSpaceAvailable(const uint64_t bytes_needed) const { return true; } +bool CowWriter::FlushCluster() { + ssize_t ret; + + if (op_vec_index_) { + ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_); + if (ret != (op_vec_index_ * sizeof(CowOperation))) { + PLOG(ERROR) << "pwritev failed for CowOperation. Expected: " + << (op_vec_index_ * sizeof(CowOperation)); + return false; + } + } + + if (data_vec_index_) { + ret = pwritev(fd_.get(), data_vec_.get(), data_vec_index_, current_data_pos_); + if (ret != total_data_written_) { + PLOG(ERROR) << "pwritev failed for data. Expected: " << total_data_written_; + return false; + } + } + + total_data_written_ = 0; + op_vec_index_ = 0; + data_vec_index_ = 0; + current_op_pos_ = next_op_pos_; + current_data_pos_ = next_data_pos_; + + return true; +} + bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t size) { if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op))) { return false; @@ -564,14 +747,43 @@ bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t return false; } - if (!android::base::WriteFullyAtOffset(fd_, reinterpret_cast(&op), sizeof(op), - next_op_pos_)) { - return false; - } - if (data != nullptr && size > 0) { - if (!WriteRawData(data, size)) return false; + if (batch_write_) { + CowOperation* cow_op = reinterpret_cast(cowop_vec_[op_vec_index_].iov_base); + std::memcpy(cow_op, &op, sizeof(CowOperation)); + op_vec_index_ += 1; + + if (data != nullptr && size > 0) { + struct iovec* data_ptr = data_vec_.get(); + std::memcpy(data_ptr[data_vec_index_].iov_base, data, size); + data_ptr[data_vec_index_].iov_len = size; + data_vec_index_ += 1; + total_data_written_ += size; + } + } else { + if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) { + PLOG(ERROR) << "lseek failed for writing operation."; + return false; + } + if (!android::base::WriteFully(fd_, reinterpret_cast(&op), sizeof(op))) { + return false; + } + if (data != nullptr && size > 0) { + if (!WriteRawData(data, size)) return false; + } } + AddOperation(op); + + if (batch_write_) { + if (op_vec_index_ == header_.cluster_ops || data_vec_index_ == header_.cluster_ops || + op.type == kCowLabelOp || op.type == kCowClusterOp) { + if (!FlushCluster()) { + LOG(ERROR) << "Failed to flush cluster data"; + return false; + } + } + } + return EmitClusterIfNeeded(); }