Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b19326fb authored by David Anderson's avatar David Anderson Committed by Automerger Merge Worker
Browse files

Merge changes I1625d1a6,I2db9cfa2,I59c31318,Ic0ed1a8d,I612374bb into main am: 83ebc437

parents 245da012 83ebc437
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -64,12 +64,13 @@ cc_library_static {
        "dm-snapshot-merge/snapuserd_readahead.cpp",
        "snapuserd_buffer.cpp",
        "user-space-merge/handler_manager.cpp",
        "user-space-merge/read_worker.cpp",
        "user-space-merge/snapuserd_core.cpp",
        "user-space-merge/snapuserd_dm_user.cpp",
        "user-space-merge/snapuserd_merge.cpp",
        "user-space-merge/snapuserd_readahead.cpp",
        "user-space-merge/snapuserd_transitions.cpp",
        "user-space-merge/snapuserd_verify.cpp",
        "user-space-merge/worker.cpp",
    ],
    static_libs: [
        "libbase",
+2 −0
Original line number Diff line number Diff line
@@ -18,7 +18,9 @@

#include <android-base/logging.h>

#include "read_worker.h"
#include "snapuserd_core.h"
#include "snapuserd_merge.h"

namespace android {
namespace snapshot {
+40 −76
Original line number Diff line number Diff line
@@ -14,6 +14,8 @@
 * limitations under the License.
 */

#include "read_worker.h"

#include "snapuserd_core.h"

namespace android {
@@ -23,59 +25,24 @@ using namespace android;
using namespace android::dm;
using android::base::unique_fd;

Worker::Worker(const std::string& cow_device, const std::string& backing_device,
               const std::string& control_device, const std::string& misc_name,
               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    control_device_ = control_device;
    misc_name_ = misc_name;
    base_path_merge_ = base_path_merge;
    snapuserd_ = snapuserd;
}

bool Worker::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
    if (ctrl_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
        return false;
    }

    // Base device used by merge thread
    base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
    if (base_path_merge_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
        return false;
void ReadWorker::CloseFds() {
    ctrl_fd_ = {};
    backing_store_fd_ = {};
    Worker::CloseFds();
}

    return true;
}

bool Worker::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }
    return true;
}
ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
                       const std::string& control_device, const std::string& misc_name,
                       const std::string& base_path_merge,
                       std::shared_ptr<SnapshotHandler> snapuserd)
    : Worker(cow_device, misc_name, base_path_merge, snapuserd),
      backing_store_device_(backing_device),
      control_device_(control_device) {}

// Start the replace operation. This will read the
// internal COW format and if the block is compressed,
// it will be de-compressed.
bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (!buffer) {
        SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
@@ -88,7 +55,7 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
    return true;
}

bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
@@ -118,7 +85,7 @@ bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {

// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
    if (!ReadFromSourceDevice(cow_op)) {
        return false;
    }
@@ -126,7 +93,7 @@ bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
    return true;
}

bool Worker::ProcessXorOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
    if (!ReadFromSourceDevice(cow_op)) {
        return false;
    }
@@ -153,7 +120,7 @@ bool Worker::ProcessXorOp(const CowOperation* cow_op) {
    return true;
}

bool Worker::ProcessZeroOp() {
bool ReadWorker::ProcessZeroOp() {
    // Zero out the entire block
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
@@ -165,7 +132,7 @@ bool Worker::ProcessZeroOp() {
    return true;
}

bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
@@ -218,7 +185,7 @@ bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
    return false;
}

bool Worker::ProcessCowOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
    if (cow_op == nullptr) {
        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
        return false;
@@ -246,31 +213,28 @@ bool Worker::ProcessCowOp(const CowOperation* cow_op) {
    return false;
}

void Worker::InitializeBufsink() {
    // Allocate the buffer which is used to communicate between
    // daemon and dm-user. The buffer comprises of header and a fixed payload.
    // If the dm-user requests a big IO, the IO will be broken into chunks
    // of PAYLOAD_BUFFER_SZ.
    size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
    bufsink_.Initialize(buf_size);
bool ReadWorker::Init() {
    if (!Worker::Init()) {
        return false;
    }

bool Worker::Init() {
    InitializeBufsink();
    xorsink_.Initialize(&bufsink_, BLOCK_SZ);

    if (!InitializeFds()) {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    if (!InitReader()) {
    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
    if (ctrl_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
        return false;
    }

    xorsink_.Initialize(&bufsink_, BLOCK_SZ);
    return true;
}

bool Worker::RunThread() {
bool ReadWorker::Run() {
    SNAP_LOG(INFO) << "Processing snapshot I/O requests....";

    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
@@ -291,7 +255,7 @@ bool Worker::RunThread() {
}

// Send the payload/data back to dm-user misc device.
bool Worker::WriteDmUserPayload(size_t size) {
bool ReadWorker::WriteDmUserPayload(size_t size) {
    size_t payload_size = size;
    void* buf = bufsink_.GetPayloadBufPtr();
    if (header_response_) {
@@ -310,7 +274,7 @@ bool Worker::WriteDmUserPayload(size_t size) {
    return true;
}

bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
    CHECK(read_size <= BLOCK_SZ);

    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@@ -329,7 +293,7 @@ bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
    return true;
}

bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
    size_t remaining_size = sz;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    int ret = 0;
@@ -389,7 +353,7 @@ bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
    return true;
}

int Worker::ReadUnalignedSector(
int ReadWorker::ReadUnalignedSector(
        sector_t sector, size_t size,
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
    size_t skip_sector_size = 0;
@@ -424,7 +388,7 @@ int Worker::ReadUnalignedSector(
    return std::min(size, (BLOCK_SZ - skip_sector_size));
}

bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
    bufsink_.ResetBufferOffset();
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();

@@ -563,7 +527,7 @@ bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
    return true;
}

void Worker::RespondIOError() {
void ReadWorker::RespondIOError() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    header->type = DM_USER_RESP_ERROR;
    // This is an issue with the dm-user interface. There
@@ -580,7 +544,7 @@ void Worker::RespondIOError() {
    WriteDmUserPayload(0);
}

bool Worker::DmuserReadRequest() {
bool ReadWorker::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    // Unaligned I/O request
@@ -591,7 +555,7 @@ bool Worker::DmuserReadRequest() {
    return ReadAlignedSector(header->sector, header->len);
}

bool Worker::ProcessIORequest() {
bool ReadWorker::ProcessIORequest() {
    // Read Header from dm-user misc device. This gives
    // us the sector number for which IO is issued by dm-snapshot device
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
+70 −0
Original line number Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <utility>
#include <vector>

#include "worker.h"

namespace android {
namespace snapshot {

class ReadWorker : public Worker {
  public:
    ReadWorker(const std::string& cow_device, const std::string& backing_device,
               const std::string& control_device, const std::string& misc_name,
               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);

    bool Run();
    bool Init() override;
    void CloseFds() override;

  private:
    // Functions interacting with dm-user
    bool ProcessIORequest();
    bool WriteDmUserPayload(size_t size);
    bool DmuserReadRequest();
    void RespondIOError();

    bool ProcessCowOp(const CowOperation* cow_op);
    bool ProcessXorOp(const CowOperation* cow_op);
    bool ProcessOrderedOp(const CowOperation* cow_op);
    bool ProcessCopyOp(const CowOperation* cow_op);
    bool ProcessReplaceOp(const CowOperation* cow_op);
    bool ProcessZeroOp();

    bool ReadAlignedSector(sector_t sector, size_t sz);
    bool ReadUnalignedSector(sector_t sector, size_t size);
    int ReadUnalignedSector(sector_t sector, size_t size,
                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
    bool ReadFromSourceDevice(const CowOperation* cow_op);
    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);

    constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
    constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }

    std::string backing_store_device_;
    unique_fd backing_store_fd_;

    std::string control_device_;
    unique_fd ctrl_fd_;

    XorSink xorsink_;
    bool header_response_ = false;
};

}  // namespace snapshot
}  // namespace android
+15 −7
Original line number Diff line number Diff line
@@ -23,6 +23,9 @@
#include <android-base/scopeguard.h>
#include <android-base/strings.h>

#include "read_worker.h"
#include "snapuserd_merge.h"

namespace android {
namespace snapshot {

@@ -46,8 +49,7 @@ SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,

bool SnapshotHandler::InitializeWorkers() {
    for (int i = 0; i < num_worker_threads_; i++) {
        std::unique_ptr<Worker> wt =
                std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
        auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
                                               misc_name_, base_path_merge_, GetSharedPtr());
        if (!wt->Init()) {
            SNAP_LOG(ERROR) << "Thread initialization failed";
@@ -57,8 +59,8 @@ bool SnapshotHandler::InitializeWorkers() {
        worker_threads_.push_back(std::move(wt));
    }

    merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
                                             misc_name_, base_path_merge_, GetSharedPtr());
    merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
                                                  GetSharedPtr());

    read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
                                                     GetSharedPtr());
@@ -312,11 +314,11 @@ bool SnapshotHandler::Start() {
    // Launch worker threads
    for (int i = 0; i < worker_threads_.size(); i++) {
        threads.emplace_back(
                std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
                std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
    }

    std::future<bool> merge_thread =
            std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
            std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());

    // Now that the worker threads are up, scan the partitions.
    if (perform_verification_) {
@@ -452,5 +454,11 @@ bool SnapshotHandler::CheckPartitionVerification() {
    return update_verify_->CheckPartitionVerification();
}

void SnapshotHandler::FreeResources() {
    worker_threads_.clear();
    read_ahead_thread_ = nullptr;
    merge_thread_ = nullptr;
}

}  // namespace snapshot
}  // namespace android
Loading