Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e74976cc authored by David Anderson's avatar David Anderson Committed by Gerrit Code Review
Browse files

Merge changes Ib90ae553,I3ca3ad9c

* changes:
  RESTRICT AUTOMERGE: snapuserd: Remove DaemonOps.
  RESTRICT AUTOMERGE: snapuserd: Split the server into two classes.
parents b634c6f5 369c993f
Loading
Loading
Loading
Loading
+1 −0
Original line number Original line Diff line number Diff line
@@ -62,6 +62,7 @@ cc_library_static {
        "dm-snapshot-merge/snapuserd_worker.cpp",
        "dm-snapshot-merge/snapuserd_worker.cpp",
        "dm-snapshot-merge/snapuserd_readahead.cpp",
        "dm-snapshot-merge/snapuserd_readahead.cpp",
        "snapuserd_buffer.cpp",
        "snapuserd_buffer.cpp",
        "user-space-merge/handler_manager.cpp",
        "user-space-merge/snapuserd_core.cpp",
        "user-space-merge/snapuserd_core.cpp",
        "user-space-merge/snapuserd_dm_user.cpp",
        "user-space-merge/snapuserd_dm_user.cpp",
        "user-space-merge/snapuserd_merge.cpp",
        "user-space-merge/snapuserd_merge.cpp",
+1 −1
Original line number Original line Diff line number Diff line
@@ -114,7 +114,7 @@ bool Daemon::StartServerForUserspaceSnapshots(int arg_start, int argc, char** ar
            return false;
            return false;
        }
        }
        auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3]);
        auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3]);
        if (!handler || !user_server_.StartHandler(handler)) {
        if (!handler || !user_server_.StartHandler(parts[0])) {
            return false;
            return false;
        }
        }
    }
    }
+371 −0
Original line number Original line Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "handler_manager.h"

#include <sys/eventfd.h>

#include <android-base/logging.h>

#include "snapuserd_core.h"

namespace android {
namespace snapshot {

static constexpr uint8_t kMaxMergeThreads = 2;

void HandlerThread::FreeResources() {
    // Each worker thread holds a reference to snapuserd.
    // Clear them so that all the resources
    // held by snapuserd is released
    if (snapuserd_) {
        snapuserd_->FreeResources();
        snapuserd_ = nullptr;
    }
}

SnapshotHandlerManager::SnapshotHandlerManager() {
    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
    if (monitor_merge_event_fd_ == -1) {
        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
    }
}

std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
        const std::string& misc_name, const std::string& cow_device_path,
        const std::string& backing_device, const std::string& base_path_merge,
        int num_worker_threads, bool use_iouring, bool perform_verification) {
    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
                                                       base_path_merge, num_worker_threads,
                                                       use_iouring, perform_verification);
    if (!snapuserd->InitCowDevice()) {
        LOG(ERROR) << "Failed to initialize Snapuserd";
        return nullptr;
    }

    if (!snapuserd->InitializeWorkers()) {
        LOG(ERROR) << "Failed to initialize workers";
        return nullptr;
    }

    auto handler = std::make_shared<HandlerThread>(snapuserd);
    {
        std::lock_guard<std::mutex> lock(lock_);
        if (FindHandler(&lock, misc_name) != dm_users_.end()) {
            LOG(ERROR) << "Handler already exists: " << misc_name;
            return nullptr;
        }
        dm_users_.push_back(handler);
    }
    return handler;
}

bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
    std::lock_guard<std::mutex> lock(lock_);
    auto iter = FindHandler(&lock, misc_name);
    if (iter == dm_users_.end()) {
        LOG(ERROR) << "Could not find handler: " << misc_name;
        return false;
    }
    if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
        LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
        return false;
    }
    if (!StartHandler(*iter)) {
        return false;
    }
    return true;
}

bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
    if (handler->snapuserd()->IsAttached()) {
        LOG(ERROR) << "Handler already attached";
        return false;
    }

    handler->snapuserd()->AttachControlDevice();

    handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
    return true;
}

bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
    {
        std::lock_guard<std::mutex> lock(lock_);
        auto iter = FindHandler(&lock, misc_name);
        if (iter == dm_users_.end()) {
            // After merge is completed, we swap dm-user table with
            // the underlying dm-linear base device. Hence, worker
            // threads would have terminted and was removed from
            // the list.
            LOG(DEBUG) << "Could not find handler: " << misc_name;
            return true;
        }

        if (!(*iter)->ThreadTerminated()) {
            (*iter)->snapuserd()->NotifyIOTerminated();
        }
    }
    if (!RemoveAndJoinHandler(misc_name)) {
        return false;
    }
    return true;
}

void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();

    if (!handler->snapuserd()->Start()) {
        LOG(ERROR) << " Failed to launch all worker threads";
    }

    handler->snapuserd()->CloseFds();
    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
    handler->snapuserd()->UnmapBufferRegion();

    auto misc_name = handler->misc_name();
    LOG(INFO) << "Handler thread about to exit: " << misc_name;

    {
        std::lock_guard<std::mutex> lock(lock_);
        if (merge_completed) {
            num_partitions_merge_complete_ += 1;
            active_merge_threads_ -= 1;
            WakeupMonitorMergeThread();
        }
        handler->SetThreadTerminated();
        auto iter = FindHandler(&lock, handler->misc_name());
        if (iter == dm_users_.end()) {
            // RemoveAndJoinHandler() already removed us from the list, and is
            // now waiting on a join(), so just return. Additionally, release
            // all the resources held by snapuserd object which are shared
            // by worker threads. This should be done when the last reference
            // of "handler" is released; but we will explicitly release here
            // to make sure snapuserd object is freed as it is the biggest
            // consumer of memory in the daemon.
            handler->FreeResources();
            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
            return;
        }

        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;

        if (handler->snapuserd()->IsAttached()) {
            handler->thread().detach();
        }

        // Important: free resources within the lock. This ensures that if
        // WaitForDelete() is called, the handler is either in the list, or
        // it's not and its resources are guaranteed to be freed.
        handler->FreeResources();
        dm_users_.erase(iter);
    }
}

bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
    std::lock_guard<std::mutex> lock(lock_);
    auto iter = FindHandler(&lock, misc_name);
    if (iter == dm_users_.end()) {
        LOG(ERROR) << "Could not find handler: " << misc_name;
        return false;
    }

    return StartMerge(&lock, *iter);
}

bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
                                        const std::shared_ptr<HandlerThread>& handler) {
    CHECK(proof_of_lock);

    if (!handler->snapuserd()->IsAttached()) {
        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
        return false;
    }

    handler->snapuserd()->MonitorMerge();

    if (!is_merge_monitor_started_) {
        std::thread(&SnapshotHandlerManager::MonitorMerge, this).detach();
        is_merge_monitor_started_ = true;
    }

    merge_handlers_.push(handler);
    WakeupMonitorMergeThread();
    return true;
}

void SnapshotHandlerManager::WakeupMonitorMergeThread() {
    uint64_t notify = 1;
    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
    if (rc < 0) {
        PLOG(FATAL) << "failed to notify monitor merge thread";
    }
}

void SnapshotHandlerManager::MonitorMerge() {
    while (!stop_monitor_merge_thread_) {
        uint64_t testVal;
        ssize_t ret =
                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
        if (ret == -1) {
            PLOG(FATAL) << "Failed to read from eventfd";
        } else if (ret == 0) {
            LOG(FATAL) << "Hit EOF on eventfd";
        }

        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
        {
            std::lock_guard<std::mutex> lock(lock_);
            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
                auto handler = merge_handlers_.front();
                merge_handlers_.pop();

                if (!handler->snapuserd()) {
                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
                    continue;
                }

                LOG(INFO) << "Starting merge for partition: "
                          << handler->snapuserd()->GetMiscName();
                handler->snapuserd()->InitiateMerge();
                active_merge_threads_ += 1;
            }
        }
    }

    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
}

std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
    std::lock_guard<std::mutex> lock(lock_);
    auto iter = FindHandler(&lock, misc_name);
    if (iter == dm_users_.end()) {
        LOG(ERROR) << "Could not find handler: " << misc_name;
        return {};
    }

    return (*iter)->snapuserd()->GetMergeStatus();
}

double SnapshotHandlerManager::GetMergePercentage() {
    std::lock_guard<std::mutex> lock(lock_);

    double percentage = 0.0;
    int n = 0;

    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
        auto& th = (*iter)->thread();
        if (th.joinable()) {
            // Merge percentage by individual partitions wherein merge is still
            // in-progress
            percentage += (*iter)->snapuserd()->GetMergePercentage();
            n += 1;
        }
    }

    // Calculate final merge including those partitions where merge was already
    // completed - num_partitions_merge_complete_ will track them when each
    // thread exists in RunThread.
    int total_partitions = n + num_partitions_merge_complete_;

    if (total_partitions) {
        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
    }

    LOG(DEBUG) << "Merge %: " << percentage
               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
               << " total_partitions: " << total_partitions << " n: " << n;
    return percentage;
}

bool SnapshotHandlerManager::GetVerificationStatus() {
    std::lock_guard<std::mutex> lock(lock_);

    bool status = true;
    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
        auto& th = (*iter)->thread();
        if (th.joinable() && status) {
            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
        } else {
            // return immediately if there is a failure
            return false;
        }
    }

    return status;
}

bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
    std::shared_ptr<HandlerThread> handler;
    {
        std::lock_guard<std::mutex> lock(lock_);

        auto iter = FindHandler(&lock, misc_name);
        if (iter == dm_users_.end()) {
            // Client already deleted.
            return true;
        }
        handler = std::move(*iter);
        dm_users_.erase(iter);
    }

    auto& th = handler->thread();
    if (th.joinable()) {
        th.join();
    }
    return true;
}

void SnapshotHandlerManager::TerminateMergeThreads() {
    std::lock_guard<std::mutex> guard(lock_);

    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
        if (!(*iter)->ThreadTerminated()) {
            (*iter)->snapuserd()->NotifyIOTerminated();
        }
    }
}

void SnapshotHandlerManager::JoinAllThreads() {
    // Acquire the thread list within the lock.
    std::vector<std::shared_ptr<HandlerThread>> dm_users;
    {
        std::lock_guard<std::mutex> guard(lock_);
        dm_users = std::move(dm_users_);
    }

    for (auto& client : dm_users) {
        auto& th = client->thread();

        if (th.joinable()) th.join();
    }

    stop_monitor_merge_thread_ = true;
    WakeupMonitorMergeThread();
}

auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
                                         const std::string& misc_name) -> HandlerList::iterator {
    CHECK(proof_of_lock);

    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
        if ((*iter)->misc_name() == misc_name) {
            return iter;
        }
    }
    return dm_users_.end();
}

}  // namespace snapshot
}  // namespace android
+131 −0
Original line number Original line Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include <android-base/unique_fd.h>

namespace android {
namespace snapshot {

class SnapshotHandler;

class HandlerThread {
  public:
    explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);

    void FreeResources();
    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
    std::thread& thread() { return thread_; }

    const std::string& misc_name() const { return misc_name_; }
    bool ThreadTerminated() { return thread_terminated_; }
    void SetThreadTerminated() { thread_terminated_ = true; }

  private:
    std::thread thread_;
    std::shared_ptr<SnapshotHandler> snapuserd_;
    std::string misc_name_;
    bool thread_terminated_ = false;
};

class ISnapshotHandlerManager {
  public:
    virtual ~ISnapshotHandlerManager() {}

    // Add a new snapshot handler but do not start serving requests yet.
    virtual std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
                                                      const std::string& cow_device_path,
                                                      const std::string& backing_device,
                                                      const std::string& base_path_merge,
                                                      int num_worker_threads, bool use_iouring,
                                                      bool perform_verification) = 0;

    // Start serving requests on a snapshot handler.
    virtual bool StartHandler(const std::string& misc_name) = 0;

    // Stop serving requests on a snapshot handler and remove it.
    virtual bool DeleteHandler(const std::string& misc_name) = 0;

    // Begin merging blocks on the given snapshot handler.
    virtual bool InitiateMerge(const std::string& misc_name) = 0;

    // Return a string containing a status code indicating the merge status
    // on the handler. Returns empty on error.
    virtual std::string GetMergeStatus(const std::string& misc_name) = 0;

    // Wait until all handlers have terminated.
    virtual void JoinAllThreads() = 0;

    // Stop any in-progress merge threads.
    virtual void TerminateMergeThreads() = 0;

    // Returns the merge progress across all merging snapshot handlers.
    virtual double GetMergePercentage() = 0;

    // Returns whether all snapshots have verified.
    virtual bool GetVerificationStatus() = 0;
};

class SnapshotHandlerManager final : public ISnapshotHandlerManager {
  public:
    SnapshotHandlerManager();
    std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
                                              const std::string& cow_device_path,
                                              const std::string& backing_device,
                                              const std::string& base_path_merge,
                                              int num_worker_threads, bool use_iouring,
                                              bool perform_verification) override;
    bool StartHandler(const std::string& misc_name) override;
    bool DeleteHandler(const std::string& misc_name) override;
    bool InitiateMerge(const std::string& misc_name) override;
    std::string GetMergeStatus(const std::string& misc_name) override;
    void JoinAllThreads() override;
    void TerminateMergeThreads() override;
    double GetMergePercentage() override;
    bool GetVerificationStatus() override;

  private:
    bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
    void RunThread(std::shared_ptr<HandlerThread> handler);
    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
                    const std::shared_ptr<HandlerThread>& handler);
    void MonitorMerge();
    void WakeupMonitorMergeThread();
    bool RemoveAndJoinHandler(const std::string& misc_name);

    // Find a HandlerThread within a lock.
    using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
                                      const std::string& misc_name);

    std::mutex lock_;
    HandlerList dm_users_;

    bool is_merge_monitor_started_ = false;
    bool stop_monitor_merge_thread_ = false;
    int active_merge_threads_ = 0;
    int num_partitions_merge_complete_ = 0;
    std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
    android::base::unique_fd monitor_merge_event_fd_;
};

}  // namespace snapshot
}  // namespace android
+101 −431

File changed.

Preview size limit exceeded, changes collapsed.

Loading