Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0d8d7e8c authored by Daichi Hirono's avatar Daichi Hirono Committed by android-build-merger
Browse files

Change FuseAppLoop so that it can process messages asynchronously.

am: 8e16ceec

Change-Id: I89abe0e9f457e9e91fdbb4acee10e996f81c9ac5
parents 5a23eb19 8e16ceec
Loading
Loading
Loading
Loading
+186 −159
Original line number Diff line number Diff line
@@ -16,205 +16,232 @@

#include "libappfuse/FuseAppLoop.h"

#include <sys/eventfd.h>
#include <sys/stat.h>

#include <android-base/logging.h>
#include <android-base/unique_fd.h>

#include "libappfuse/EpollController.h"

namespace android {
namespace fuse {

namespace {

void HandleLookUp(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
bool HandleLookUp(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    // AppFuse does not support directory structure now.
    // It can lookup only files under the mount point.
    if (buffer->request.header.nodeid != FUSE_ROOT_ID) {
        LOG(ERROR) << "Nodeid is not FUSE_ROOT_ID.";
    buffer->response.Reset(0, -ENOENT, buffer->request.header.unique);
    return;
        return loop->ReplySimple(buffer->request.header.unique, -ENOENT);
    }

    // Ensure that the filename ends with 0.
  const size_t filename_length =
      buffer->request.header.len - sizeof(fuse_in_header);
    const size_t filename_length = buffer->request.header.len - sizeof(fuse_in_header);
    if (buffer->request.lookup_name[filename_length - 1] != 0) {
        LOG(ERROR) << "File name does not end with 0.";
    buffer->response.Reset(0, -ENOENT, buffer->request.header.unique);
    return;
        return loop->ReplySimple(buffer->request.header.unique, -ENOENT);
    }

  const uint64_t inode =
      static_cast<uint64_t>(atol(buffer->request.lookup_name));
    const uint64_t inode = static_cast<uint64_t>(atol(buffer->request.lookup_name));
    if (inode == 0 || inode == LONG_MAX) {
        LOG(ERROR) << "Invalid filename";
    buffer->response.Reset(0, -ENOENT, buffer->request.header.unique);
    return;
        return loop->ReplySimple(buffer->request.header.unique, -ENOENT);
    }

  const int64_t size = callback->OnGetSize(inode);
  if (size < 0) {
    buffer->response.Reset(0, size, buffer->request.header.unique);
    return;
  }

  buffer->response.Reset(sizeof(fuse_entry_out), 0,
                         buffer->request.header.unique);
  buffer->response.entry_out.nodeid = inode;
  buffer->response.entry_out.attr_valid = 10;
  buffer->response.entry_out.entry_valid = 10;
  buffer->response.entry_out.attr.ino = inode;
  buffer->response.entry_out.attr.mode = S_IFREG | 0777;
  buffer->response.entry_out.attr.size = size;
    callback->OnLookup(buffer->request.header.unique, inode);
    return true;
}

void HandleGetAttr(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const uint64_t nodeid = buffer->request.header.nodeid;
  int64_t size;
  uint32_t mode;
  if (nodeid == FUSE_ROOT_ID) {
    size = 0;
    mode = S_IFDIR | 0777;
bool HandleGetAttr(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    if (buffer->request.header.nodeid == FUSE_ROOT_ID) {
        return loop->ReplyGetAttr(buffer->request.header.unique, buffer->request.header.nodeid, 0,
                                  S_IFDIR | 0777);
    } else {
    size = callback->OnGetSize(buffer->request.header.nodeid);
    if (size < 0) {
      buffer->response.Reset(0, size, buffer->request.header.unique);
      return;
        callback->OnGetAttr(buffer->request.header.unique, buffer->request.header.nodeid);
        return true;
    }
    mode = S_IFREG | 0777;
}

  buffer->response.Reset(sizeof(fuse_attr_out), 0,
                         buffer->request.header.unique);
  buffer->response.attr_out.attr_valid = 10;
  buffer->response.attr_out.attr.ino = nodeid;
  buffer->response.attr_out.attr.mode = mode;
  buffer->response.attr_out.attr.size = size;
bool HandleRead(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    if (buffer->request.read_in.size > kFuseMaxRead) {
        return loop->ReplySimple(buffer->request.header.unique, -EINVAL);
    }

void HandleOpen(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const int32_t file_handle = callback->OnOpen(buffer->request.header.nodeid);
  if (file_handle < 0) {
    buffer->response.Reset(0, file_handle, buffer->request.header.unique);
    return;
    callback->OnRead(buffer->request.header.unique, buffer->request.header.nodeid,
                     buffer->request.read_in.offset, buffer->request.read_in.size);
    return true;
}
  buffer->response.Reset(sizeof(fuse_open_out), kFuseSuccess,
                         buffer->request.header.unique);
  buffer->response.open_out.fh = file_handle;

bool HandleWrite(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    if (buffer->request.write_in.size > kFuseMaxWrite) {
        return loop->ReplySimple(buffer->request.header.unique, -EINVAL);
    }

void HandleFsync(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  buffer->response.Reset(0, callback->OnFsync(buffer->request.header.nodeid),
                         buffer->request.header.unique);
    callback->OnWrite(buffer->request.header.unique, buffer->request.header.nodeid,
                      buffer->request.write_in.offset, buffer->request.write_in.size,
                      buffer->request.write_data);
    return true;
}

void HandleRelease(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  buffer->response.Reset(0, callback->OnRelease(buffer->request.header.nodeid),
                         buffer->request.header.unique);
bool HandleMessage(FuseAppLoop* loop, FuseBuffer* buffer, int fd, FuseAppLoopCallback* callback) {
    if (!buffer->request.Read(fd)) {
        return false;
    }

void HandleRead(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const uint64_t unique = buffer->request.header.unique;
  const uint64_t nodeid = buffer->request.header.nodeid;
  const uint64_t offset = buffer->request.read_in.offset;
  const uint32_t size = buffer->request.read_in.size;
    const uint32_t opcode = buffer->request.header.opcode;
    LOG(VERBOSE) << "Read a fuse packet, opcode=" << opcode;
    switch (opcode) {
        case FUSE_FORGET:
            // Do not reply to FUSE_FORGET.
            return true;

  if (size > kFuseMaxRead) {
    buffer->response.Reset(0, -EINVAL, buffer->request.header.unique);
    return;
  }
        case FUSE_LOOKUP:
            return HandleLookUp(loop, buffer, callback);

  const int32_t read_size = callback->OnRead(nodeid, offset, size,
                                             buffer->response.read_data);
  if (read_size < 0) {
    buffer->response.Reset(0, read_size, buffer->request.header.unique);
    return;
  }
        case FUSE_GETATTR:
            return HandleGetAttr(loop, buffer, callback);

  buffer->response.ResetHeader(read_size, kFuseSuccess, unique);
}
        case FUSE_OPEN:
            callback->OnOpen(buffer->request.header.unique, buffer->request.header.nodeid);
            return true;

void HandleWrite(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const uint64_t unique = buffer->request.header.unique;
  const uint64_t nodeid = buffer->request.header.nodeid;
  const uint64_t offset = buffer->request.write_in.offset;
  const uint32_t size = buffer->request.write_in.size;
        case FUSE_READ:
            return HandleRead(loop, buffer, callback);

  if (size > kFuseMaxWrite) {
    buffer->response.Reset(0, -EINVAL, buffer->request.header.unique);
    return;
        case FUSE_WRITE:
            return HandleWrite(loop, buffer, callback);

        case FUSE_RELEASE:
            callback->OnRelease(buffer->request.header.unique, buffer->request.header.nodeid);
            return true;

        case FUSE_FSYNC:
            callback->OnFsync(buffer->request.header.unique, buffer->request.header.nodeid);
            return true;

        default:
            buffer->HandleNotImpl();
            return buffer->response.Write(fd);
    }
}

  const int32_t write_size = callback->OnWrite(nodeid, offset, size,
                                               buffer->request.write_data);
  if (write_size < 0) {
    buffer->response.Reset(0, write_size, buffer->request.header.unique);
    return;
} // namespace

FuseAppLoopCallback::~FuseAppLoopCallback() = default;

FuseAppLoop::FuseAppLoop(base::unique_fd&& fd) : fd_(std::move(fd)) {}

void FuseAppLoop::Break() {
    const int64_t value = 1;
    if (write(break_fd_, &value, sizeof(value)) == -1) {
        PLOG(ERROR) << "Failed to send a break event";
    }
}

  buffer->response.Reset(sizeof(fuse_write_out), kFuseSuccess, unique);
  buffer->response.write_out.size = write_size;
bool FuseAppLoop::ReplySimple(uint64_t unique, int32_t result) {
    if (result == -ENOSYS) {
        // We should not return -ENOSYS because the kernel stops delivering FUSE
        // command after receiving -ENOSYS as a result for the command.
        result = -EBADF;
    }
    FuseSimpleResponse response;
    response.Reset(0, result, unique);
    return response.Write(fd_);
}

} // namespace
bool FuseAppLoop::ReplyLookup(uint64_t unique, uint64_t inode, int64_t size) {
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_entry_out), 0, unique);
    response.entry_out.nodeid = inode;
    response.entry_out.attr_valid = 10;
    response.entry_out.entry_valid = 10;
    response.entry_out.attr.ino = inode;
    response.entry_out.attr.mode = S_IFREG | 0777;
    response.entry_out.attr.size = size;
    return response.Write(fd_);
}

bool StartFuseAppLoop(int raw_fd, FuseAppLoopCallback* callback) {
  base::unique_fd fd(raw_fd);
  FuseBuffer buffer;
bool FuseAppLoop::ReplyGetAttr(uint64_t unique, uint64_t inode, int64_t size, int mode) {
    CHECK(mode == (S_IFREG | 0777) || mode == (S_IFDIR | 0777));
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_attr_out), 0, unique);
    response.attr_out.attr_valid = 10;
    response.attr_out.attr.ino = inode;
    response.attr_out.attr.mode = mode;
    response.attr_out.attr.size = size;
    return response.Write(fd_);
}

  LOG(DEBUG) << "Start fuse loop.";
  while (callback->IsActive()) {
    if (!buffer.request.Read(fd)) {
      return false;
bool FuseAppLoop::ReplyOpen(uint64_t unique, uint64_t fh) {
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_open_out), kFuseSuccess, unique);
    response.open_out.fh = fh;
    return response.Write(fd_);
}

    const uint32_t opcode = buffer.request.header.opcode;
    LOG(VERBOSE) << "Read a fuse packet, opcode=" << opcode;
    switch (opcode) {
      case FUSE_FORGET:
        // Do not reply to FUSE_FORGET.
        continue;
bool FuseAppLoop::ReplyWrite(uint64_t unique, uint32_t size) {
    CHECK(size <= kFuseMaxWrite);
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_write_out), kFuseSuccess, unique);
    response.write_out.size = size;
    return response.Write(fd_);
}

      case FUSE_LOOKUP:
        HandleLookUp(&buffer, callback);
        break;
bool FuseAppLoop::ReplyRead(uint64_t unique, uint32_t size, const void* data) {
    CHECK(size <= kFuseMaxRead);
    FuseSimpleResponse response;
    response.ResetHeader(size, kFuseSuccess, unique);
    return response.WriteWithBody(fd_, sizeof(FuseResponse), data);
}

      case FUSE_GETATTR:
        HandleGetAttr(&buffer, callback);
        break;
void FuseAppLoop::Start(FuseAppLoopCallback* callback) {
    break_fd_.reset(eventfd(/* initval */ 0, EFD_CLOEXEC));
    if (break_fd_.get() == -1) {
        PLOG(ERROR) << "Failed to open FD for break event";
        return;
    }

      case FUSE_OPEN:
        HandleOpen(&buffer, callback);
        break;
    base::unique_fd epoll_fd(epoll_create1(EPOLL_CLOEXEC));
    if (epoll_fd.get() == -1) {
        PLOG(ERROR) << "Failed to open FD for epoll";
        return;
    }

      case FUSE_READ:
        HandleRead(&buffer, callback);
        break;
    int last_event;
    int break_event;

      case FUSE_WRITE:
        HandleWrite(&buffer, callback);
        break;
    std::unique_ptr<EpollController> epoll_controller(new EpollController(std::move(epoll_fd)));
    if (!epoll_controller->AddFd(fd_, EPOLLIN, &last_event)) {
        return;
    }
    if (!epoll_controller->AddFd(break_fd_, EPOLLIN, &break_event)) {
        return;
    }

      case FUSE_RELEASE:
        HandleRelease(&buffer, callback);
        break;
    last_event = 0;
    break_event = 0;

      case FUSE_FSYNC:
        HandleFsync(&buffer, callback);
    FuseBuffer buffer;
    while (true) {
        if (!epoll_controller->Wait(1)) {
            break;
        }
        last_event = 0;
        *reinterpret_cast<int*>(epoll_controller->events()[0].data.ptr) =
            epoll_controller->events()[0].events;

      default:
        buffer.HandleNotImpl();
        if (break_event != 0 || (last_event & ~EPOLLIN) != 0) {
            break;
        }

    if (!buffer.response.Write(fd)) {
      LOG(ERROR) << "Failed to write a response to the device.";
      return false;
        if (!HandleMessage(this, &buffer, fd_, callback)) {
            break;
        }
    }

  return true;
    LOG(VERBOSE) << "FuseAppLoop exit";
}

}  // namespace fuse
+38 −10
Original line number Diff line number Diff line
@@ -17,23 +17,51 @@
#ifndef ANDROID_LIBAPPFUSE_FUSEAPPLOOP_H_
#define ANDROID_LIBAPPFUSE_FUSEAPPLOOP_H_

#include <memory>
#include <mutex>

#include <android-base/unique_fd.h>

#include "libappfuse/FuseBuffer.h"

namespace android {
namespace fuse {

class EpollController;

class FuseAppLoopCallback {
 public:
  virtual bool IsActive() = 0;
  virtual int64_t OnGetSize(uint64_t inode) = 0;
  virtual int32_t OnFsync(uint64_t inode) = 0;
  virtual int32_t OnWrite(
      uint64_t inode, uint64_t offset, uint32_t size, const void* data) = 0;
  virtual int32_t OnRead(
      uint64_t inode, uint64_t offset, uint32_t size, void* data) = 0;
  virtual int32_t OnOpen(uint64_t inode) = 0;
  virtual int32_t OnRelease(uint64_t inode) = 0;
  virtual ~FuseAppLoopCallback() = default;
   virtual void OnLookup(uint64_t unique, uint64_t inode) = 0;
   virtual void OnGetAttr(uint64_t unique, uint64_t inode) = 0;
   virtual void OnFsync(uint64_t unique, uint64_t inode) = 0;
   virtual void OnWrite(uint64_t unique, uint64_t inode, uint64_t offset, uint32_t size,
                        const void* data) = 0;
   virtual void OnRead(uint64_t unique, uint64_t inode, uint64_t offset, uint32_t size) = 0;
   virtual void OnOpen(uint64_t unique, uint64_t inode) = 0;
   virtual void OnRelease(uint64_t unique, uint64_t inode) = 0;
   virtual ~FuseAppLoopCallback();
};

class FuseAppLoop final {
  public:
    FuseAppLoop(base::unique_fd&& fd);

    void Start(FuseAppLoopCallback* callback);
    void Break();

    bool ReplySimple(uint64_t unique, int32_t result);
    bool ReplyLookup(uint64_t unique, uint64_t inode, int64_t size);
    bool ReplyGetAttr(uint64_t unique, uint64_t inode, int64_t size, int mode);
    bool ReplyOpen(uint64_t unique, uint64_t fh);
    bool ReplyWrite(uint64_t unique, uint32_t size);
    bool ReplyRead(uint64_t unique, uint32_t size, const void* data);

  private:
    base::unique_fd fd_;
    base::unique_fd break_fd_;

    // Lock for multi-threading.
    std::mutex mutex_;
};

bool StartFuseAppLoop(int fd, FuseAppLoopCallback* callback);
+53 −58
Original line number Diff line number Diff line
@@ -23,6 +23,9 @@
#include <gtest/gtest.h>
#include <thread>

#include "libappfuse/EpollController.h"
#include "libappfuse/FuseBridgeLoop.h"

namespace android {
namespace fuse {
namespace {
@@ -37,82 +40,61 @@ struct CallbackRequest {
class Callback : public FuseAppLoopCallback {
 public:
  std::vector<CallbackRequest> requests;
  FuseAppLoop* loop;

  bool IsActive() override {
    return true;
  void OnGetAttr(uint64_t seq, uint64_t inode) override {
      EXPECT_NE(FUSE_ROOT_ID, static_cast<int>(inode));
      EXPECT_TRUE(loop->ReplyGetAttr(seq, inode, kTestFileSize, S_IFREG | 0777));
  }

  int64_t OnGetSize(uint64_t inode) override {
    if (inode == FUSE_ROOT_ID) {
      return 0;
    } else {
      return kTestFileSize;
    }
  void OnLookup(uint64_t unique, uint64_t inode) override {
      EXPECT_NE(FUSE_ROOT_ID, static_cast<int>(inode));
      EXPECT_TRUE(loop->ReplyLookup(unique, inode, kTestFileSize));
  }

  int32_t OnFsync(uint64_t inode) override {
    requests.push_back({
      .code = FUSE_FSYNC,
      .inode = inode
    });
    return 0;
  void OnFsync(uint64_t seq, uint64_t inode) override {
      requests.push_back({.code = FUSE_FSYNC, .inode = inode});
      loop->ReplySimple(seq, 0);
  }

  int32_t OnWrite(uint64_t inode,
                  uint64_t offset ATTRIBUTE_UNUSED,
                  uint32_t size ATTRIBUTE_UNUSED,
                  const void* data ATTRIBUTE_UNUSED) override {
    requests.push_back({
      .code = FUSE_WRITE,
      .inode = inode
    });
    return 0;
  void OnWrite(uint64_t seq, uint64_t inode, uint64_t offset ATTRIBUTE_UNUSED,
               uint32_t size ATTRIBUTE_UNUSED, const void* data ATTRIBUTE_UNUSED) override {
      requests.push_back({.code = FUSE_WRITE, .inode = inode});
      loop->ReplyWrite(seq, 0);
  }

  int32_t OnRead(uint64_t inode,
                 uint64_t offset ATTRIBUTE_UNUSED,
                 uint32_t size ATTRIBUTE_UNUSED,
                 void* data ATTRIBUTE_UNUSED) override {
    requests.push_back({
      .code = FUSE_READ,
      .inode = inode
    });
    return 0;
  void OnRead(uint64_t seq, uint64_t inode, uint64_t offset ATTRIBUTE_UNUSED,
              uint32_t size ATTRIBUTE_UNUSED) override {
      requests.push_back({.code = FUSE_READ, .inode = inode});
      loop->ReplySimple(seq, 0);
  }

  int32_t OnOpen(uint64_t inode) override {
    requests.push_back({
      .code = FUSE_OPEN,
      .inode = inode
    });
    return 0;
  void OnOpen(uint64_t seq, uint64_t inode) override {
      requests.push_back({.code = FUSE_OPEN, .inode = inode});
      loop->ReplyOpen(seq, inode);
  }

  int32_t OnRelease(uint64_t inode) override {
    requests.push_back({
      .code = FUSE_RELEASE,
      .inode = inode
    });
    return 0;
  void OnRelease(uint64_t seq, uint64_t inode) override {
      requests.push_back({.code = FUSE_RELEASE, .inode = inode});
      loop->ReplySimple(seq, 0);
  }
};

class FuseAppLoopTest : public ::testing::Test {
 private:
  std::thread thread_;

 protected:
   std::thread thread_;
   base::unique_fd sockets_[2];
   Callback callback_;
   FuseRequest request_;
   FuseResponse response_;
   std::unique_ptr<FuseAppLoop> loop_;

   void SetUp() override {
       base::SetMinimumLogSeverity(base::VERBOSE);
       ASSERT_TRUE(SetupMessageSockets(&sockets_));
    thread_ = std::thread([this] {
      StartFuseAppLoop(sockets_[1].release(), &callback_);
    });
       loop_.reset(new FuseAppLoop(std::move(sockets_[1])));
       callback_.loop = loop_.get();
       thread_ = std::thread([this] { loop_->Start(&callback_); });
  }

  void CheckCallback(
@@ -300,5 +282,18 @@ TEST_F(FuseAppLoopTest, Write) {
  CheckCallback(sizeof(fuse_write_in), FUSE_WRITE, sizeof(fuse_write_out));
}

TEST_F(FuseAppLoopTest, Break) {
    // Ensure that the loop started.
    request_.Reset(sizeof(fuse_open_in), FUSE_OPEN, 1);
    request_.header.nodeid = 10;
    ASSERT_TRUE(request_.Write(sockets_[0]));
    ASSERT_TRUE(response_.Read(sockets_[0]));

    loop_->Break();
    if (thread_.joinable()) {
        thread_.join();
    }
}

}  // namespace fuse
}  // namespace android