Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 287adc98 authored by Myles Watson's avatar Myles Watson Committed by android-build-merger
Browse files

Merge "Bluetooth: Watch multiple FDs with AsyncFdWatcher" am: 1143a3fe am: 14d63bfc

am: 23a50f39

Change-Id: Id641325482bf1f1ab4fd647777433fd8a9da5263
parents ccb69f8e 23a50f39
Loading
Loading
Loading
Loading
+26 −10
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>
#include <vector>
@@ -26,6 +27,8 @@
#include "sys/select.h"
#include "unistd.h"

static const int INVALID_FD = -1;

namespace android {
namespace hardware {
namespace bluetooth {
@@ -36,8 +39,7 @@ int AsyncFdWatcher::WatchFdForNonBlockingReads(
  // Add file descriptor and callback
  {
    std::unique_lock<std::mutex> guard(internal_mutex_);
    read_fd_ = file_descriptor;
    cb_ = on_read_fd_ready_callback;
    watched_fds_[file_descriptor] = on_read_fd_ready_callback;
  }

  // Start the thread if not started yet
@@ -58,7 +60,7 @@ int AsyncFdWatcher::ConfigureTimeout(
  return 0;
}

void AsyncFdWatcher::StopWatchingFileDescriptor() { stopThread(); }
void AsyncFdWatcher::StopWatchingFileDescriptors() { stopThread(); }

AsyncFdWatcher::~AsyncFdWatcher() {}

@@ -90,8 +92,7 @@ int AsyncFdWatcher::stopThread() {

  {
    std::unique_lock<std::mutex> guard(internal_mutex_);
    cb_ = nullptr;
    read_fd_ = -1;
    watched_fds_.clear();
  }

  {
@@ -115,7 +116,11 @@ void AsyncFdWatcher::ThreadRoutine() {
    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(notification_listen_fd_, &read_fds);
    FD_SET(read_fd_, &read_fds);
    int max_read_fd = INVALID_FD;
    for (auto& it : watched_fds_) {
      FD_SET(it.first, &read_fds);
      max_read_fd = std::max(max_read_fd, it.first);
    }

    struct timeval timeout;
    struct timeval* timeout_ptr = NULL;
@@ -126,7 +131,7 @@ void AsyncFdWatcher::ThreadRoutine() {
    }

    // Wait until there is data available to read on some FD.
    int nfds = std::max(notification_listen_fd_, read_fd_);
    int nfds = std::max(notification_listen_fd_, max_read_fd);
    int retval = select(nfds + 1, &read_fds, NULL, NULL, timeout_ptr);

    // There was some error.
@@ -153,10 +158,21 @@ void AsyncFdWatcher::ThreadRoutine() {
      continue;
    }

    // Invoke the data ready callback if appropriate.
    if (FD_ISSET(read_fd_, &read_fds)) {
    // Invoke the data ready callbacks if appropriate.
    std::vector<decltype(watched_fds_)::value_type> saved_callbacks;
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      if (cb_) cb_(read_fd_);
      for (auto& it : watched_fds_) {
        if (FD_ISSET(it.first, &read_fds)) {
          saved_callbacks.push_back(it);
        }
      }
    }

    for (auto& it : saved_callbacks) {
      if (it.second) {
        it.second(it.first);
      }
    }
  }
}
+3 −3
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

#pragma once

#include <map>
#include <mutex>
#include <thread>

@@ -36,7 +37,7 @@ class AsyncFdWatcher {
                                 const ReadCallback& on_read_fd_ready_callback);
  int ConfigureTimeout(const std::chrono::milliseconds timeout,
                       const TimeoutCallback& on_timeout_callback);
  void StopWatchingFileDescriptor();
  void StopWatchingFileDescriptors();

 private:
  AsyncFdWatcher(const AsyncFdWatcher&) = delete;
@@ -52,10 +53,9 @@ class AsyncFdWatcher {
  std::mutex internal_mutex_;
  std::mutex timeout_mutex_;

  int read_fd_;
  std::map<int, ReadCallback> watched_fds_;
  int notification_listen_fd_;
  int notification_write_fd_;
  ReadCallback cb_;
  TimeoutCallback timeout_cb_;
  std::chrono::milliseconds timeout_ms_;
};
+61 −5
Original line number Diff line number Diff line
@@ -14,6 +14,8 @@
// limitations under the License.
//

#define LOG_TAG "async_fd_watcher_unittest"

#include "async_fd_watcher.h"
#include <gtest/gtest.h>
#include <cstdint>
@@ -122,8 +124,8 @@ class AsyncFdWatcherSocketTest : public ::testing::Test {
  }

  void CleanUpServer() {
    async_fd_watcher_.StopWatchingFileDescriptor();
    conn_watcher_.StopWatchingFileDescriptor();
    async_fd_watcher_.StopWatchingFileDescriptors();
    conn_watcher_.StopWatchingFileDescriptors();
    close(socket_fd_);
  }

@@ -211,7 +213,7 @@ TEST_F(AsyncFdWatcherSocketTest, Connect) {
  });

  ConnectClient();
  conn_watcher.StopWatchingFileDescriptor();
  conn_watcher.StopWatchingFileDescriptors();
  close(socket_fd);
}

@@ -233,7 +235,7 @@ TEST_F(AsyncFdWatcherSocketTest, TimedOutConnect) {
  EXPECT_FALSE(timed_out);
  sleep(1);
  EXPECT_TRUE(timed_out);
  conn_watcher.StopWatchingFileDescriptor();
  conn_watcher.StopWatchingFileDescriptors();
  close(socket_fd);
}

@@ -265,10 +267,64 @@ TEST_F(AsyncFdWatcherSocketTest, TimedOutSchedulesTimeout) {
  sleep(1);
  EXPECT_TRUE(timed_out);
  EXPECT_TRUE(timed_out2);
  conn_watcher.StopWatchingFileDescriptor();
  conn_watcher.StopWatchingFileDescriptors();
  close(socket_fd);
}

// Use a single AsyncFdWatcher to watch two file descriptors.
TEST_F(AsyncFdWatcherSocketTest, WatchTwoFileDescriptors) {
  int sockfd[2];
  socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
  bool cb1_called = false;
  bool* cb1_called_ptr = &cb1_called;
  bool cb2_called = false;
  bool* cb2_called_ptr = &cb2_called;

  AsyncFdWatcher watcher;
  watcher.WatchFdForNonBlockingReads(sockfd[0], [cb1_called_ptr](int fd) {
    char read_buf[1] = {0};
    int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
    ASSERT_TRUE(n == sizeof(read_buf));
    ASSERT_TRUE(read_buf[0] == '1');
    *cb1_called_ptr = true;
  });

  watcher.WatchFdForNonBlockingReads(sockfd[1], [cb2_called_ptr](int fd) {
    char read_buf[1] = {0};
    int n = TEMP_FAILURE_RETRY(read(fd, read_buf, sizeof(read_buf)));
    ASSERT_TRUE(n == sizeof(read_buf));
    ASSERT_TRUE(read_buf[0] == '2');
    *cb2_called_ptr = true;
  });

  // Fail if the test doesn't pass within 3 seconds
  watcher.ConfigureTimeout(std::chrono::seconds(3), [this]() {
    bool connection_timeout = true;
    ASSERT_FALSE(connection_timeout);
  });

  EXPECT_FALSE(cb1_called);
  EXPECT_FALSE(cb2_called);

  char one_buf[1] = {'1'};
  TEMP_FAILURE_RETRY(write(sockfd[1], one_buf, sizeof(one_buf)));

  sleep(1);

  EXPECT_TRUE(cb1_called);
  EXPECT_FALSE(cb2_called);

  char two_buf[1] = {'2'};
  TEMP_FAILURE_RETRY(write(sockfd[0], two_buf, sizeof(two_buf)));

  sleep(1);

  EXPECT_TRUE(cb1_called);
  EXPECT_TRUE(cb2_called);

  watcher.StopWatchingFileDescriptors();
}

// Use two AsyncFdWatchers to set up a server socket.
TEST_F(AsyncFdWatcherSocketTest, ClientServer) {
  ConfigureServer();
+1 −1
Original line number Diff line number Diff line
@@ -274,7 +274,7 @@ bool VendorInterface::Open(InitializeCompleteCallback initialize_complete_cb,
}

void VendorInterface::Close() {
  fd_watcher_.StopWatchingFileDescriptor();
  fd_watcher_.StopWatchingFileDescriptors();

  if (lib_interface_ != nullptr) {
    bt_vendor_lpm_mode_t mode = BT_VND_LPM_DISABLE;