Loading system/gd/cert/event_callback_stream.py +5 −7 Original line number Diff line number Diff line Loading @@ -16,7 +16,6 @@ from concurrent.futures import ThreadPoolExecutor from grpc import RpcError from grpc._channel import _Rendezvous import logging Loading Loading @@ -140,10 +139,9 @@ class EventCallbackStream(object): callback(event) return None except RpcError as exp: if type(exp) is _Rendezvous: if exp.cancelled(): if self.server_stream_call.cancelled(): logging.debug("Cancelled") return None else: logging.warning("Not cancelled") logging.warning("Some RPC error not due to cancellation") return exp system/gd/hal/hci_hal_host_rootcanal.cc +69 −30 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include <chrono> #include <csignal> #include <mutex> #include <queue> Loading Loading @@ -92,18 +93,28 @@ const bool SnoopLogger::AlwaysFlush = true; class HciHalHostRootcanal : public HciHal { public: void registerIncomingPacketCallback(HciHalCallbacks* callback) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); LOG_INFO("%s before", __func__); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); ASSERT(incoming_packet_callback_ == nullptr && callback != nullptr); incoming_packet_callback_ = callback; } LOG_INFO("%s after", __func__); } void unregisterIncomingPacketCallback() override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); LOG_INFO("%s before", __func__); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); incoming_packet_callback_ = nullptr; } LOG_INFO("%s after", __func__); } void sendHciCommand(HciPacket command) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ != INVALID_FD); std::vector<uint8_t> packet = std::move(command); btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::CMD); Loading @@ -112,7 +123,7 @@ class HciHalHostRootcanal : public HciHal { } void sendAclData(HciPacket data) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ != INVALID_FD); std::vector<uint8_t> packet = std::move(data); btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::ACL); Loading @@ -121,7 +132,7 @@ class HciHalHostRootcanal : public HciHal { } void sendScoData(HciPacket data) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ != INVALID_FD); std::vector<uint8_t> packet = std::move(data); btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::SCO); Loading @@ -135,7 +146,7 @@ class HciHalHostRootcanal : public HciHal { } void Start() override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ == INVALID_FD); sock_fd_ = ConnectToRootCanal(config_->GetServerAddress(), config_->GetPort()); ASSERT(sock_fd_ != INVALID_FD); Loading @@ -147,28 +158,38 @@ class HciHalHostRootcanal : public HciHal { } void Stop() override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); LOG_INFO("Rootcanal HAL is closing"); if (reactable_ != nullptr) { hci_incoming_thread_.GetReactor()->Unregister(reactable_); LOG_INFO("Rootcanal HAL is stopping, start waiting for last callback"); // Wait up to 1 second for the last incoming packet callback to finish hci_incoming_thread_.GetReactor()->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); LOG_INFO("Rootcanal HAL is stopping, finished waiting for last callback"); ASSERT(sock_fd_ != INVALID_FD); } reactable_ = nullptr; { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); incoming_packet_callback_ = nullptr; } ::close(sock_fd_); sock_fd_ = INVALID_FD; LOG_INFO("Rootcanal HAL is closed"); } private: std::mutex mutex_; // Held when APIs are called, NOT to be held during callbacks std::mutex api_mutex_; HciHalHostRootcanalConfig* config_ = HciHalHostRootcanalConfig::Get(); HciHalCallbacks* incoming_packet_callback_ = nullptr; std::mutex incoming_packet_callback_mutex_; int sock_fd_ = INVALID_FD; bluetooth::os::Thread hci_incoming_thread_ = bluetooth::os::Thread("hci_incoming_thread", bluetooth::os::Thread::Priority::NORMAL); bluetooth::os::Reactor::Reactable* reactable_ = nullptr; std::queue<std::vector<uint8_t>> hci_outgoing_queue_; SnoopLogger* btsnoop_logger_; SnoopLogger* btsnoop_logger_ = nullptr; void write_to_rootcanal_fd(HciPacket packet) { // TODO: replace this with new queue when it's ready Loading @@ -181,7 +202,7 @@ class HciHalHostRootcanal : public HciHal { } void send_packet_ready() { std::lock_guard<std::mutex> lock(this->mutex_); std::lock_guard<std::mutex> lock(this->api_mutex_); auto packet_to_send = this->hci_outgoing_queue_.front(); auto bytes_written = write(this->sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); this->hci_outgoing_queue_.pop(); Loading @@ -196,11 +217,13 @@ class HciHalHostRootcanal : public HciHal { } void incoming_packet_received() { { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping a packet"); return; } } uint8_t buf[kBufSize] = {}; ssize_t received_size; Loading Loading @@ -228,10 +251,16 @@ class HciHalHostRootcanal : public HciHal { HciPacket receivedHciPacket; receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciEvtHeaderSize + payload_size); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::EVT); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::EVT); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping an event after processing"); return; } incoming_packet_callback_->hciEventReceived(receivedHciPacket); } } if (buf[0] == kH4Acl) { RUN_NO_INTR(received_size = recv(sock_fd_, buf + kH4HeaderSize, kHciAclHeaderSize, 0)); Loading @@ -248,10 +277,16 @@ class HciHalHostRootcanal : public HciHal { HciPacket receivedHciPacket; receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciAclHeaderSize + payload_size); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::ACL); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::ACL); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping an ACL packet after processing"); return; } incoming_packet_callback_->aclDataReceived(receivedHciPacket); } } if (buf[0] == kH4Sco) { RUN_NO_INTR(received_size = recv(sock_fd_, buf + kH4HeaderSize, kHciScoHeaderSize, 0)); Loading @@ -266,17 +301,21 @@ class HciHalHostRootcanal : public HciHal { HciPacket receivedHciPacket; receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciScoHeaderSize + payload_size); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::SCO); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::SCO); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping a SCO packet after processing"); return; } incoming_packet_callback_->scoDataReceived(receivedHciPacket); } } memset(buf, 0, kBufSize); } }; const ModuleFactory HciHal::Factory = ModuleFactory([]() { return new HciHalHostRootcanal(); }); const ModuleFactory HciHal::Factory = ModuleFactory([]() { return new HciHalHostRootcanal(); }); } // namespace hal } // namespace bluetooth Loading
system/gd/cert/event_callback_stream.py +5 −7 Original line number Diff line number Diff line Loading @@ -16,7 +16,6 @@ from concurrent.futures import ThreadPoolExecutor from grpc import RpcError from grpc._channel import _Rendezvous import logging Loading Loading @@ -140,10 +139,9 @@ class EventCallbackStream(object): callback(event) return None except RpcError as exp: if type(exp) is _Rendezvous: if exp.cancelled(): if self.server_stream_call.cancelled(): logging.debug("Cancelled") return None else: logging.warning("Not cancelled") logging.warning("Some RPC error not due to cancellation") return exp
system/gd/hal/hci_hal_host_rootcanal.cc +69 −30 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include <chrono> #include <csignal> #include <mutex> #include <queue> Loading Loading @@ -92,18 +93,28 @@ const bool SnoopLogger::AlwaysFlush = true; class HciHalHostRootcanal : public HciHal { public: void registerIncomingPacketCallback(HciHalCallbacks* callback) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); LOG_INFO("%s before", __func__); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); ASSERT(incoming_packet_callback_ == nullptr && callback != nullptr); incoming_packet_callback_ = callback; } LOG_INFO("%s after", __func__); } void unregisterIncomingPacketCallback() override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); LOG_INFO("%s before", __func__); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); incoming_packet_callback_ = nullptr; } LOG_INFO("%s after", __func__); } void sendHciCommand(HciPacket command) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ != INVALID_FD); std::vector<uint8_t> packet = std::move(command); btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::CMD); Loading @@ -112,7 +123,7 @@ class HciHalHostRootcanal : public HciHal { } void sendAclData(HciPacket data) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ != INVALID_FD); std::vector<uint8_t> packet = std::move(data); btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::ACL); Loading @@ -121,7 +132,7 @@ class HciHalHostRootcanal : public HciHal { } void sendScoData(HciPacket data) override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ != INVALID_FD); std::vector<uint8_t> packet = std::move(data); btsnoop_logger_->capture(packet, SnoopLogger::Direction::OUTGOING, SnoopLogger::PacketType::SCO); Loading @@ -135,7 +146,7 @@ class HciHalHostRootcanal : public HciHal { } void Start() override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); ASSERT(sock_fd_ == INVALID_FD); sock_fd_ = ConnectToRootCanal(config_->GetServerAddress(), config_->GetPort()); ASSERT(sock_fd_ != INVALID_FD); Loading @@ -147,28 +158,38 @@ class HciHalHostRootcanal : public HciHal { } void Stop() override { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(api_mutex_); LOG_INFO("Rootcanal HAL is closing"); if (reactable_ != nullptr) { hci_incoming_thread_.GetReactor()->Unregister(reactable_); LOG_INFO("Rootcanal HAL is stopping, start waiting for last callback"); // Wait up to 1 second for the last incoming packet callback to finish hci_incoming_thread_.GetReactor()->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); LOG_INFO("Rootcanal HAL is stopping, finished waiting for last callback"); ASSERT(sock_fd_ != INVALID_FD); } reactable_ = nullptr; { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); incoming_packet_callback_ = nullptr; } ::close(sock_fd_); sock_fd_ = INVALID_FD; LOG_INFO("Rootcanal HAL is closed"); } private: std::mutex mutex_; // Held when APIs are called, NOT to be held during callbacks std::mutex api_mutex_; HciHalHostRootcanalConfig* config_ = HciHalHostRootcanalConfig::Get(); HciHalCallbacks* incoming_packet_callback_ = nullptr; std::mutex incoming_packet_callback_mutex_; int sock_fd_ = INVALID_FD; bluetooth::os::Thread hci_incoming_thread_ = bluetooth::os::Thread("hci_incoming_thread", bluetooth::os::Thread::Priority::NORMAL); bluetooth::os::Reactor::Reactable* reactable_ = nullptr; std::queue<std::vector<uint8_t>> hci_outgoing_queue_; SnoopLogger* btsnoop_logger_; SnoopLogger* btsnoop_logger_ = nullptr; void write_to_rootcanal_fd(HciPacket packet) { // TODO: replace this with new queue when it's ready Loading @@ -181,7 +202,7 @@ class HciHalHostRootcanal : public HciHal { } void send_packet_ready() { std::lock_guard<std::mutex> lock(this->mutex_); std::lock_guard<std::mutex> lock(this->api_mutex_); auto packet_to_send = this->hci_outgoing_queue_.front(); auto bytes_written = write(this->sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); this->hci_outgoing_queue_.pop(); Loading @@ -196,11 +217,13 @@ class HciHalHostRootcanal : public HciHal { } void incoming_packet_received() { { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping a packet"); return; } } uint8_t buf[kBufSize] = {}; ssize_t received_size; Loading Loading @@ -228,10 +251,16 @@ class HciHalHostRootcanal : public HciHal { HciPacket receivedHciPacket; receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciEvtHeaderSize + payload_size); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::EVT); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::EVT); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping an event after processing"); return; } incoming_packet_callback_->hciEventReceived(receivedHciPacket); } } if (buf[0] == kH4Acl) { RUN_NO_INTR(received_size = recv(sock_fd_, buf + kH4HeaderSize, kHciAclHeaderSize, 0)); Loading @@ -248,10 +277,16 @@ class HciHalHostRootcanal : public HciHal { HciPacket receivedHciPacket; receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciAclHeaderSize + payload_size); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::ACL); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::ACL); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping an ACL packet after processing"); return; } incoming_packet_callback_->aclDataReceived(receivedHciPacket); } } if (buf[0] == kH4Sco) { RUN_NO_INTR(received_size = recv(sock_fd_, buf + kH4HeaderSize, kHciScoHeaderSize, 0)); Loading @@ -266,17 +301,21 @@ class HciHalHostRootcanal : public HciHal { HciPacket receivedHciPacket; receivedHciPacket.assign(buf + kH4HeaderSize, buf + kH4HeaderSize + kHciScoHeaderSize + payload_size); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::SCO); btsnoop_logger_->capture(receivedHciPacket, SnoopLogger::Direction::INCOMING, SnoopLogger::PacketType::SCO); { std::lock_guard<std::mutex> incoming_packet_callback_lock(incoming_packet_callback_mutex_); if (incoming_packet_callback_ == nullptr) { LOG_INFO("Dropping a SCO packet after processing"); return; } incoming_packet_callback_->scoDataReceived(receivedHciPacket); } } memset(buf, 0, kBufSize); } }; const ModuleFactory HciHal::Factory = ModuleFactory([]() { return new HciHalHostRootcanal(); }); const ModuleFactory HciHal::Factory = ModuleFactory([]() { return new HciHalHostRootcanal(); }); } // namespace hal } // namespace bluetooth