Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f12c5c46 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge changes Iea352d4b,Ic0737465

* changes:
  GRPC: Use server_stream_call.cancelled() instead of hidden API
  RootCanal HAL: Fixed race condition when tearing down rootcanal HAL
parents 142e94b5 7f8739e1
Loading
Loading
Loading
Loading
+5 −7
Original line number Diff line number Diff line
@@ -16,7 +16,6 @@

from concurrent.futures import ThreadPoolExecutor
from grpc import RpcError
from grpc._channel import _Rendezvous
import logging


@@ -140,10 +139,9 @@ class EventCallbackStream(object):
                        callback(event)
            return None
        except RpcError as exp:
            if type(exp) is _Rendezvous:
                if exp.cancelled():
            if self.server_stream_call.cancelled():
                logging.debug("Cancelled")
                return None
            else:
                    logging.warning("Not cancelled")
                logging.warning("Some RPC error not due to cancellation")
            return exp
+69 −30
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <chrono>
#include <csignal>
#include <mutex>
#include <queue>
@@ -92,18 +93,28 @@ const bool SnoopLogger::AlwaysFlush = true;
class HciHalHostRootcanal : public HciHal {
 public:
  void registerIncomingPacketCallback(HciHalCallbacks* callback) override {
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(api_mutex_);
    LOG_INFO("%s before", __func__);
    {
      std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_);
      ASSERT(incoming_packet_callback_ == nullptr && callback != nullptr);
      incoming_packet_callback_ = callback;
    }
    LOG_INFO("%s after", __func__);
  }

  void unregisterIncomingPacketCallback() override {
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(api_mutex_);
    LOG_INFO("%s before", __func__);
    {
      std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_);
      incoming_packet_callback_ = nullptr;
    }
    LOG_INFO("%s after", __func__);
  }

  void sendHciCommand(HciPacket command) override {
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(api_mutex_);
    ASSERT(sock_fd_ != INVALID_FD);
    std::vector<uint8_t> packet = std::move(command);
    btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::CMD);
@@ -112,7 +123,7 @@ class HciHalHostRootcanal : public HciHal {
  }

  void sendAclData(HciPacket data) override {
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(api_mutex_);
    ASSERT(sock_fd_ != INVALID_FD);
    std::vector<uint8_t> packet = std::move(data);
    btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::ACL);
@@ -121,7 +132,7 @@ class HciHalHostRootcanal : public HciHal {
  }

  void sendScoData(HciPacket data) override {
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(api_mutex_);
    ASSERT(sock_fd_ != INVALID_FD);
    std::vector<uint8_t> packet = std::move(data);
    btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::SCO);
@@ -135,7 +146,7 @@ class HciHalHostRootcanal : public HciHal {
  }

  void Start() override {
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(api_mutex_);
    ASSERT(sock_fd_ == INVALID_FD);
    sock_fd_ = ConnectToRootCanal(config_->GetServerAddress(), config_->GetPort());
    ASSERT(sock_fd_ != INVALID_FD);
@@ -147,28 +158,38 @@ class HciHalHostRootcanal : public HciHal {
  }

  void Stop() override {
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(api_mutex_);
    LOG_INFO("Rootcanal HAL is closing");
    if (reactable_ != nullptr) {
      hci_incoming_thread_.GetReactor()->Unregister(reactable_);
      LOG_INFO("Rootcanal HAL is stopping, start waiting for last callback");
      // Wait up to 1 second for the last incoming packet callback to finish
      hci_incoming_thread_.GetReactor()->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
      LOG_INFO("Rootcanal HAL is stopping, finished waiting for last callback");
      ASSERT(sock_fd_ != INVALID_FD);
    }
    reactable_ = nullptr;
    {
      std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_);
      incoming_packet_callback_ = nullptr;
    }
    ::close(sock_fd_);
    sock_fd_ = INVALID_FD;
    LOG_INFO("Rootcanal HAL is closed");
  }

 private:
  std::mutex mutex_;
  // Held when APIs are called, NOT to be held during callbacks
  std::mutex api_mutex_;
  HciHalHostRootcanalConfig* config_ = HciHalHostRootcanalConfig::Get();
  HciHalCallbacks* incoming_packet_callback_ = nullptr;
  std::mutex incoming_packet_callback_mutex_;
  int sock_fd_ = INVALID_FD;
  bluetooth::os::Thread hci_incoming_thread_ =
      bluetooth::os::Thread("hci_incoming_thread", bluetooth::os::Thread::Priority::NORMAL);
  bluetooth::os::Reactor::Reactable* reactable_ = nullptr;
  std::queue<std::vector<uint8_t>> hci_outgoing_queue_;
  SnoopLogger* btsnoop_logger_;
  SnoopLogger* btsnoop_logger_ = nullptr;

  void write_to_rootcanal_fd(HciPacket packet) {
    // TODO: replace this with new queue when it's ready
@@ -181,7 +202,7 @@ class HciHalHostRootcanal : public HciHal {
  }

  void send_packet_ready() {
    std::lock_guard<std::mutex> lock(this->mutex_);
    std::lock_guard<std::mutex> lock(this->api_mutex_);
    auto packet_to_send = this->hci_outgoing_queue_.front();
    auto bytes_written = write(this->sock_fd_, (void*)packet_to_send.data(), packet_to_send.size());
    this->hci_outgoing_queue_.pop();
@@ -196,11 +217,13 @@ class HciHalHostRootcanal : public HciHal {
  }

  void incoming_packet_received() {
    {
      std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_);
      if (incoming_packet_callback_ == nullptr) {
        LOG_INFO("Dropping a packet");
        return;
      }

    }
    uint8_t buf[kBufSize] = {};

    ssize_t received_size;
@@ -228,10 +251,16 @@ class HciHalHostRootcanal : public HciHal {

      HciPacket receivedHciPacket;
      receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciEvtHeaderSize + payload_size);
      btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING,
                               SnoopLogger::PacketType::EVT);
      btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::EVT);
      {
        std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_);
        if (incoming_packet_callback_ == nullptr) {
          LOG_INFO("Dropping an event after processing");
          return;
        }
        incoming_packet_callback_->hciEventReceived(receivedHciPacket);
      }
    }

    if (buf[0] == kH4Acl) {
      RUN_NO_INTR(received_size = recv(sock_fd_, buf + kH4HeaderSize, kHciAclHeaderSize, 0));
@@ -248,10 +277,16 @@ class HciHalHostRootcanal : public HciHal {

      HciPacket receivedHciPacket;
      receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciAclHeaderSize + payload_size);
      btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING,
                               SnoopLogger::PacketType::ACL);
      btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::ACL);
      {
        std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_);
        if (incoming_packet_callback_ == nullptr) {
          LOG_INFO("Dropping an ACL packet after processing");
          return;
        }
        incoming_packet_callback_->aclDataReceived(receivedHciPacket);
      }
    }

    if (buf[0] == kH4Sco) {
      RUN_NO_INTR(received_size = recv(sock_fd_, buf + kH4HeaderSize, kHciScoHeaderSize, 0));
@@ -266,17 +301,21 @@ class HciHalHostRootcanal : public HciHal {

      HciPacket receivedHciPacket;
      receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciScoHeaderSize + payload_size);
      btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING,
                               SnoopLogger::PacketType::SCO);
      btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::SCO);
      {
        std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_);
        if (incoming_packet_callback_ == nullptr) {
          LOG_INFO("Dropping a SCO packet after processing");
          return;
        }
        incoming_packet_callback_->scoDataReceived(receivedHciPacket);
      }
    }
    memset(buf, 0, kBufSize);
  }
};

const ModuleFactory HciHal::Factory = ModuleFactory([]() {
  return new HciHalHostRootcanal();
});
const ModuleFactory HciHal::Factory = ModuleFactory([]() { return new HciHalHostRootcanal(); });

}  // namespace hal
}  // namespace bluetooth