Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit be46ca7f authored by Akilesh Kailash's avatar Akilesh Kailash
Browse files

snapuserd: Service I/O requests from dm-user



Now that merging is done in user-space and
partitions are mounted off dm-user directly,
daemon will have to serve every I/O request.

Daemon has to handle this wherein we need to
check if the given I/O request block has been
modified in the OTA. Furthermore, if merge is
in-progress, we will have to synchronize with
the merge thread before servicing the I/O.

If the I/O request maps to a REPLACE or ZERO op,
we will just read the data from COW device.

If the I/O request maps to a COPY or XOR op,
the worker thread will have to synchronize
with the merge thread and if the merge is
in progress, fetch the data directly from RA buffer.

This patch handles I/O requests only if the
sectors are 4k aligned.

Bug: 196929997
Test: snapuserd_test

Signed-off-by: default avatarAkilesh Kailash <akailash@google.com>
Change-Id: I08562b8927e1c22dd9d9ef160e873280854eac99
parent cab12f8a
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -26,8 +26,8 @@ static constexpr uint32_t kCowVersionMinor = 0;

static constexpr uint32_t kCowVersionManifest = 2;

static constexpr uint32_t BLOCK_SZ = 4096;
static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
static constexpr size_t BLOCK_SZ = 4096;
static constexpr size_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);

// This header appears as the first sequence of bytes in the COW. All fields
// in the layout are little-endian encoded. The on-disk layout is:
+37 −1
Original line number Diff line number Diff line
@@ -155,13 +155,43 @@ bool SnapshotHandler::ReadMetadata() {
    // Initialize the iterator for reading metadata
    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();

    int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
    int ra_index = 0;

    size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;

    while (!cowop_iter->Done()) {
        const CowOperation* cow_op = &cowop_iter->Get();

        if (cow_op->type == kCowCopyOp) {
            copy_ops += 1;
        } else if (cow_op->type == kCowReplaceOp) {
            replace_ops += 1;
        } else if (cow_op->type == kCowZeroOp) {
            zero_ops += 1;
        } else if (cow_op->type == kCowXorOp) {
            xor_ops += 1;
        }

        chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));

        if (!ra_thread_ && IsOrderedOp(*cow_op)) {
        if (IsOrderedOp(*cow_op)) {
            ra_thread_ = true;
            block_to_ra_index_[cow_op->new_block] = ra_index;
            num_ra_ops_per_iter -= 1;

            if ((ra_index + 1) - merge_blk_state_.size() == 1) {
                std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
                        MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);

                merge_blk_state_.push_back(std::move(blk_state));
            }

            // Move to next RA block
            if (num_ra_ops_per_iter == 0) {
                num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
                ra_index += 1;
            }
        }
        cowop_iter->Next();
    }
@@ -173,6 +203,12 @@ bool SnapshotHandler::ReadMetadata() {

    PrepareReadAhead();

    SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
                   << " Total-data-ops: " << reader_->get_num_total_data_ops()
                   << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
                   << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
                   << " Xor-ops: " << xor_ops;

    return true;
}

+55 −1
Original line number Diff line number Diff line
@@ -67,6 +67,27 @@ enum class MERGE_IO_TRANSITION {

class SnapshotHandler;

enum class MERGE_GROUP_STATE {
    GROUP_MERGE_PENDING,
    GROUP_MERGE_RA_READY,
    GROUP_MERGE_IN_PROGRESS,
    GROUP_MERGE_COMPLETED,
    GROUP_MERGE_FAILED,
    GROUP_INVALID,
};

struct MergeGroupState {
    MERGE_GROUP_STATE merge_state_;
    // Ref count I/O when group state
    // is in "GROUP_MERGE_PENDING"
    size_t num_ios_in_progress;
    std::mutex m_lock;
    std::condition_variable m_cv;

    MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
        : merge_state_(state), num_ios_in_progress(n_ios) {}
};

class ReadAhead {
  public:
    ReadAhead(const std::string& cow_device, const std::string& backing_device,
@@ -133,16 +154,30 @@ class Worker {
        base_path_merge_fd_ = {};
    }

    // Functions interacting with dm-user
    bool ReadDmUserHeader();
    bool WriteDmUserPayload(size_t size, bool header_response);
    bool DmuserReadRequest();

    // IO Path
    bool ProcessIORequest();
    bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }

    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
    bool ReadFromSourceDevice(const CowOperation* cow_op);

    bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response);
    bool RespondIOError(bool header_response);

    // Processing COW operations
    bool ProcessCowOp(const CowOperation* cow_op);
    bool ProcessReplaceOp(const CowOperation* cow_op);
    bool ProcessZeroOp();

    // Handles Copy and Xor
    bool ProcessCopyOp(const CowOperation* cow_op);
    bool ProcessXorOp(const CowOperation* cow_op);
    bool ProcessOrderedOp(const CowOperation* cow_op);

    // Merge related ops
    bool Merge();
@@ -152,6 +187,9 @@ class Worker {
                     const std::unique_ptr<ICowOpIter>& cowop_iter,
                     std::vector<const CowOperation*>* replace_zero_vec = nullptr);

    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }

    std::unique_ptr<CowReader> reader_;
    BufferSink bufsink_;
    XorSink xorsink_;
@@ -210,6 +248,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
    // Read-ahead related functions
    void* GetMappedAddr() { return mapped_addr_; }
    void PrepareReadAhead();
    std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }

    // State transitions for merge
    void InitiateMerge();
@@ -239,10 +278,19 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
    void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
    bool MergeInitiated() { return merge_initiated_; }

    // Merge Block State Transitions
    void SetMergeCompleted(size_t block_index);
    void SetMergeInProgress(size_t block_index);
    void SetMergeFailed(size_t block_index);
    void NotifyIOCompletion(uint64_t new_block);
    bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
    MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);

  private:
    bool ReadMetadata();
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
    bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
    struct BufferState* GetBufferState();

    void ReadBlocks(const std::string partition_name, const std::string& dm_block_device);
@@ -261,7 +309,6 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {

    unique_fd cow_fd_;

    // Number of sectors required when initializing dm-user
    uint64_t num_sectors_;

    std::unique_ptr<CowReader> reader_;
@@ -283,6 +330,13 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
    int total_ra_blocks_merged_ = 0;
    MERGE_IO_TRANSITION io_state_;
    std::unique_ptr<ReadAhead> read_ahead_thread_;
    std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;

    // user-space-merging
    std::unordered_map<uint64_t, int> block_to_ra_index_;

    // Merge Block state
    std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;

    std::unique_ptr<Worker> merge_thread_;

+321 −6
Original line number Diff line number Diff line
@@ -84,6 +84,56 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
    return true;
}

bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }
    SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
                    << " Source: " << cow_op->source;
    uint64_t offset = cow_op->source;
    if (cow_op->type == kCowCopyOp) {
        offset *= BLOCK_SZ;
    }
    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
        std::string op;
        if (cow_op->type == kCowCopyOp)
            op = "Copy-op";
        else {
            op = "Xor-op";
        }
        SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_
                         << "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ;
        return false;
    }

    return true;
}

// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
    if (!ReadFromSourceDevice(cow_op)) {
        return false;
    }

    return true;
}

bool Worker::ProcessXorOp(const CowOperation* cow_op) {
    if (!ReadFromSourceDevice(cow_op)) {
        return false;
    }
    xorsink_.Reset();
    if (!reader_->ReadData(*cow_op, &xorsink_)) {
        SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
        return false;
    }

    return true;
}

bool Worker::ProcessZeroOp() {
    // Zero out the entire block
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@@ -96,13 +146,86 @@ bool Worker::ProcessZeroOp() {
    return true;
}

bool Worker::ProcessCopyOp(const CowOperation*) {
bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
        return false;
    }

    MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);

    switch (state) {
        case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
            // Merge is completed for this COW op; just read directly from
            // the base device
            SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
                            << (cow_op->new_block >> SECTOR_SHIFT)
                            << " Block-number: " << cow_op->new_block;
            if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) {
                SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
                                << (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
                return false;
            }
            return true;
        }
        case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
            bool ret;
            if (cow_op->type == kCowCopyOp) {
                ret = ProcessCopyOp(cow_op);
            } else {
                ret = ProcessXorOp(cow_op);
            }

bool Worker::ProcessXorOp(const CowOperation*) {
            // I/O is complete - decrement the refcount irrespective of the return
            // status
            snapuserd_->NotifyIOCompletion(cow_op->new_block);
            return ret;
        }
        // We already have the data in the buffer retrieved from RA thread.
        // Nothing to process further.
        case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
            [[fallthrough]];
        }
        case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
            return true;
        }
        default: {
            // All other states, fail the I/O viz (GROUP_MERGE_FAILED and GROUP_INVALID)
            return false;
        }
    }

    return false;
}

bool Worker::ProcessCowOp(const CowOperation* cow_op) {
    if (cow_op == nullptr) {
        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
        return false;
    }

    switch (cow_op->type) {
        case kCowReplaceOp: {
            return ProcessReplaceOp(cow_op);
        }

        case kCowZeroOp: {
            return ProcessZeroOp();
        }

        case kCowCopyOp:
            [[fallthrough]];
        case kCowXorOp: {
            return ProcessOrderedOp(cow_op);
        }

        default: {
            SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
        }
    }
    return false;
}

void Worker::InitializeBufsink() {
    // Allocate the buffer which is used to communicate between
@@ -129,7 +252,7 @@ bool Worker::Init() {
}

bool Worker::RunThread() {
    SNAP_LOG(DEBUG) << "Processing snapshot I/O requests...";
    SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
    // Start serving IO
    while (true) {
        if (!ProcessIORequest()) {
@@ -143,8 +266,200 @@ bool Worker::RunThread() {
    return true;
}

// Read Header from dm-user misc device. This gives
// us the sector number for which IO is issued by dm-snapshot device
bool Worker::ReadDmUserHeader() {
    if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
        if (errno != ENOTBLK) {
            SNAP_PLOG(ERROR) << "Control-read failed";
        }

        SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed....";
        return false;
    }

    return true;
}

// Send the payload/data back to dm-user misc device.
bool Worker::WriteDmUserPayload(size_t size, bool header_response) {
    size_t payload_size = size;
    void* buf = bufsink_.GetPayloadBufPtr();
    if (header_response) {
        payload_size += sizeof(struct dm_user_header);
        buf = bufsink_.GetBufPtr();
    }

    if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
        SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
        return false;
    }

    return true;
}

bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
    CHECK(read_size <= BLOCK_SZ);

    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }

    loff_t offset = sector << SECTOR_SHIFT;
    if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
        SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
                         << "at sector :" << sector << " size: " << read_size;
        return false;
    }

    return true;
}

bool Worker::ReadAlignedSector(sector_t sector, size_t sz, bool header_response) {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    size_t remaining_size = sz;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    bool io_error = false;
    int ret = 0;

    do {
        // Process 1MB payload at a time
        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

        header->type = DM_USER_RESP_SUCCESS;
        size_t total_bytes_read = 0;
        io_error = false;
        bufsink_.ResetBufferOffset();

        while (read_size) {
            // We need to check every 4k block to verify if it is
            // present in the mapping.
            size_t size = std::min(BLOCK_SZ, read_size);

            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(sector, nullptr), SnapshotHandler::compare);
            bool not_found = (it == chunk_vec.end() || it->first != sector);

            if (not_found) {
                // Block not found in map - which means this block was not
                // changed as per the OTA. Just route the I/O to the base
                // device.
                if (!ReadDataFromBaseDevice(sector, size)) {
                    SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
                    header->type = DM_USER_RESP_ERROR;
                }

                ret = size;
            } else {
                // We found the sector in mapping. Check the type of COW OP and
                // process it.
                if (!ProcessCowOp(it->second)) {
                    SNAP_LOG(ERROR) << "ProcessCowOp failed";
                    header->type = DM_USER_RESP_ERROR;
                }

                ret = BLOCK_SZ;
            }

            // Just return the header if it is an error
            if (header->type == DM_USER_RESP_ERROR) {
                if (!RespondIOError(header_response)) {
                    return false;
                }

                io_error = true;
                break;
            }

            read_size -= ret;
            total_bytes_read += ret;
            sector += (ret >> SECTOR_SHIFT);
            bufsink_.UpdateBufferOffset(ret);
        }

        if (!io_error) {
            if (!WriteDmUserPayload(total_bytes_read, header_response)) {
                return false;
            }

            SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read
                            << " header-response: " << header_response
                            << " remaining_size: " << remaining_size;
            header_response = false;
            remaining_size -= total_bytes_read;
        }
    } while (remaining_size > 0 && !io_error);

    return true;
}

bool Worker::RespondIOError(bool header_response) {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    header->type = DM_USER_RESP_ERROR;
    // This is an issue with the dm-user interface. There
    // is no way to propagate the I/O error back to dm-user
    // if we have already communicated the header back. Header
    // is responded once at the beginning; however I/O can
    // be processed in chunks. If we encounter an I/O error
    // somewhere in the middle of the processing, we can't communicate
    // this back to dm-user.
    //
    // TODO: Fix the interface
    CHECK(header_response);

    if (!WriteDmUserPayload(0, header_response)) {
        return false;
    }

    // There is no need to process further as we have already seen
    // an I/O error
    return true;
}

bool Worker::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    // Unaligned I/O request
    if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) {
        SNAP_LOG(ERROR) << "I/O request is not 4k aligned.";
        return false;
    }

    return ReadAlignedSector(header->sector, header->len, true);
}

bool Worker::ProcessIORequest() {
    // No communication with dm-user yet
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    if (!ReadDmUserHeader()) {
        return false;
    }

    SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
    SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
    SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
    SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
    SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;

    switch (header->type) {
        case DM_USER_REQ_MAP_READ: {
            if (!DmuserReadRequest()) {
                return false;
            }
            break;
        }

        case DM_USER_REQ_MAP_WRITE: {
            // TODO: We should not get any write request
            // to dm-user as we mount all partitions
            // as read-only. Need to verify how are TRIM commands
            // handled during mount.
            return false;
        }
    }

    return true;
}

+12 −1
Original line number Diff line number Diff line
@@ -52,7 +52,6 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                    break;
                }

                // Check for consecutive blocks
                uint64_t next_offset = op->new_block * BLOCK_SZ;
                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                    break;
@@ -177,6 +176,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
    size_t block_index = 0;

    SNAP_LOG(INFO) << "MergeOrderedOps started....";

@@ -190,9 +190,12 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        snapuserd_->SetMergeInProgress(block_index);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();
        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
@@ -213,6 +216,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
            if (ret < 0 || ret != io_size) {
                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                << " at offset: " << source_offset << " io_size: " << io_size;
                snapuserd_->SetMergeFailed(block_index);
                return false;
            }

@@ -226,6 +230,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

@@ -233,14 +238,20 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(block_index);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(block_index);

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        block_index += 1;
    }

    return true;
Loading