Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8e16ceec authored by Daichi Hirono's avatar Daichi Hirono
Browse files

Change FuseAppLoop so that it can process messages asynchronously.

Previously FuseAppLoopCallback needs to return values in a synchrnous
manner. The CL changes it to asynchronous mannger so that apps can
process FUSE message asynchrnously.

Bug: 35229514
Test: FuseAppLoopTest
Change-Id: I8edcfdb003a25cfd5e9c490ec871140220b21e35
(cherry picked from commit f5d15f9f)
parent df937b82
Loading
Loading
Loading
Loading
+186 −159
Original line number Diff line number Diff line
@@ -16,205 +16,232 @@

#include "libappfuse/FuseAppLoop.h"

#include <sys/eventfd.h>
#include <sys/stat.h>

#include <android-base/logging.h>
#include <android-base/unique_fd.h>

#include "libappfuse/EpollController.h"

namespace android {
namespace fuse {

namespace {

void HandleLookUp(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
bool HandleLookUp(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    // AppFuse does not support directory structure now.
    // It can lookup only files under the mount point.
    if (buffer->request.header.nodeid != FUSE_ROOT_ID) {
        LOG(ERROR) << "Nodeid is not FUSE_ROOT_ID.";
    buffer->response.Reset(0, -ENOENT, buffer->request.header.unique);
    return;
        return loop->ReplySimple(buffer->request.header.unique, -ENOENT);
    }

    // Ensure that the filename ends with 0.
  const size_t filename_length =
      buffer->request.header.len - sizeof(fuse_in_header);
    const size_t filename_length = buffer->request.header.len - sizeof(fuse_in_header);
    if (buffer->request.lookup_name[filename_length - 1] != 0) {
        LOG(ERROR) << "File name does not end with 0.";
    buffer->response.Reset(0, -ENOENT, buffer->request.header.unique);
    return;
        return loop->ReplySimple(buffer->request.header.unique, -ENOENT);
    }

  const uint64_t inode =
      static_cast<uint64_t>(atol(buffer->request.lookup_name));
    const uint64_t inode = static_cast<uint64_t>(atol(buffer->request.lookup_name));
    if (inode == 0 || inode == LONG_MAX) {
        LOG(ERROR) << "Invalid filename";
    buffer->response.Reset(0, -ENOENT, buffer->request.header.unique);
    return;
        return loop->ReplySimple(buffer->request.header.unique, -ENOENT);
    }

  const int64_t size = callback->OnGetSize(inode);
  if (size < 0) {
    buffer->response.Reset(0, size, buffer->request.header.unique);
    return;
  }

  buffer->response.Reset(sizeof(fuse_entry_out), 0,
                         buffer->request.header.unique);
  buffer->response.entry_out.nodeid = inode;
  buffer->response.entry_out.attr_valid = 10;
  buffer->response.entry_out.entry_valid = 10;
  buffer->response.entry_out.attr.ino = inode;
  buffer->response.entry_out.attr.mode = S_IFREG | 0777;
  buffer->response.entry_out.attr.size = size;
    callback->OnLookup(buffer->request.header.unique, inode);
    return true;
}

void HandleGetAttr(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const uint64_t nodeid = buffer->request.header.nodeid;
  int64_t size;
  uint32_t mode;
  if (nodeid == FUSE_ROOT_ID) {
    size = 0;
    mode = S_IFDIR | 0777;
bool HandleGetAttr(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    if (buffer->request.header.nodeid == FUSE_ROOT_ID) {
        return loop->ReplyGetAttr(buffer->request.header.unique, buffer->request.header.nodeid, 0,
                                  S_IFDIR | 0777);
    } else {
    size = callback->OnGetSize(buffer->request.header.nodeid);
    if (size < 0) {
      buffer->response.Reset(0, size, buffer->request.header.unique);
      return;
        callback->OnGetAttr(buffer->request.header.unique, buffer->request.header.nodeid);
        return true;
    }
    mode = S_IFREG | 0777;
}

  buffer->response.Reset(sizeof(fuse_attr_out), 0,
                         buffer->request.header.unique);
  buffer->response.attr_out.attr_valid = 10;
  buffer->response.attr_out.attr.ino = nodeid;
  buffer->response.attr_out.attr.mode = mode;
  buffer->response.attr_out.attr.size = size;
bool HandleRead(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    if (buffer->request.read_in.size > kFuseMaxRead) {
        return loop->ReplySimple(buffer->request.header.unique, -EINVAL);
    }

void HandleOpen(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const int32_t file_handle = callback->OnOpen(buffer->request.header.nodeid);
  if (file_handle < 0) {
    buffer->response.Reset(0, file_handle, buffer->request.header.unique);
    return;
    callback->OnRead(buffer->request.header.unique, buffer->request.header.nodeid,
                     buffer->request.read_in.offset, buffer->request.read_in.size);
    return true;
}
  buffer->response.Reset(sizeof(fuse_open_out), kFuseSuccess,
                         buffer->request.header.unique);
  buffer->response.open_out.fh = file_handle;

bool HandleWrite(FuseAppLoop* loop, FuseBuffer* buffer, FuseAppLoopCallback* callback) {
    if (buffer->request.write_in.size > kFuseMaxWrite) {
        return loop->ReplySimple(buffer->request.header.unique, -EINVAL);
    }

void HandleFsync(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  buffer->response.Reset(0, callback->OnFsync(buffer->request.header.nodeid),
                         buffer->request.header.unique);
    callback->OnWrite(buffer->request.header.unique, buffer->request.header.nodeid,
                      buffer->request.write_in.offset, buffer->request.write_in.size,
                      buffer->request.write_data);
    return true;
}

void HandleRelease(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  buffer->response.Reset(0, callback->OnRelease(buffer->request.header.nodeid),
                         buffer->request.header.unique);
bool HandleMessage(FuseAppLoop* loop, FuseBuffer* buffer, int fd, FuseAppLoopCallback* callback) {
    if (!buffer->request.Read(fd)) {
        return false;
    }

void HandleRead(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const uint64_t unique = buffer->request.header.unique;
  const uint64_t nodeid = buffer->request.header.nodeid;
  const uint64_t offset = buffer->request.read_in.offset;
  const uint32_t size = buffer->request.read_in.size;
    const uint32_t opcode = buffer->request.header.opcode;
    LOG(VERBOSE) << "Read a fuse packet, opcode=" << opcode;
    switch (opcode) {
        case FUSE_FORGET:
            // Do not reply to FUSE_FORGET.
            return true;

  if (size > kFuseMaxRead) {
    buffer->response.Reset(0, -EINVAL, buffer->request.header.unique);
    return;
  }
        case FUSE_LOOKUP:
            return HandleLookUp(loop, buffer, callback);

  const int32_t read_size = callback->OnRead(nodeid, offset, size,
                                             buffer->response.read_data);
  if (read_size < 0) {
    buffer->response.Reset(0, read_size, buffer->request.header.unique);
    return;
  }
        case FUSE_GETATTR:
            return HandleGetAttr(loop, buffer, callback);

  buffer->response.ResetHeader(read_size, kFuseSuccess, unique);
}
        case FUSE_OPEN:
            callback->OnOpen(buffer->request.header.unique, buffer->request.header.nodeid);
            return true;

void HandleWrite(FuseBuffer* buffer, FuseAppLoopCallback* callback) {
  const uint64_t unique = buffer->request.header.unique;
  const uint64_t nodeid = buffer->request.header.nodeid;
  const uint64_t offset = buffer->request.write_in.offset;
  const uint32_t size = buffer->request.write_in.size;
        case FUSE_READ:
            return HandleRead(loop, buffer, callback);

  if (size > kFuseMaxWrite) {
    buffer->response.Reset(0, -EINVAL, buffer->request.header.unique);
    return;
        case FUSE_WRITE:
            return HandleWrite(loop, buffer, callback);

        case FUSE_RELEASE:
            callback->OnRelease(buffer->request.header.unique, buffer->request.header.nodeid);
            return true;

        case FUSE_FSYNC:
            callback->OnFsync(buffer->request.header.unique, buffer->request.header.nodeid);
            return true;

        default:
            buffer->HandleNotImpl();
            return buffer->response.Write(fd);
    }
}

  const int32_t write_size = callback->OnWrite(nodeid, offset, size,
                                               buffer->request.write_data);
  if (write_size < 0) {
    buffer->response.Reset(0, write_size, buffer->request.header.unique);
    return;
} // namespace

FuseAppLoopCallback::~FuseAppLoopCallback() = default;

FuseAppLoop::FuseAppLoop(base::unique_fd&& fd) : fd_(std::move(fd)) {}

void FuseAppLoop::Break() {
    const int64_t value = 1;
    if (write(break_fd_, &value, sizeof(value)) == -1) {
        PLOG(ERROR) << "Failed to send a break event";
    }
}

  buffer->response.Reset(sizeof(fuse_write_out), kFuseSuccess, unique);
  buffer->response.write_out.size = write_size;
bool FuseAppLoop::ReplySimple(uint64_t unique, int32_t result) {
    if (result == -ENOSYS) {
        // We should not return -ENOSYS because the kernel stops delivering FUSE
        // command after receiving -ENOSYS as a result for the command.
        result = -EBADF;
    }
    FuseSimpleResponse response;
    response.Reset(0, result, unique);
    return response.Write(fd_);
}

} // namespace
bool FuseAppLoop::ReplyLookup(uint64_t unique, uint64_t inode, int64_t size) {
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_entry_out), 0, unique);
    response.entry_out.nodeid = inode;
    response.entry_out.attr_valid = 10;
    response.entry_out.entry_valid = 10;
    response.entry_out.attr.ino = inode;
    response.entry_out.attr.mode = S_IFREG | 0777;
    response.entry_out.attr.size = size;
    return response.Write(fd_);
}

bool StartFuseAppLoop(int raw_fd, FuseAppLoopCallback* callback) {
  base::unique_fd fd(raw_fd);
  FuseBuffer buffer;
bool FuseAppLoop::ReplyGetAttr(uint64_t unique, uint64_t inode, int64_t size, int mode) {
    CHECK(mode == (S_IFREG | 0777) || mode == (S_IFDIR | 0777));
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_attr_out), 0, unique);
    response.attr_out.attr_valid = 10;
    response.attr_out.attr.ino = inode;
    response.attr_out.attr.mode = mode;
    response.attr_out.attr.size = size;
    return response.Write(fd_);
}

  LOG(DEBUG) << "Start fuse loop.";
  while (callback->IsActive()) {
    if (!buffer.request.Read(fd)) {
      return false;
bool FuseAppLoop::ReplyOpen(uint64_t unique, uint64_t fh) {
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_open_out), kFuseSuccess, unique);
    response.open_out.fh = fh;
    return response.Write(fd_);
}

    const uint32_t opcode = buffer.request.header.opcode;
    LOG(VERBOSE) << "Read a fuse packet, opcode=" << opcode;
    switch (opcode) {
      case FUSE_FORGET:
        // Do not reply to FUSE_FORGET.
        continue;
bool FuseAppLoop::ReplyWrite(uint64_t unique, uint32_t size) {
    CHECK(size <= kFuseMaxWrite);
    FuseSimpleResponse response;
    response.Reset(sizeof(fuse_write_out), kFuseSuccess, unique);
    response.write_out.size = size;
    return response.Write(fd_);
}

      case FUSE_LOOKUP:
        HandleLookUp(&buffer, callback);
        break;
bool FuseAppLoop::ReplyRead(uint64_t unique, uint32_t size, const void* data) {
    CHECK(size <= kFuseMaxRead);
    FuseSimpleResponse response;
    response.ResetHeader(size, kFuseSuccess, unique);
    return response.WriteWithBody(fd_, sizeof(FuseResponse), data);
}

      case FUSE_GETATTR:
        HandleGetAttr(&buffer, callback);
        break;
void FuseAppLoop::Start(FuseAppLoopCallback* callback) {
    break_fd_.reset(eventfd(/* initval */ 0, EFD_CLOEXEC));
    if (break_fd_.get() == -1) {
        PLOG(ERROR) << "Failed to open FD for break event";
        return;
    }

      case FUSE_OPEN:
        HandleOpen(&buffer, callback);
        break;
    base::unique_fd epoll_fd(epoll_create1(EPOLL_CLOEXEC));
    if (epoll_fd.get() == -1) {
        PLOG(ERROR) << "Failed to open FD for epoll";
        return;
    }

      case FUSE_READ:
        HandleRead(&buffer, callback);
        break;
    int last_event;
    int break_event;

      case FUSE_WRITE:
        HandleWrite(&buffer, callback);
        break;
    std::unique_ptr<EpollController> epoll_controller(new EpollController(std::move(epoll_fd)));
    if (!epoll_controller->AddFd(fd_, EPOLLIN, &last_event)) {
        return;
    }
    if (!epoll_controller->AddFd(break_fd_, EPOLLIN, &break_event)) {
        return;
    }

      case FUSE_RELEASE:
        HandleRelease(&buffer, callback);
        break;
    last_event = 0;
    break_event = 0;

      case FUSE_FSYNC:
        HandleFsync(&buffer, callback);
    FuseBuffer buffer;
    while (true) {
        if (!epoll_controller->Wait(1)) {
            break;
        }
        last_event = 0;
        *reinterpret_cast<int*>(epoll_controller->events()[0].data.ptr) =
            epoll_controller->events()[0].events;

      default:
        buffer.HandleNotImpl();
        if (break_event != 0 || (last_event & ~EPOLLIN) != 0) {
            break;
        }

    if (!buffer.response.Write(fd)) {
      LOG(ERROR) << "Failed to write a response to the device.";
      return false;
        if (!HandleMessage(this, &buffer, fd_, callback)) {
            break;
        }
    }

  return true;
    LOG(VERBOSE) << "FuseAppLoop exit";
}

}  // namespace fuse
+38 −10
Original line number Diff line number Diff line
@@ -17,23 +17,51 @@
#ifndef ANDROID_LIBAPPFUSE_FUSEAPPLOOP_H_
#define ANDROID_LIBAPPFUSE_FUSEAPPLOOP_H_

#include <memory>
#include <mutex>

#include <android-base/unique_fd.h>

#include "libappfuse/FuseBuffer.h"

namespace android {
namespace fuse {

class EpollController;

class FuseAppLoopCallback {
 public:
  virtual bool IsActive() = 0;
  virtual int64_t OnGetSize(uint64_t inode) = 0;
  virtual int32_t OnFsync(uint64_t inode) = 0;
  virtual int32_t OnWrite(
      uint64_t inode, uint64_t offset, uint32_t size, const void* data) = 0;
  virtual int32_t OnRead(
      uint64_t inode, uint64_t offset, uint32_t size, void* data) = 0;
  virtual int32_t OnOpen(uint64_t inode) = 0;
  virtual int32_t OnRelease(uint64_t inode) = 0;
  virtual ~FuseAppLoopCallback() = default;
   virtual void OnLookup(uint64_t unique, uint64_t inode) = 0;
   virtual void OnGetAttr(uint64_t unique, uint64_t inode) = 0;
   virtual void OnFsync(uint64_t unique, uint64_t inode) = 0;
   virtual void OnWrite(uint64_t unique, uint64_t inode, uint64_t offset, uint32_t size,
                        const void* data) = 0;
   virtual void OnRead(uint64_t unique, uint64_t inode, uint64_t offset, uint32_t size) = 0;
   virtual void OnOpen(uint64_t unique, uint64_t inode) = 0;
   virtual void OnRelease(uint64_t unique, uint64_t inode) = 0;
   virtual ~FuseAppLoopCallback();
};

class FuseAppLoop final {
  public:
    FuseAppLoop(base::unique_fd&& fd);

    void Start(FuseAppLoopCallback* callback);
    void Break();

    bool ReplySimple(uint64_t unique, int32_t result);
    bool ReplyLookup(uint64_t unique, uint64_t inode, int64_t size);
    bool ReplyGetAttr(uint64_t unique, uint64_t inode, int64_t size, int mode);
    bool ReplyOpen(uint64_t unique, uint64_t fh);
    bool ReplyWrite(uint64_t unique, uint32_t size);
    bool ReplyRead(uint64_t unique, uint32_t size, const void* data);

  private:
    base::unique_fd fd_;
    base::unique_fd break_fd_;

    // Lock for multi-threading.
    std::mutex mutex_;
};

bool StartFuseAppLoop(int fd, FuseAppLoopCallback* callback);
+53 −58
Original line number Diff line number Diff line
@@ -23,6 +23,9 @@
#include <gtest/gtest.h>
#include <thread>

#include "libappfuse/EpollController.h"
#include "libappfuse/FuseBridgeLoop.h"

namespace android {
namespace fuse {
namespace {
@@ -37,82 +40,61 @@ struct CallbackRequest {
class Callback : public FuseAppLoopCallback {
 public:
  std::vector<CallbackRequest> requests;
  FuseAppLoop* loop;

  bool IsActive() override {
    return true;
  void OnGetAttr(uint64_t seq, uint64_t inode) override {
      EXPECT_NE(FUSE_ROOT_ID, static_cast<int>(inode));
      EXPECT_TRUE(loop->ReplyGetAttr(seq, inode, kTestFileSize, S_IFREG | 0777));
  }

  int64_t OnGetSize(uint64_t inode) override {
    if (inode == FUSE_ROOT_ID) {
      return 0;
    } else {
      return kTestFileSize;
    }
  void OnLookup(uint64_t unique, uint64_t inode) override {
      EXPECT_NE(FUSE_ROOT_ID, static_cast<int>(inode));
      EXPECT_TRUE(loop->ReplyLookup(unique, inode, kTestFileSize));
  }

  int32_t OnFsync(uint64_t inode) override {
    requests.push_back({
      .code = FUSE_FSYNC,
      .inode = inode
    });
    return 0;
  void OnFsync(uint64_t seq, uint64_t inode) override {
      requests.push_back({.code = FUSE_FSYNC, .inode = inode});
      loop->ReplySimple(seq, 0);
  }

  int32_t OnWrite(uint64_t inode,
                  uint64_t offset ATTRIBUTE_UNUSED,
                  uint32_t size ATTRIBUTE_UNUSED,
                  const void* data ATTRIBUTE_UNUSED) override {
    requests.push_back({
      .code = FUSE_WRITE,
      .inode = inode
    });
    return 0;
  void OnWrite(uint64_t seq, uint64_t inode, uint64_t offset ATTRIBUTE_UNUSED,
               uint32_t size ATTRIBUTE_UNUSED, const void* data ATTRIBUTE_UNUSED) override {
      requests.push_back({.code = FUSE_WRITE, .inode = inode});
      loop->ReplyWrite(seq, 0);
  }

  int32_t OnRead(uint64_t inode,
                 uint64_t offset ATTRIBUTE_UNUSED,
                 uint32_t size ATTRIBUTE_UNUSED,
                 void* data ATTRIBUTE_UNUSED) override {
    requests.push_back({
      .code = FUSE_READ,
      .inode = inode
    });
    return 0;
  void OnRead(uint64_t seq, uint64_t inode, uint64_t offset ATTRIBUTE_UNUSED,
              uint32_t size ATTRIBUTE_UNUSED) override {
      requests.push_back({.code = FUSE_READ, .inode = inode});
      loop->ReplySimple(seq, 0);
  }

  int32_t OnOpen(uint64_t inode) override {
    requests.push_back({
      .code = FUSE_OPEN,
      .inode = inode
    });
    return 0;
  void OnOpen(uint64_t seq, uint64_t inode) override {
      requests.push_back({.code = FUSE_OPEN, .inode = inode});
      loop->ReplyOpen(seq, inode);
  }

  int32_t OnRelease(uint64_t inode) override {
    requests.push_back({
      .code = FUSE_RELEASE,
      .inode = inode
    });
    return 0;
  void OnRelease(uint64_t seq, uint64_t inode) override {
      requests.push_back({.code = FUSE_RELEASE, .inode = inode});
      loop->ReplySimple(seq, 0);
  }
};

class FuseAppLoopTest : public ::testing::Test {
 private:
  std::thread thread_;

 protected:
   std::thread thread_;
   base::unique_fd sockets_[2];
   Callback callback_;
   FuseRequest request_;
   FuseResponse response_;
   std::unique_ptr<FuseAppLoop> loop_;

   void SetUp() override {
       base::SetMinimumLogSeverity(base::VERBOSE);
       ASSERT_TRUE(SetupMessageSockets(&sockets_));
    thread_ = std::thread([this] {
      StartFuseAppLoop(sockets_[1].release(), &callback_);
    });
       loop_.reset(new FuseAppLoop(std::move(sockets_[1])));
       callback_.loop = loop_.get();
       thread_ = std::thread([this] { loop_->Start(&callback_); });
  }

  void CheckCallback(
@@ -300,5 +282,18 @@ TEST_F(FuseAppLoopTest, Write) {
  CheckCallback(sizeof(fuse_write_in), FUSE_WRITE, sizeof(fuse_write_out));
}

TEST_F(FuseAppLoopTest, Break) {
    // Ensure that the loop started.
    request_.Reset(sizeof(fuse_open_in), FUSE_OPEN, 1);
    request_.header.nodeid = 10;
    ASSERT_TRUE(request_.Write(sockets_[0]));
    ASSERT_TRUE(response_.Read(sockets_[0]));

    loop_->Break();
    if (thread_.joinable()) {
        thread_.join();
    }
}

}  // namespace fuse
}  // namespace android