Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 98c5abe6 authored by David Anderson's avatar David Anderson Committed by Automerger Merge Worker
Browse files

Merge "snapuserd: Refactor how buffers are managed." into main am: 72c3ca16...

Merge "snapuserd: Refactor how buffers are managed." into main am: 72c3ca16 am: a99f70a2 am: f5714e77

Original change: https://android-review.googlesource.com/c/platform/system/core/+/2633935



Change-Id: I520f49f558d3484cf0a5d5411478d2e297933770
Signed-off-by: default avatarAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
parents c0fa980b f5714e77
Loading
Loading
Loading
Loading
+15 −14
Original line number Original line Diff line number Diff line
@@ -36,25 +36,26 @@ class BufferSink final {
    struct dm_user_header* GetHeaderPtr();
    struct dm_user_header* GetHeaderPtr();
    void ResetBufferOffset() { buffer_offset_ = 0; }
    void ResetBufferOffset() { buffer_offset_ = 0; }
    void* GetPayloadBufPtr();
    void* GetPayloadBufPtr();
    loff_t GetPayloadBytesWritten() { return buffer_offset_; }


  private:
    // Same as calling GetPayloadBuffer and then UpdateBufferOffset.
    std::unique_ptr<uint8_t[]> buffer_;
    //
    loff_t buffer_offset_;
    // This is preferred over GetPayloadBuffer as it does not require a
    size_t buffer_size_;
    // separate call to UpdateBufferOffset.
};
    void* AcquireBuffer(size_t size) { return AcquireBuffer(size, size); }


class XorSink final {
    // Same as AcquireBuffer, but separates the requested size from the buffer
  public:
    // offset. This is useful for a situation where a full run of data will be
    void Initialize(BufferSink* sink, size_t size);
    // read, but only a partial amount will be returned.
    void Reset();
    //
    void* GetBuffer(size_t requested, size_t* actual);
    // If size != to_write, the excess bytes may be reallocated by the next
    bool ReturnData(void* buffer, size_t len);
    // call to AcquireBuffer.
    void* AcquireBuffer(size_t size, size_t to_write);


  private:
  private:
    BufferSink* bufsink_;
    std::unique_ptr<uint8_t[]> buffer_;
    std::unique_ptr<uint8_t[]> buffer_;
    loff_t buffer_offset_;
    size_t buffer_size_;
    size_t buffer_size_;
    size_t returned_;
};
};


}  // namespace snapshot
}  // namespace snapshot
+16 −35
Original line number Original line Diff line number Diff line
@@ -15,6 +15,8 @@
 */
 */


#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_buffer.h>

#include <android-base/logging.h>
#include <snapuserd/snapuserd_kernel.h>
#include <snapuserd/snapuserd_kernel.h>


namespace android {
namespace android {
@@ -26,11 +28,23 @@ void BufferSink::Initialize(size_t size) {
    buffer_ = std::make_unique<uint8_t[]>(size);
    buffer_ = std::make_unique<uint8_t[]>(size);
}
}


void* BufferSink::GetPayloadBuffer(size_t size) {
void* BufferSink::AcquireBuffer(size_t size, size_t to_write) {
    if ((buffer_size_ - buffer_offset_) < size) return nullptr;
    CHECK(to_write <= size);

    void* ptr = GetPayloadBuffer(size);
    if (!ptr) {
        return nullptr;
    }
    UpdateBufferOffset(to_write);
    return ptr;
}


void* BufferSink::GetPayloadBuffer(size_t size) {
    char* buffer = reinterpret_cast<char*>(GetBufPtr());
    char* buffer = reinterpret_cast<char*>(GetBufPtr());
    struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
    struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
    if ((buffer_size_ - buffer_offset_ - sizeof(msg->header)) < size) {
        return nullptr;
    }
    return (char*)msg->payload.buf + buffer_offset_;
    return (char*)msg->payload.buf + buffer_offset_;
}
}


@@ -59,38 +73,5 @@ void* BufferSink::GetPayloadBufPtr() {
    return msg->payload.buf;
    return msg->payload.buf;
}
}


void XorSink::Initialize(BufferSink* sink, size_t size) {
    bufsink_ = sink;
    buffer_size_ = size;
    returned_ = 0;
    buffer_ = std::make_unique<uint8_t[]>(size);
}

void XorSink::Reset() {
    returned_ = 0;
}

void* XorSink::GetBuffer(size_t requested, size_t* actual) {
    if (requested > buffer_size_) {
        *actual = buffer_size_;
    } else {
        *actual = requested;
    }
    return buffer_.get();
}

bool XorSink::ReturnData(void* buffer, size_t len) {
    uint8_t* xor_data = reinterpret_cast<uint8_t*>(buffer);
    uint8_t* buff = reinterpret_cast<uint8_t*>(bufsink_->GetPayloadBuffer(len + returned_));
    if (buff == nullptr) {
        return false;
    }
    for (size_t i = 0; i < len; i++) {
        buff[returned_ + i] ^= xor_data[i];
    }
    returned_ += len;
    return true;
}

}  // namespace snapshot
}  // namespace snapshot
}  // namespace android
}  // namespace android
+73 −105
Original line number Original line Diff line number Diff line
@@ -42,12 +42,7 @@ ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing
// Start the replace operation. This will read the
// Start the replace operation. This will read the
// internal COW format and if the block is compressed,
// internal COW format and if the block is compressed,
// it will be de-compressed.
// it will be de-compressed.
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (!buffer) {
        SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
        return false;
    }
    if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
    if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
        return false;
        return false;
@@ -55,12 +50,7 @@ bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
    return true;
    return true;
}
}


bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op, void* buffer) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }
    uint64_t offset;
    uint64_t offset;
    if (!reader_->GetSourceOffset(cow_op, &offset)) {
    if (!reader_->GetSourceOffset(cow_op, &offset)) {
        SNAP_LOG(ERROR) << "ReadFromSourceDevice: Failed to get source offset";
        SNAP_LOG(ERROR) << "ReadFromSourceDevice: Failed to get source offset";
@@ -85,60 +75,43 @@ bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {


// Start the copy operation. This will read the backing
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
// block device which is represented by cow_op->source.
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op, void* buffer) {
    if (!ReadFromSourceDevice(cow_op)) {
    if (!ReadFromSourceDevice(cow_op, buffer)) {
        return false;
        return false;
    }
    }

    return true;
    return true;
}
}


bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op, void* buffer) {
    if (!ReadFromSourceDevice(cow_op)) {
    if (!ReadFromSourceDevice(cow_op, buffer)) {
        return false;
        return false;
    }
    }


    xorsink_.Reset();
    if (xor_buffer_.empty()) {

        xor_buffer_.resize(BLOCK_SZ);
    size_t actual = 0;
    void* buffer = xorsink_.GetBuffer(BLOCK_SZ, &actual);
    if (!buffer || actual < BLOCK_SZ) {
        SNAP_LOG(ERROR) << "ProcessXorOp failed to get buffer of " << BLOCK_SZ << " size, got "
                        << actual;
        return false;
    }
    }
    ssize_t size = reader_->ReadData(cow_op, buffer, BLOCK_SZ);
    CHECK(xor_buffer_.size() == BLOCK_SZ);

    ssize_t size = reader_->ReadData(cow_op, xor_buffer_.data(), xor_buffer_.size());
    if (size != BLOCK_SZ) {
    if (size != BLOCK_SZ) {
        SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block
        SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block
                        << ", return value: " << size;
                        << ", return value: " << size;
        return false;
        return false;
    }
    }
    if (!xorsink_.ReturnData(buffer, size)) {

        SNAP_LOG(ERROR) << "ProcessXorOp failed to return data";
    auto xor_out = reinterpret_cast<uint8_t*>(buffer);
        return false;
    for (size_t i = 0; i < BLOCK_SZ; i++) {
        xor_out[i] ^= xor_buffer_[i];
    }
    }
    return true;
    return true;
}
}


bool ReadWorker::ProcessZeroOp() {
bool ReadWorker::ProcessZeroOp(void* buffer) {
    // Zero out the entire block
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
        return false;
    }

    memset(buffer, 0, BLOCK_SZ);
    memset(buffer, 0, BLOCK_SZ);
    return true;
    return true;
}
}


bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op, void* buffer) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
        return false;
    }

    MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);
    MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);


    switch (state) {
    switch (state) {
@@ -148,7 +121,7 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
            SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
            SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
                            << (cow_op->new_block >> SECTOR_SHIFT)
                            << (cow_op->new_block >> SECTOR_SHIFT)
                            << " Block-number: " << cow_op->new_block;
                            << " Block-number: " << cow_op->new_block;
            if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) {
            if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), buffer, BLOCK_SZ)) {
                SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
                SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
                                << (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
                                << (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
                return false;
                return false;
@@ -158,9 +131,9 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
        case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
        case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
            bool ret;
            bool ret;
            if (cow_op->type == kCowCopyOp) {
            if (cow_op->type == kCowCopyOp) {
                ret = ProcessCopyOp(cow_op);
                ret = ProcessCopyOp(cow_op, buffer);
            } else {
            } else {
                ret = ProcessXorOp(cow_op);
                ret = ProcessXorOp(cow_op, buffer);
            }
            }


            // I/O is complete - decrement the refcount irrespective of the return
            // I/O is complete - decrement the refcount irrespective of the return
@@ -185,7 +158,7 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
    return false;
    return false;
}
}


bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op, void* buffer) {
    if (cow_op == nullptr) {
    if (cow_op == nullptr) {
        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
        return false;
        return false;
@@ -193,17 +166,17 @@ bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {


    switch (cow_op->type) {
    switch (cow_op->type) {
        case kCowReplaceOp: {
        case kCowReplaceOp: {
            return ProcessReplaceOp(cow_op);
            return ProcessReplaceOp(cow_op, buffer);
        }
        }


        case kCowZeroOp: {
        case kCowZeroOp: {
            return ProcessZeroOp();
            return ProcessZeroOp(buffer);
        }
        }


        case kCowCopyOp:
        case kCowCopyOp:
            [[fallthrough]];
            [[fallthrough]];
        case kCowXorOp: {
        case kCowXorOp: {
            return ProcessOrderedOp(cow_op);
            return ProcessOrderedOp(cow_op, buffer);
        }
        }


        default: {
        default: {
@@ -229,8 +202,6 @@ bool ReadWorker::Init() {
        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
        return false;
        return false;
    }
    }

    xorsink_.Initialize(&bufsink_, BLOCK_SZ);
    return true;
    return true;
}
}


@@ -271,18 +242,15 @@ bool ReadWorker::WriteDmUserPayload(size_t size) {
    // After the first header is sent in response to a request, we cannot
    // After the first header is sent in response to a request, we cannot
    // send any additional headers.
    // send any additional headers.
    header_response_ = false;
    header_response_ = false;

    // Reset the buffer for use by the next request.
    bufsink_.ResetBufferOffset();
    return true;
    return true;
}
}


bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t read_size) {
    CHECK(read_size <= BLOCK_SZ);
    CHECK(read_size <= BLOCK_SZ);


    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }

    loff_t offset = sector << SECTOR_SHIFT;
    loff_t offset = sector << SECTOR_SHIFT;
    if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
    if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
        SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
        SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
@@ -303,7 +271,6 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
        size_t read_size = std::min(PAYLOAD_BUFFER_SZ, remaining_size);
        size_t read_size = std::min(PAYLOAD_BUFFER_SZ, remaining_size);


        size_t total_bytes_read = 0;
        size_t total_bytes_read = 0;
        bufsink_.ResetBufferOffset();


        while (read_size) {
        while (read_size) {
            // We need to check every 4k block to verify if it is
            // We need to check every 4k block to verify if it is
@@ -314,11 +281,17 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
                                       std::make_pair(sector, nullptr), SnapshotHandler::compare);
                                       std::make_pair(sector, nullptr), SnapshotHandler::compare);
            bool not_found = (it == chunk_vec.end() || it->first != sector);
            bool not_found = (it == chunk_vec.end() || it->first != sector);


            void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, size);
            if (!buffer) {
                SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadAlignedSector";
                return false;
            }

            if (not_found) {
            if (not_found) {
                // Block not found in map - which means this block was not
                // Block not found in map - which means this block was not
                // changed as per the OTA. Just route the I/O to the base
                // changed as per the OTA. Just route the I/O to the base
                // device.
                // device.
                if (!ReadDataFromBaseDevice(sector, size)) {
                if (!ReadDataFromBaseDevice(sector, buffer, size)) {
                    SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
                    SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
                    return false;
                    return false;
                }
                }
@@ -327,7 +300,7 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
            } else {
            } else {
                // We found the sector in mapping. Check the type of COW OP and
                // We found the sector in mapping. Check the type of COW OP and
                // process it.
                // process it.
                if (!ProcessCowOp(it->second)) {
                if (!ProcessCowOp(it->second, buffer)) {
                    SNAP_LOG(ERROR) << "ProcessCowOp failed";
                    SNAP_LOG(ERROR) << "ProcessCowOp failed";
                    return false;
                    return false;
                }
                }
@@ -338,14 +311,13 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
            read_size -= ret;
            read_size -= ret;
            total_bytes_read += ret;
            total_bytes_read += ret;
            sector += (ret >> SECTOR_SHIFT);
            sector += (ret >> SECTOR_SHIFT);
            bufsink_.UpdateBufferOffset(ret);
        }
        }


        if (!WriteDmUserPayload(total_bytes_read)) {
        if (!SendBufferedIo()) {
            return false;
            return false;
        }
        }


        SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read
        SNAP_LOG(DEBUG) << "SendBufferedIo success total_bytes_read: " << total_bytes_read
                        << " remaining_size: " << remaining_size;
                        << " remaining_size: " << remaining_size;
        remaining_size -= total_bytes_read;
        remaining_size -= total_bytes_read;
    } while (remaining_size > 0);
    } while (remaining_size > 0);
@@ -356,40 +328,36 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
int ReadWorker::ReadUnalignedSector(
int ReadWorker::ReadUnalignedSector(
        sector_t sector, size_t size,
        sector_t sector, size_t size,
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
    size_t skip_sector_size = 0;

    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
                    << " Aligned sector: " << it->first;
                    << " Aligned sector: " << it->first;


    if (!ProcessCowOp(it->second)) {
    int num_sectors_skip = sector - it->first;
    size_t skip_size = num_sectors_skip << SECTOR_SHIFT;
    size_t write_size = std::min(size, BLOCK_SZ - skip_size);
    auto buffer = reinterpret_cast<uint8_t*>(bufsink_.AcquireBuffer(BLOCK_SZ, write_size));
    if (!buffer) {
        SNAP_LOG(ERROR) << "ProcessCowOp failed to allocate buffer";
        return -1;
    }

    if (!ProcessCowOp(it->second, buffer)) {
        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
                        << " Aligned sector: " << it->first;
                        << " Aligned sector: " << it->first;
        return -1;
        return -1;
    }
    }


    int num_sectors_skip = sector - it->first;
    if (skip_size) {

        if (skip_size == BLOCK_SZ) {
    if (num_sectors_skip > 0) {
        skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
        char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
        struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));

        if (skip_sector_size == BLOCK_SZ) {
            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
                            << " Base-sector: " << it->first;
                            << " Base-sector: " << it->first;
            return -1;
            return -1;
        }
        }

        memmove(buffer, buffer + skip_size, write_size);
        memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
                (BLOCK_SZ - skip_sector_size));
    }
    }

    return write_size;
    bufsink_.ResetBufferOffset();
    return std::min(size, (BLOCK_SZ - skip_sector_size));
}
}


bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
    bufsink_.ResetBufferOffset();
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();


    auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
    auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
@@ -458,7 +426,6 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
    // requested offset in this case is beyond the last mapped COW op size (which
    // requested offset in this case is beyond the last mapped COW op size (which
    // is block 1 in this case).
    // is block 1 in this case).


    size_t total_bytes_read = 0;
    size_t remaining_size = size;
    size_t remaining_size = size;
    int ret = 0;
    int ret = 0;
    if (!merge_complete && (requested_offset >= final_offset) &&
    if (!merge_complete && (requested_offset >= final_offset) &&
@@ -472,11 +439,10 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
        }
        }


        remaining_size -= ret;
        remaining_size -= ret;
        total_bytes_read += ret;
        sector += (ret >> SECTOR_SHIFT);
        sector += (ret >> SECTOR_SHIFT);


        // Send the data back
        // Send the data back
        if (!WriteDmUserPayload(total_bytes_read)) {
        if (!SendBufferedIo()) {
            return false;
            return false;
        }
        }


@@ -494,26 +460,21 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
        // Find the diff of the aligned offset
        // Find the diff of the aligned offset
        size_t diff_size = aligned_offset - requested_offset;
        size_t diff_size = aligned_offset - requested_offset;
        CHECK(diff_size <= BLOCK_SZ);
        CHECK(diff_size <= BLOCK_SZ);
        if (remaining_size < diff_size) {
            if (!ReadDataFromBaseDevice(sector, remaining_size)) {
                return false;
            }
            total_bytes_read += remaining_size;


            if (!WriteDmUserPayload(total_bytes_read)) {
        size_t read_size = std::min(remaining_size, diff_size);
        void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, read_size);
        if (!buffer) {
            SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadUnalignedSector";
            return false;
            return false;
        }
        }
        } else {
        if (!ReadDataFromBaseDevice(sector, buffer, read_size)) {
            if (!ReadDataFromBaseDevice(sector, diff_size)) {
            return false;
            return false;
        }
        }

        if (!SendBufferedIo()) {
            total_bytes_read += diff_size;

            if (!WriteDmUserPayload(total_bytes_read)) {
            return false;
            return false;
        }
        }


        if (remaining_size >= diff_size) {
            remaining_size -= diff_size;
            remaining_size -= diff_size;
            size_t num_sectors_read = (diff_size >> SECTOR_SHIFT);
            size_t num_sectors_read = (diff_size >> SECTOR_SHIFT);
            sector += num_sectors_read;
            sector += num_sectors_read;
@@ -555,6 +516,10 @@ bool ReadWorker::DmuserReadRequest() {
    return ReadAlignedSector(header->sector, header->len);
    return ReadAlignedSector(header->sector, header->len);
}
}


bool ReadWorker::SendBufferedIo() {
    return WriteDmUserPayload(bufsink_.GetPayloadBytesWritten());
}

bool ReadWorker::ProcessIORequest() {
bool ReadWorker::ProcessIORequest() {
    // Read Header from dm-user misc device. This gives
    // Read Header from dm-user misc device. This gives
    // us the sector number for which IO is issued by dm-snapshot device
    // us the sector number for which IO is issued by dm-snapshot device
@@ -579,6 +544,9 @@ bool ReadWorker::ProcessIORequest() {
    header->type = DM_USER_RESP_SUCCESS;
    header->type = DM_USER_RESP_SUCCESS;
    header_response_ = true;
    header_response_ = true;


    // Reset the output buffer.
    bufsink_.ResetBufferOffset();

    bool ok;
    bool ok;
    switch (request_type) {
    switch (request_type) {
        case DM_USER_REQ_MAP_READ:
        case DM_USER_REQ_MAP_READ:
+10 −9
Original line number Original line Diff line number Diff line
@@ -37,21 +37,22 @@ class ReadWorker : public Worker {
    bool ProcessIORequest();
    bool ProcessIORequest();
    bool WriteDmUserPayload(size_t size);
    bool WriteDmUserPayload(size_t size);
    bool DmuserReadRequest();
    bool DmuserReadRequest();
    bool SendBufferedIo();
    void RespondIOError();
    void RespondIOError();


    bool ProcessCowOp(const CowOperation* cow_op);
    bool ProcessCowOp(const CowOperation* cow_op, void* buffer);
    bool ProcessXorOp(const CowOperation* cow_op);
    bool ProcessXorOp(const CowOperation* cow_op, void* buffer);
    bool ProcessOrderedOp(const CowOperation* cow_op);
    bool ProcessOrderedOp(const CowOperation* cow_op, void* buffer);
    bool ProcessCopyOp(const CowOperation* cow_op);
    bool ProcessCopyOp(const CowOperation* cow_op, void* buffer);
    bool ProcessReplaceOp(const CowOperation* cow_op);
    bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer);
    bool ProcessZeroOp();
    bool ProcessZeroOp(void* buffer);


    bool ReadAlignedSector(sector_t sector, size_t sz);
    bool ReadAlignedSector(sector_t sector, size_t sz);
    bool ReadUnalignedSector(sector_t sector, size_t size);
    bool ReadUnalignedSector(sector_t sector, size_t size);
    int ReadUnalignedSector(sector_t sector, size_t size,
    int ReadUnalignedSector(sector_t sector, size_t size,
                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
    bool ReadFromSourceDevice(const CowOperation* cow_op);
    bool ReadFromSourceDevice(const CowOperation* cow_op, void* buffer);
    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
    bool ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t read_size);


    constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
    constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
    constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@@ -62,8 +63,8 @@ class ReadWorker : public Worker {
    std::string control_device_;
    std::string control_device_;
    unique_fd ctrl_fd_;
    unique_fd ctrl_fd_;


    XorSink xorsink_;
    bool header_response_ = false;
    bool header_response_ = false;
    std::basic_string<uint8_t> xor_buffer_;
};
};


}  // namespace snapshot
}  // namespace snapshot
+2 −4
Original line number Original line Diff line number Diff line
@@ -106,9 +106,9 @@ bool MergeWorker::MergeReplaceZeroOps() {
        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
            const CowOperation* cow_op = replace_zero_vec[i];
            const CowOperation* cow_op = replace_zero_vec[i];


            void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
            void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
            if (!buffer) {
            if (!buffer) {
                SNAP_LOG(ERROR) << "Failed to acquire buffer in merge";
                SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
                return false;
                return false;
            }
            }
            if (cow_op->type == kCowReplaceOp) {
            if (cow_op->type == kCowReplaceOp) {
@@ -120,8 +120,6 @@ bool MergeWorker::MergeReplaceZeroOps() {
                CHECK(cow_op->type == kCowZeroOp);
                CHECK(cow_op->type == kCowZeroOp);
                memset(buffer, 0, BLOCK_SZ);
                memset(buffer, 0, BLOCK_SZ);
            }
            }

            bufsink_.UpdateBufferOffset(BLOCK_SZ);
        }
        }


        size_t io_size = linear_blocks * BLOCK_SZ;
        size_t io_size = linear_blocks * BLOCK_SZ;
Loading