Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a5830107 authored by Erwin Jansen's avatar Erwin Jansen
Browse files

Introduce abstract datachannels in Root Canal.

This introduces a set of interfaces that abstracts the network layer.
This makes it possible to use different communications channels
(gRPC/qemu pipe) and allows us to easily port root canal to other
platforms (windows/macos).

The following interfaces are introduced:

- AsyncDataChannel: A thing that can send/receive bytes.
- AsyncDataChannelServer: A thing that can accept incoming AsyncDataChannels.
- AsyncDataChannelConnector: A thing that can make outgoing connection

There is an implementation for Posix sockets, that can be used on both
Linux & Darwin (+M1).

Test: Unit tests for basic posix sockets.
Bug: 186567864
Change-Id: Ie1c2ac05abaaec691985565ed523bc65e04305eb
parent 7feae21a
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -48,6 +48,9 @@ cc_library_static {
        "model/setup/test_channel_transport.cc",
        "model/setup/test_command_handler.cc",
        "model/setup/test_model.cc",
        "net/posix/posix_async_socket.cc",
        "net/posix/posix_async_socket_connector.cc",
        "net/posix/posix_async_socket_server.cc",
        ":BluetoothPacketSources",
        ":BluetoothHciClassSources",
    ],
@@ -102,6 +105,7 @@ cc_test_host {
    srcs: [
        "test/async_manager_unittest.cc",
        "test/h4_parser_unittest.cc",
        "test/posix_socket_unittest.cc",
        "test/security_manager_unittest.cc",
    ],
    header_libs: [
+89 −0
Original line number Diff line number Diff line
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef _WIN32
#include "msvc-posix.h"
#endif

#include <cstdint>
#include <functional>
#include <memory>

namespace android {
namespace net {

class AsyncDataChannel;

// Callback function that will be used to notify that new data
// can be read.
using ReadCallback = std::function<void(AsyncDataChannel*)>;

// A connected asynchronous socket abstraction.
//
// This is really a simple data channel that can be used to read and write
// data. Async Sockets are usually non-blocking posix/win sockets, but could be
// other types of datachannels (gRPC, qemu pipe)
class AsyncDataChannel {
 public:
  virtual ~AsyncDataChannel() = default;

  // Receive data in the given buffer. Properly handling EINTR where
  // applicable.
  //
  // Returns:
  // - >0: The number of bytes read.
  // -  0: The socket is closed, no further reads/write are possible
  // - <0: An error occurred. Details can be found in errno:
  //    -  EAGAIN: No data, try again later.
  //
  // Implementors should take care of translating EWOULDBLOCK into EAGAIN
  // if needed.
  virtual ssize_t Recv(uint8_t* buffer, uint64_t bufferSize) = 0;

  // Send data in the given buffer. Properly handling EINTR, EPIPE where
  // applicable.
  //
  // Returns:
  // - >0: The number of bytes written, this can be < bufferSize.
  // - <0: An error occurred. Details can be found in errno:
  //    - EAGAIN: The write would block, try again later.
  //    - EBADF: The connection is closed.
  //
  // Implementors should take care of translating EWOULDBLOCK into EAGAIN
  // if needed.
  virtual ssize_t Send(const uint8_t* buffer, uint64_t bufferSize) = 0;

  // True if this socket is connected
  virtual bool Connected() = 0;

  // Closes this socket. Upon return the following will hold:
  //
  // - No more ReadCallbacks will be invoked.
  // - Send/Recv calls will return 0.
  virtual void Close() = 0;

  // Registers the given callback to be invoked when a recv call can be made
  // to read data from this socket. The expectation is that a call to Recv will
  // not return EAGAIN. Returns false if registration of the watcher failed.
  //
  // Only one callback can be registered per socket.
  virtual bool WatchForNonBlockingRead(
      const ReadCallback& on_read_ready_callback) = 0;

  // Stops watching this socket, you will not receive any callbacks any longer.
  virtual void StopWatching() = 0;
};

}  // namespace net
}  // namespace android
 No newline at end of file
+43 −0
Original line number Diff line number Diff line

// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <chrono>
#include <string>

#include "net/async_data_channel.h"

namespace android {
namespace net {

using namespace std::chrono_literals;

// An AsyncDataChannelConnector is capable of connecting to a remote server.
class AsyncDataChannelConnector {
 public:
  virtual ~AsyncDataChannelConnector() = default;

  // Blocks and waits until a connection to the remote server has been
  // established, or a timeout has been reached. This function should
  // not return nullptr, but a DataChannel in disconnected state in case of
  // failure.
  //
  // In case of a disconnected DataChannel (socket->Connected() == false)
  // the errno variable can be set with the encountered error.
  virtual std::shared_ptr<AsyncDataChannel> ConnectToRemoteServer(
      const std::string& server, int port,
      const std::chrono::milliseconds timeout = 5000ms) = 0;
};
}  // namespace net
}  // namespace android
+74 −0
Original line number Diff line number Diff line
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <functional>
#include <memory>

#include "net/async_data_channel.h"

namespace android {
namespace net {

class AsyncDataChannelServer;

// Callback thas is called when a new client connection has been accepted.
using ConnectCallback = std::function<void(std::shared_ptr<AsyncDataChannel>,
                                           AsyncDataChannelServer* server)>;

// An AsyncDataChannelServer is capable of listening to incoming connections.
//
// A Callback will be invoked whenever a new connection has been accepted.
class AsyncDataChannelServer {
 public:
  // Destructor.
  virtual ~AsyncDataChannelServer() = default;

  // Start listening for new connections. The callback will be invoked
  // when a new socket has been accepted.
  //
  // errno will be set in case of failure.
  virtual bool StartListening() = 0;

  // Stop listening for new connections. The callback will not be
  // invoked, and sockets will not be accepted.
  //
  // This DOES not disconnect the server, and connections can still
  // be queued up.
  virtual void StopListening() = 0;

  // Disconnects the server, no new connections are possible.
  // The callback will never be invoked again.
  virtual void Close() = 0;

  // True if this server is connected and can accept incoming
  // connections.
  virtual bool Connected() = 0;

  // Registers the callback that should be invoked whenever a new socket was
  // accepted.
  //
  // Before the callback the server should have stopped listening for new
  // incoming connections. The callee is responsible for calling StartListening
  // if needed.
  void SetOnConnectCallback(const ConnectCallback& callback) {
    callback_ = callback;
  };

 protected:
  ConnectCallback callback_;
};

}  // namespace net
}  // namespace android
+143 −0
Original line number Diff line number Diff line
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "net/posix/posix_async_socket.h"

#include <errno.h>       // for errno
#include <fcntl.h>       // for fcntl, FD_CLOEXEC, F_GETFL
#include <string.h>      // for strerror
#include <sys/socket.h>  // for getsockopt, send, MSG_NOSIGNAL
#include <unistd.h>      // for close, read

#include <functional>  // for __base

#include "model/setup/async_manager.h"  // for AsyncManager
#include "os/log.h"                     // for LOG_INFO
#include "osi/include/osi.h"            // for OSI_NO_INTR

/* set  for very verbose debugging */
#ifndef DEBUG
#define DD(...) (void)0
#else
#define DD(...) LOG_INFO(__VA_ARGS__)
#endif

namespace android {
namespace net {

PosixAsyncSocket::PosixAsyncSocket(int fd, AsyncManager* am)
    : fd_(fd), am_(am), watching_(false) {
  int flags = fcntl(fd, F_GETFL);
  fcntl(fd, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC);
#ifdef SO_NOSIGPIPE
  // Disable SIGPIPE generation on Darwin.
  // When writing to a broken pipe, send() will return -1 and
  // set errno to EPIPE.
  flags = 1;
  setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (const char*)&flags, sizeof(flags));
#endif
}

PosixAsyncSocket::PosixAsyncSocket(PosixAsyncSocket&& other) {
  fd_ = other.fd_;
  watching_ = other.watching_.load();
  am_ = other.am_;

  other.fd_ = -1;
  other.watching_ = false;
}
PosixAsyncSocket::~PosixAsyncSocket() { Close(); }

ssize_t PosixAsyncSocket::Recv(uint8_t* buffer, uint64_t bufferSize) {
  errno = 0;
  ssize_t res = 0;
  OSI_NO_INTR(res = read(fd_, buffer, bufferSize));
  DD("%zd bytes (%d)", res, fd_);
  return res;
};

ssize_t PosixAsyncSocket::Send(const uint8_t* buffer, uint64_t bufferSize) {
  errno = 0;
  ssize_t res = 0;
#ifdef MSG_NOSIGNAL
  // Prevent SIGPIPE generation on Linux when writing to a broken pipe.
  // ::send() will return -1/EPIPE instead.
  const int sendFlags = MSG_NOSIGNAL;
#else
  // For Darwin, this is handled by setting SO_NOSIGPIPE when creating
  // the socket.
  const int sendFlags = 0;
#endif
  OSI_NO_INTR(res = send(fd_, buffer, bufferSize, sendFlags));

  DD("%zd bytes (%d)", res, fd_);
  return res;
}

bool PosixAsyncSocket::Connected() {
  if (fd_ == -1) {
    return false;
  }

  int error_code = 0;
  socklen_t error_code_size = sizeof(error_code);
  auto opt = getsockopt(fd_, SOL_SOCKET, SO_ERROR,
                        reinterpret_cast<void*>(&error_code), &error_code_size);
  DD("Status: %d, %s (%d)", opt, strerror(error_code), fd_);
  return !(opt < 0 || error_code);
}

void PosixAsyncSocket::Close() {
  if (fd_ == -1) {
    return;
  }

  StopWatching();

  // Clear out error
  int error_code = 0;
  socklen_t error_code_size = sizeof(error_code);
  getsockopt(fd_, SOL_SOCKET, SO_ERROR, reinterpret_cast<void*>(&error_code),
             &error_code_size);

  // shutdown sockets if possible,
  OSI_NO_INTR(shutdown(fd_, SHUT_RDWR));

  error_code = ::close(fd_);
  if (error_code == -1) {
    LOG_INFO("Failed to close: %s (%d)", strerror(errno), fd_);
  }
  LOG_INFO("(%d)", fd_);
  fd_ = -1;
}

bool PosixAsyncSocket::WatchForNonBlockingRead(
    const ReadCallback& on_read_ready_callback) {
  bool expected = false;
  if (watching_.compare_exchange_strong(expected, true)) {
    return am_->WatchFdForNonBlockingReads(
               fd_, [on_read_ready_callback, this](int fd) {
                 on_read_ready_callback(this);
               }) == 0;
  }
  return false;
}

void PosixAsyncSocket::StopWatching() {
  bool expected = true;
  if (watching_.compare_exchange_strong(expected, false)) {
    am_->StopWatchingFileDescriptor(fd_);
  }
}
}  // namespace net
}  // namespace android
Loading