Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e4501e54 authored by Myles Watson's avatar Myles Watson
Browse files

Start with async and HCI

Bug: 205758693
Test: unit tests
Change-Id: I16a8be44bce5f2d233582ab6db17c30d068fa9c0
parent 10d29e49
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
aidl_interface {
    name: "android.hardware.bluetooth",
    vendor_available: true,
    host_supported: true,
    srcs: ["android/hardware/bluetooth/*.aidl"],
    stability: "vintf",
    backend: {
+41 −0
Original line number Diff line number Diff line
package {
    default_applicable_licenses: ["hardware_interfaces_license"],
}

cc_library_static {
    name: "android.hardware.bluetooth.async",
    vendor_available: true,
    host_supported: true,
    srcs: [
        "async_fd_watcher.cc",
    ],
    export_include_dirs: ["."],
    shared_libs: [
        "liblog",
    ],
    cflags: [
        "-Wall",
        "-Werror",
    ],
}

cc_test {
    name: "bluetooth-vendor-interface-async-test",
    host_supported: true,
    srcs: [
        "test/async_fd_watcher_unittest.cc",
    ],
    shared_libs: [
        "liblog",
        "libutils",
    ],
    static_libs: [
        "android.hardware.bluetooth.async",
        "libgmock",
    ],
    cflags: [
        "-Wall",
        "-Werror",
    ],
    test_suites: ["general-tests"],
}
+176 −0
Original line number Diff line number Diff line
/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "async_fd_watcher.h"

#include <string.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

#include "fcntl.h"
#include "log/log.h"
#include "sys/select.h"
#include "unistd.h"

static const int INVALID_FD = -1;

namespace android::hardware::bluetooth::async {

int AsyncFdWatcher::WatchFdForNonBlockingReads(
    int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
  // Add file descriptor and callback
  {
    std::unique_lock<std::mutex> guard(internal_mutex_);
    watched_fds_[file_descriptor] = on_read_fd_ready_callback;
  }

  // Start the thread if not started yet
  return tryStartThread();
}

int AsyncFdWatcher::ConfigureTimeout(
    const std::chrono::milliseconds timeout,
    const TimeoutCallback& on_timeout_callback) {
  // Add timeout and callback
  {
    std::unique_lock<std::mutex> guard(timeout_mutex_);
    timeout_cb_ = on_timeout_callback;
    timeout_ms_ = timeout;
  }

  notifyThread();
  return 0;
}

void AsyncFdWatcher::StopWatchingFileDescriptors() { stopThread(); }

AsyncFdWatcher::~AsyncFdWatcher() {}

// Make sure to call this with at least one file descriptor ready to be
// watched upon or the thread routine will return immediately
int AsyncFdWatcher::tryStartThread() {
  if (std::atomic_exchange(&running_, true)) return 0;

  // Set up the communication channel
  int pipe_fds[2];
  if (pipe2(pipe_fds, O_NONBLOCK)) return -1;

  notification_listen_fd_ = pipe_fds[0];
  notification_write_fd_ = pipe_fds[1];

  thread_ = std::thread([this]() { ThreadRoutine(); });
  if (!thread_.joinable()) return -1;

  return 0;
}

int AsyncFdWatcher::stopThread() {
  if (!std::atomic_exchange(&running_, false)) return 0;

  notifyThread();
  if (std::this_thread::get_id() != thread_.get_id()) {
    thread_.join();
  }

  {
    std::unique_lock<std::mutex> guard(internal_mutex_);
    watched_fds_.clear();
  }

  {
    std::unique_lock<std::mutex> guard(timeout_mutex_);
    timeout_cb_ = nullptr;
  }

  close(notification_listen_fd_);
  close(notification_write_fd_);

  return 0;
}

int AsyncFdWatcher::notifyThread() {
  uint8_t buffer[] = {0};
  if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
    return -1;
  }
  return 0;
}

void AsyncFdWatcher::ThreadRoutine() {
  while (running_) {
    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(notification_listen_fd_, &read_fds);
    int max_read_fd = INVALID_FD;
    for (auto& it : watched_fds_) {
      FD_SET(it.first, &read_fds);
      max_read_fd = std::max(max_read_fd, it.first);
    }

    struct timeval timeout;
    struct timeval* timeout_ptr = NULL;
    if (timeout_ms_ > std::chrono::milliseconds(0)) {
      timeout.tv_sec = timeout_ms_.count() / 1000;
      timeout.tv_usec = (timeout_ms_.count() % 1000) * 1000;
      timeout_ptr = &timeout;
    }

    // Wait until there is data available to read on some FD.
    int nfds = std::max(notification_listen_fd_, max_read_fd);
    int retval = select(nfds + 1, &read_fds, NULL, NULL, timeout_ptr);

    // There was some error.
    if (retval < 0) continue;

    // Timeout.
    if (retval == 0) {
      // Allow the timeout callback to modify the timeout.
      TimeoutCallback saved_cb;
      {
        std::unique_lock<std::mutex> guard(timeout_mutex_);
        if (timeout_ms_ > std::chrono::milliseconds(0)) saved_cb = timeout_cb_;
      }
      if (saved_cb != nullptr) saved_cb();
      continue;
    }

    // Read data from the notification FD.
    if (FD_ISSET(notification_listen_fd_, &read_fds)) {
      char buffer[] = {0};
      TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer, 1));
      continue;
    }

    // Invoke the data ready callbacks if appropriate.
    {
      // Hold the mutex to make sure that the callbacks are still valid.
      std::unique_lock<std::mutex> guard(internal_mutex_);
      for (auto& it : watched_fds_) {
        if (FD_ISSET(it.first, &read_fds)) {
          it.second(it.first);
        }
      }
    }
  }
}

}  // namespace android::hardware::bluetooth::async
+60 −0
Original line number Diff line number Diff line
/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <map>
#include <mutex>
#include <thread>

namespace android::hardware::bluetooth::async {

using ReadCallback = std::function<void(int)>;
using TimeoutCallback = std::function<void(void)>;

class AsyncFdWatcher {
 public:
  AsyncFdWatcher() = default;
  ~AsyncFdWatcher();

  int WatchFdForNonBlockingReads(int file_descriptor,
                                 const ReadCallback& on_read_fd_ready_callback);
  int ConfigureTimeout(const std::chrono::milliseconds timeout,
                       const TimeoutCallback& on_timeout_callback);
  void StopWatchingFileDescriptors();

 private:
  AsyncFdWatcher(const AsyncFdWatcher&) = delete;
  AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;

  int tryStartThread();
  int stopThread();
  int notifyThread();
  void ThreadRoutine();

  std::atomic_bool running_{false};
  std::thread thread_;
  std::mutex internal_mutex_;
  std::mutex timeout_mutex_;

  std::map<int, ReadCallback> watched_fds_;
  int notification_listen_fd_;
  int notification_write_fd_;
  TimeoutCallback timeout_cb_;
  std::chrono::milliseconds timeout_ms_;
};

}  // namespace android::hardware::bluetooth::async
+377 −0
Original line number Diff line number Diff line
/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "async_fd_watcher_unittest"

#include "async_fd_watcher.h"

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <log/log.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstdint>
#include <cstring>
#include <vector>

namespace android::hardware::bluetooth::async_test {

using android::hardware::bluetooth::async::AsyncFdWatcher;

class AsyncFdWatcherSocketTest : public ::testing::Test {
 public:
  static const uint16_t kPort = 6111;
  static const size_t kBufferSize = 16;

  bool CheckBufferEquals() {
    return strcmp(server_buffer_, client_buffer_) == 0;
  }

 protected:
  int StartServer() {
    ALOGD("%s", __func__);
    struct sockaddr_in serv_addr;
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    EXPECT_FALSE(fd < 0);

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    serv_addr.sin_port = htons(kPort);
    int reuse_flag = 1;
    EXPECT_FALSE(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_flag,
                            sizeof(reuse_flag)) < 0);
    EXPECT_FALSE(bind(fd, (sockaddr*)&serv_addr, sizeof(serv_addr)) < 0);

    ALOGD("%s before listen", __func__);
    listen(fd, 1);
    return fd;
  }

  int AcceptConnection(int fd) {
    ALOGD("%s", __func__);
    struct sockaddr_in cli_addr;
    memset(&cli_addr, 0, sizeof(cli_addr));
    socklen_t clilen = sizeof(cli_addr);

    int connection_fd = accept(fd, (struct sockaddr*)&cli_addr, &clilen);
    EXPECT_FALSE(connection_fd < 0);

    return connection_fd;
  }

  void ReadIncomingMessage(int fd) {
    ALOGD("%s", __func__);
    int n = TEMP_FAILURE_RETRY(read(fd, server_buffer_, kBufferSize - 1));
    EXPECT_FALSE(n < 0);

    if (n == 0) {  // got EOF
      ALOGD("%s: EOF", __func__);
    } else {
      ALOGD("%s: Got something", __func__);
      n = write(fd, "1", 1);
    }
  }

  void SetUp() override {
    ALOGD("%s", __func__);
    memset(server_buffer_, 0, kBufferSize);
    memset(client_buffer_, 0, kBufferSize);
  }

  void ConfigureServer() {
    socket_fd_ = StartServer();

    conn_watcher_.WatchFdForNonBlockingReads(socket_fd_, [this](int fd) {
      int connection_fd = AcceptConnection(fd);
      ALOGD("%s: Conn_watcher fd = %d", __func__, fd);

      conn_watcher_.ConfigureTimeout(std::chrono::seconds(0), []() {
        bool connection_timeout_cleared = false;
        ASSERT_TRUE(connection_timeout_cleared);
      });

      ALOGD("%s: 3", __func__);
      async_fd_watcher_.WatchFdForNonBlockingReads(
          connection_fd, [this](int fd) { ReadIncomingMessage(fd); });

      // Time out if it takes longer than a second.
      SetTimeout(std::chrono::seconds(1));
    });
    conn_watcher_.ConfigureTimeout(std::chrono::seconds(1), []() {
      bool connection_timeout = true;
      ASSERT_FALSE(connection_timeout);
    });
  }

  void CleanUpServer() {
    async_fd_watcher_.StopWatchingFileDescriptors();
    conn_watcher_.StopWatchingFileDescriptors();
    close(socket_fd_);
  }

  void TearDown() override {
    ALOGD("%s 3", __func__);
    EXPECT_TRUE(CheckBufferEquals());
  }

  void OnTimeout() {
    ALOGD("%s", __func__);
    timed_out_ = true;
  }

  void ClearTimeout() {
    ALOGD("%s", __func__);
    timed_out_ = false;
  }

  bool TimedOut() {
    ALOGD("%s %d", __func__, timed_out_ ? 1 : 0);
    return timed_out_;
  }

  void SetTimeout(std::chrono::milliseconds timeout_ms) {
    ALOGD("%s", __func__);
    async_fd_watcher_.ConfigureTimeout(timeout_ms, [this]() { OnTimeout(); });
    ClearTimeout();
  }

  int ConnectClient() {
    ALOGD("%s", __func__);
    int socket_cli_fd = socket(AF_INET, SOCK_STREAM, 0);
    EXPECT_FALSE(socket_cli_fd < 0);

    struct sockaddr_in serv_addr;
    memset((void*)&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    serv_addr.sin_port = htons(kPort);

    int result =
        connect(socket_cli_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    EXPECT_FALSE(result < 0);

    return socket_cli_fd;
  }

  void WriteFromClient(int socket_cli_fd) {
    ALOGD("%s", __func__);
    strcpy(client_buffer_, "1");
    int n = write(socket_cli_fd, client_buffer_, strlen(client_buffer_));
    EXPECT_TRUE(n > 0);
  }

  void AwaitServerResponse(int socket_cli_fd) {
    ALOGD("%s", __func__);
    int n = read(socket_cli_fd, client_buffer_, 1);
    ALOGD("%s done", __func__);
    EXPECT_TRUE(n > 0);
  }

 private:
  AsyncFdWatcher async_fd_watcher_;
  AsyncFdWatcher conn_watcher_;
  int socket_fd_;
  char server_buffer_[kBufferSize];
  char client_buffer_[kBufferSize];
  bool timed_out_;
};

// Use a single AsyncFdWatcher to signal a connection to the server socket.
TEST_F(AsyncFdWatcherSocketTest, Connect) {
  int socket_fd = StartServer();

  AsyncFdWatcher conn_watcher;
  conn_watcher.WatchFdForNonBlockingReads(socket_fd, [this](int fd) {
    int connection_fd = AcceptConnection(fd);
    close(connection_fd);
  });

  // Fail if the client doesn't connect within 1 second.
  conn_watcher.ConfigureTimeout(std::chrono::seconds(1), []() {
    bool connection_timeout = true;
    ASSERT_FALSE(connection_timeout);
  });

  int socket_cli_fd = ConnectClient();
  conn_watcher.StopWatchingFileDescriptors();
  close(socket_fd);
  close(socket_cli_fd);
}

// Use a single AsyncFdWatcher to signal a connection to the server socket.
TEST_F(AsyncFdWatcherSocketTest, TimedOutConnect) {
  int socket_fd = StartServer();
  bool timed_out = false;
  bool* timeout_ptr = &timed_out;

  AsyncFdWatcher conn_watcher;
  conn_watcher.WatchFdForNonBlockingReads(socket_fd, [this](int fd) {
    int connection_fd = AcceptConnection(fd);
    close(connection_fd);
  });

  // Set the timeout flag after 100ms.
  conn_watcher.ConfigureTimeout(std::chrono::milliseconds(100),
                                [timeout_ptr]() { *timeout_ptr = true; });
  EXPECT_FALSE(timed_out);
  sleep(1);
  EXPECT_TRUE(timed_out);
  conn_watcher.StopWatchingFileDescriptors();
  close(socket_fd);
}

// Modify the timeout in a timeout callback.
TEST_F(AsyncFdWatcherSocketTest, TimedOutSchedulesTimeout) {
  int socket_fd = StartServer();
  bool timed_out = false;
  bool timed_out2 = false;

  AsyncFdWatcher conn_watcher;
  conn_watcher.WatchFdForNonBlockingReads(socket_fd, [this](int fd) {
    int connection_fd = AcceptConnection(fd);
    close(connection_fd);
  });

  // Set a timeout flag in each callback.
  conn_watcher.ConfigureTimeout(std::chrono::milliseconds(500),
                                [&conn_watcher, &timed_out, &timed_out2]() {
                                  timed_out = true;
                                  conn_watcher.ConfigureTimeout(
                                      std::chrono::seconds(1),
                                      [&timed_out2]() { timed_out2 = true; });
                                });
  EXPECT_FALSE(timed_out);
  EXPECT_FALSE(timed_out2);
  sleep(1);
  EXPECT_TRUE(timed_out);
  EXPECT_FALSE(timed_out2);
  sleep(1);
  EXPECT_TRUE(timed_out);
  EXPECT_TRUE(timed_out2);
  conn_watcher.StopWatchingFileDescriptors();
  close(socket_fd);
}

MATCHER_P(ReadAndMatchSingleChar, byte,
          "Reads a byte from the file descriptor and matches the value against "
          "byte") {
  char inbuf[1] = {0};

  int n = TEMP_FAILURE_RETRY(read(arg, inbuf, 1));

  TEMP_FAILURE_RETRY(write(arg, inbuf, 1));
  if (n != 1) {
    return false;
  }
  return inbuf[0] == byte;
};

// Use a single AsyncFdWatcher to watch two file descriptors.
TEST_F(AsyncFdWatcherSocketTest, WatchTwoFileDescriptors) {
  int sockfd1[2];
  int sockfd2[2];
  socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd1);
  socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd2);

  testing::MockFunction<void(int)> cb1;
  testing::MockFunction<void(int)> cb2;

  AsyncFdWatcher watcher;
  watcher.WatchFdForNonBlockingReads(sockfd1[0], cb1.AsStdFunction());

  watcher.WatchFdForNonBlockingReads(sockfd2[0], cb2.AsStdFunction());

  EXPECT_CALL(cb1, Call(ReadAndMatchSingleChar('1')));
  char one_buf[1] = {'1'};
  TEMP_FAILURE_RETRY(write(sockfd1[1], one_buf, sizeof(one_buf)));

  EXPECT_CALL(cb2, Call(ReadAndMatchSingleChar('2')));
  char two_buf[1] = {'2'};
  TEMP_FAILURE_RETRY(write(sockfd2[1], two_buf, sizeof(two_buf)));

  // Blocking read instead of a flush.
  TEMP_FAILURE_RETRY(read(sockfd1[1], one_buf, sizeof(one_buf)));
  TEMP_FAILURE_RETRY(read(sockfd2[1], two_buf, sizeof(two_buf)));

  watcher.StopWatchingFileDescriptors();
}

// Use two AsyncFdWatchers to set up a server socket.
TEST_F(AsyncFdWatcherSocketTest, ClientServer) {
  ConfigureServer();
  int socket_cli_fd = ConnectClient();

  WriteFromClient(socket_cli_fd);

  AwaitServerResponse(socket_cli_fd);

  close(socket_cli_fd);
  CleanUpServer();
}

// Use two AsyncFdWatchers to set up a server socket, which times out.
TEST_F(AsyncFdWatcherSocketTest, TimeOutTest) {
  ConfigureServer();
  int socket_cli_fd = ConnectClient();

  while (!TimedOut()) sleep(1);

  close(socket_cli_fd);
  CleanUpServer();
}

// Use two AsyncFdWatchers to set up a server socket, which times out.
TEST_F(AsyncFdWatcherSocketTest, RepeatedTimeOutTest) {
  ConfigureServer();
  int socket_cli_fd = ConnectClient();
  ClearTimeout();

  // Time out when there are no writes.
  EXPECT_FALSE(TimedOut());
  sleep(2);
  EXPECT_TRUE(TimedOut());
  ClearTimeout();

  // Don't time out when there is a write.
  WriteFromClient(socket_cli_fd);
  AwaitServerResponse(socket_cli_fd);
  EXPECT_FALSE(TimedOut());
  ClearTimeout();

  // Time out when the write is late.
  sleep(2);
  WriteFromClient(socket_cli_fd);
  AwaitServerResponse(socket_cli_fd);
  EXPECT_TRUE(TimedOut());
  ClearTimeout();

  // Time out when there is a pause after a write.
  WriteFromClient(socket_cli_fd);
  sleep(2);
  AwaitServerResponse(socket_cli_fd);
  EXPECT_TRUE(TimedOut());
  ClearTimeout();

  close(socket_cli_fd);
  CleanUpServer();
}

}  // namespace android::hardware::bluetooth::async_test
Loading