Loading fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp +23 −5 Original line number Diff line number Diff line Loading @@ -76,11 +76,15 @@ bool Worker::InitReader() { // internal COW format and if the block is compressed, // it will be de-compressed. bool Worker::ProcessReplaceOp(const CowOperation* cow_op) { if (!reader_->ReadData(*cow_op, &bufsink_)) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (!buffer) { SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer"; return false; } if (!reader_->ReadData(*cow_op, buffer, BLOCK_SZ)) { SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block; return false; } return true; } Loading Loading @@ -125,12 +129,26 @@ bool Worker::ProcessXorOp(const CowOperation* cow_op) { if (!ReadFromSourceDevice(cow_op)) { return false; } xorsink_.Reset(); if (!reader_->ReadData(*cow_op, &xorsink_)) { SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block; size_t actual = 0; void* buffer = xorsink_.GetBuffer(BLOCK_SZ, &actual); if (!buffer || actual < BLOCK_SZ) { SNAP_LOG(ERROR) << "ProcessXorOp failed to get buffer of " << BLOCK_SZ << " size, got " << actual; return false; } ssize_t size = reader_->ReadData(*cow_op, buffer, BLOCK_SZ); if (size != BLOCK_SZ) { SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block << ", return value: " << size; return false; } if (!xorsink_.ReturnData(buffer, size)) { SNAP_LOG(ERROR) << "ProcessXorOp failed to return data"; return false; } return true; } Loading fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp +44 −11 Original line number Diff line number Diff line Loading @@ -114,6 +114,24 @@ int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops, return nr_consecutive; } class [[nodiscard]] AutoNotifyReadAheadFailed { public: AutoNotifyReadAheadFailed(std::shared_ptr<SnapshotHandler> snapuserd) : snapuserd_(snapuserd) {} ~AutoNotifyReadAheadFailed() { if (cancelled_) { return; } snapuserd_->ReadAheadIOFailed(); } void Cancel() { cancelled_ = true; } private: std::shared_ptr<SnapshotHandler> snapuserd_; bool cancelled_ = false; }; bool ReadAhead::ReconstructDataFromCow() { std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap(); loff_t metadata_offset = 0; Loading Loading @@ -145,6 +163,8 @@ bool ReadAhead::ReconstructDataFromCow() { metadata_offset += sizeof(struct ScratchMetadata); } AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_); // We are done re-constructing the mapping; however, we need to make sure // all the COW operations to-be merged are present in the re-constructed // mapping. Loading @@ -162,7 +182,6 @@ bool ReadAhead::ReconstructDataFromCow() { if (!(num_ops == 0)) { SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recoverd " << " Pending ops: " << num_ops; snapuserd_->ReadAheadIOFailed(); return false; } Loading @@ -175,11 +194,11 @@ bool ReadAhead::ReconstructDataFromCow() { if (!snapuserd_->ReadAheadIOCompleted(true)) { SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed..."; snapuserd_->ReadAheadIOFailed(); return false; } SNAP_LOG(INFO) << "ReconstructDataFromCow success"; notify_read_ahead_failed.Cancel(); return true; } Loading Loading @@ -467,9 +486,16 @@ bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index, if (xor_op_index < xor_op_vec.size()) { const CowOperation* xor_op = xor_op_vec[xor_op_index]; if (xor_op->new_block == new_block) { if (!reader_->ReadData(*xor_op, &bufsink_)) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (!buffer) { SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: " << xor_op->new_block; return false; } if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) { SNAP_LOG(ERROR) << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block; << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block << ", return value: " << rv; return false; } Loading @@ -492,6 +518,8 @@ bool ReadAhead::ReadAheadSyncIO() { blocks_.clear(); std::vector<const CowOperation*> xor_op_vec; AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_); bufsink_.ResetBufferOffset(); // Number of ops to be merged in this window. This is a fixed size Loading @@ -518,8 +546,6 @@ bool ReadAhead::ReadAheadSyncIO() { << " offset :" << source_offset % BLOCK_SZ << " buffer_offset : " << buffer_offset << " io_size : " << io_size << " buf-addr : " << read_ahead_buffer_; snapuserd_->ReadAheadIOFailed(); return false; } Loading @@ -530,6 +556,7 @@ bool ReadAhead::ReadAheadSyncIO() { // Done with merging ordered ops if (RAIterDone() && total_blocks_merged_ == 0) { notify_read_ahead_failed.Cancel(); return true; } Loading Loading @@ -560,20 +587,25 @@ bool ReadAhead::ReadAheadSyncIO() { // Check if this block is an XOR op if (xor_op->new_block == new_block) { // Read the xor'ed data from COW if (!reader_->ReadData(*xor_op, &bufsink)) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (!buffer) { SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer"; return false; } if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) { SNAP_LOG(ERROR) << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block; snapuserd_->ReadAheadIOFailed(); << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block << ", return value: " << rv; return false; } // Pointer to the data read from base device uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr); uint8_t* read_buffer = reinterpret_cast<uint8_t*>(bufptr); // Get the xor'ed data read from COW device uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr()); // Retrieve the original data for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) { buffer[byte_offset] ^= xor_data[byte_offset]; read_buffer[byte_offset] ^= xor_data[byte_offset]; } // Move to next XOR op Loading Loading @@ -604,6 +636,7 @@ bool ReadAhead::ReadAheadSyncIO() { bm->new_block = 0; bm->file_offset = 0; notify_read_ahead_failed.Cancel(); return true; } Loading Loading
fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp +23 −5 Original line number Diff line number Diff line Loading @@ -76,11 +76,15 @@ bool Worker::InitReader() { // internal COW format and if the block is compressed, // it will be de-compressed. bool Worker::ProcessReplaceOp(const CowOperation* cow_op) { if (!reader_->ReadData(*cow_op, &bufsink_)) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (!buffer) { SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer"; return false; } if (!reader_->ReadData(*cow_op, buffer, BLOCK_SZ)) { SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block; return false; } return true; } Loading Loading @@ -125,12 +129,26 @@ bool Worker::ProcessXorOp(const CowOperation* cow_op) { if (!ReadFromSourceDevice(cow_op)) { return false; } xorsink_.Reset(); if (!reader_->ReadData(*cow_op, &xorsink_)) { SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block; size_t actual = 0; void* buffer = xorsink_.GetBuffer(BLOCK_SZ, &actual); if (!buffer || actual < BLOCK_SZ) { SNAP_LOG(ERROR) << "ProcessXorOp failed to get buffer of " << BLOCK_SZ << " size, got " << actual; return false; } ssize_t size = reader_->ReadData(*cow_op, buffer, BLOCK_SZ); if (size != BLOCK_SZ) { SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block << ", return value: " << size; return false; } if (!xorsink_.ReturnData(buffer, size)) { SNAP_LOG(ERROR) << "ProcessXorOp failed to return data"; return false; } return true; } Loading
fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp +44 −11 Original line number Diff line number Diff line Loading @@ -114,6 +114,24 @@ int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops, return nr_consecutive; } class [[nodiscard]] AutoNotifyReadAheadFailed { public: AutoNotifyReadAheadFailed(std::shared_ptr<SnapshotHandler> snapuserd) : snapuserd_(snapuserd) {} ~AutoNotifyReadAheadFailed() { if (cancelled_) { return; } snapuserd_->ReadAheadIOFailed(); } void Cancel() { cancelled_ = true; } private: std::shared_ptr<SnapshotHandler> snapuserd_; bool cancelled_ = false; }; bool ReadAhead::ReconstructDataFromCow() { std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap(); loff_t metadata_offset = 0; Loading Loading @@ -145,6 +163,8 @@ bool ReadAhead::ReconstructDataFromCow() { metadata_offset += sizeof(struct ScratchMetadata); } AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_); // We are done re-constructing the mapping; however, we need to make sure // all the COW operations to-be merged are present in the re-constructed // mapping. Loading @@ -162,7 +182,6 @@ bool ReadAhead::ReconstructDataFromCow() { if (!(num_ops == 0)) { SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recoverd " << " Pending ops: " << num_ops; snapuserd_->ReadAheadIOFailed(); return false; } Loading @@ -175,11 +194,11 @@ bool ReadAhead::ReconstructDataFromCow() { if (!snapuserd_->ReadAheadIOCompleted(true)) { SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed..."; snapuserd_->ReadAheadIOFailed(); return false; } SNAP_LOG(INFO) << "ReconstructDataFromCow success"; notify_read_ahead_failed.Cancel(); return true; } Loading Loading @@ -467,9 +486,16 @@ bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index, if (xor_op_index < xor_op_vec.size()) { const CowOperation* xor_op = xor_op_vec[xor_op_index]; if (xor_op->new_block == new_block) { if (!reader_->ReadData(*xor_op, &bufsink_)) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (!buffer) { SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: " << xor_op->new_block; return false; } if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) { SNAP_LOG(ERROR) << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block; << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block << ", return value: " << rv; return false; } Loading @@ -492,6 +518,8 @@ bool ReadAhead::ReadAheadSyncIO() { blocks_.clear(); std::vector<const CowOperation*> xor_op_vec; AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_); bufsink_.ResetBufferOffset(); // Number of ops to be merged in this window. This is a fixed size Loading @@ -518,8 +546,6 @@ bool ReadAhead::ReadAheadSyncIO() { << " offset :" << source_offset % BLOCK_SZ << " buffer_offset : " << buffer_offset << " io_size : " << io_size << " buf-addr : " << read_ahead_buffer_; snapuserd_->ReadAheadIOFailed(); return false; } Loading @@ -530,6 +556,7 @@ bool ReadAhead::ReadAheadSyncIO() { // Done with merging ordered ops if (RAIterDone() && total_blocks_merged_ == 0) { notify_read_ahead_failed.Cancel(); return true; } Loading Loading @@ -560,20 +587,25 @@ bool ReadAhead::ReadAheadSyncIO() { // Check if this block is an XOR op if (xor_op->new_block == new_block) { // Read the xor'ed data from COW if (!reader_->ReadData(*xor_op, &bufsink)) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (!buffer) { SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer"; return false; } if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) { SNAP_LOG(ERROR) << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block; snapuserd_->ReadAheadIOFailed(); << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block << ", return value: " << rv; return false; } // Pointer to the data read from base device uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr); uint8_t* read_buffer = reinterpret_cast<uint8_t*>(bufptr); // Get the xor'ed data read from COW device uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr()); // Retrieve the original data for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) { buffer[byte_offset] ^= xor_data[byte_offset]; read_buffer[byte_offset] ^= xor_data[byte_offset]; } // Move to next XOR op Loading Loading @@ -604,6 +636,7 @@ bool ReadAhead::ReadAheadSyncIO() { bm->new_block = 0; bm->file_offset = 0; notify_read_ahead_failed.Cancel(); return true; } Loading