Loading system/gd/cert/matchers.py +5 −0 Original line number Diff line number Diff line Loading @@ -122,6 +122,11 @@ class L2capMatchers(object): def PacketPayloadWithMatchingPsm(psm): return lambda packet: None if psm != packet.psm else packet # this is a hack - should be removed @staticmethod def PacketPayloadWithMatchingCid(cid): return lambda packet: None if cid != packet.fixed_cid else packet @staticmethod def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) Loading system/gd/cert/py_l2cap.py +39 −5 Original line number Diff line number Diff line Loading @@ -57,7 +57,31 @@ class PyL2cap(Closable): return PyL2capChannel(self._device, psm) class PyLeL2capChannel(IEventStream): class PyLeL2capFixedChannel(IEventStream): def __init__(self, device, cid, l2cap_stream): self._device = device self._cid = cid self._le_l2cap_stream = l2cap_stream self._our_le_l2cap_view = FilteringEventStream( self._le_l2cap_stream, L2capMatchers.PacketPayloadWithMatchingCid(self._cid)) def get_event_queue(self): return self._our_le_l2cap_view.get_event_queue() def send(self, payload): self._device.l2cap_le.SendFixedChannelPacket( l2cap_le_facade_pb2.FixedChannelPacket( cid=self._cid, payload=payload)) def close_channel(self): self._device.l2cap_le.SetFixedChannel( l2cap_le_facade_pb2.SetEnableFixedChannelRequest( cid=self._cid, enable=False)) class PyLeL2capDynamicChannel(IEventStream): def __init__(self, device, psm, l2cap_stream): self._device = device Loading @@ -72,7 +96,8 @@ class PyLeL2capChannel(IEventStream): def send(self, payload): self._device.l2cap_le.SendDynamicChannelPacket( l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload)) l2cap_le_facade_pb2.DynamicChannelPacket( psm=self._psm, payload=payload)) def close_channel(self): self._device.l2cap_le.CloseDynamicChannel( Loading @@ -86,7 +111,7 @@ class PyLeL2capChannel(IEventStream): class CreditBasedConnectionResponseFutureWrapper(object): """ The future object returned when we send a connection request from DUT. Can be used to get connection status and create the corresponding PyLeL2capChannel object later create the corresponding PyLeL2capDynamicChannel object later """ def __init__(self, grpc_response_future, device, psm, le_l2cap_stream): Loading @@ -102,7 +127,8 @@ class CreditBasedConnectionResponseFutureWrapper(object): def get_channel(self): assertThat(self.get_status()).isEqualTo( l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) return PyLeL2capChannel(self._device, self._psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, self._psm, self._le_l2cap_stream) class PyLeL2cap(Closable): Loading @@ -115,11 +141,19 @@ class PyLeL2cap(Closable): def close(self): safeClose(self._le_l2cap_stream) def enable_fixed_channel(self, cid=4): self._device.l2cap_le.SetFixedChannel( l2cap_le_facade_pb2.SetEnableFixedChannelRequest( cid=cid, enable=True)) def get_fixed_channel(self, cid=4): return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream) def register_coc(self, psm=0x33): self._device.l2cap_le.SetDynamicChannel( l2cap_le_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, enable=True)) return PyLeL2capChannel(self._device, psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, psm, self._le_l2cap_stream) def connect_coc_to_cert(self, psm=0x33): """ Loading system/gd/l2cap/le/cert/cert_le_l2cap.py +6 −0 Original line number Diff line number Diff line Loading @@ -131,6 +131,12 @@ class CertLeL2cap(Closable): control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def open_fixed_channel(self, cid=4): channel = CertLeL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._le_acl, None, 0) return channel def open_channel(self, signal_id, psm, Loading system/gd/l2cap/le/cert/le_l2cap_test.py +20 −0 Original line number Diff line number Diff line Loading @@ -146,6 +146,26 @@ class LeL2capTest(GdBaseTestClass): dut_channel = response_future.get_channel() return (dut_channel, cert_channel) def _open_fixed_channel(self, cid=4): dut_channel = self.dut_l2cap.get_fixed_channel(cid) cert_channel = self.cert_l2cap.open_fixed_channel(cid) return (dut_channel, cert_channel) def test_fixed_channel_send(self): self.dut_l2cap.enable_fixed_channel(4) self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_fixed_channel(4) dut_channel.send(b'hello' * 40) assertThat(cert_channel).emits(L2capMatchers.Data(b'hello' * 40)) def test_fixed_channel_receive(self): self.dut_l2cap.enable_fixed_channel(4) self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_fixed_channel(4) cert_channel.send(SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_connect_from_dut_and_open_dynamic_channel(self): """ Internal test for GD stack only Loading system/gd/l2cap/le/facade.cc +169 −14 Original line number Diff line number Diff line Loading @@ -108,19 +108,6 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { return ::grpc::Status::OK; } ::grpc::Status SendConnectionParameterUpdate(::grpc::ServerContext* context, const ConnectionParameter* request, ::google::protobuf::Empty* response) override { if (dynamic_channel_helper_map_.empty()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Need to open at least one dynamic channel first"); } auto& dynamic_channel_helper = dynamic_channel_helper_map_.begin()->second; dynamic_channel_helper->channel_->GetLinkOptions()->UpdateConnectionParameter( request->conn_interval_min(), request->conn_interval_max(), request->conn_latency(), request->supervision_timeout(), request->min_ce_length(), request->max_ce_length()); return ::grpc::Status::OK; } class L2capDynamicChannelHelper { public: L2capDynamicChannelHelper(L2capLeModuleFacadeService* service, L2capLeModule* l2cap_layer, os::Handler* handler, Loading Loading @@ -233,7 +220,7 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { channel_->GetQueueUpEnd()->UnregisterEnqueue(); promise.set_value(); return packet_one; }; } L2capLeModuleFacadeService* facade_service_; L2capLeModule* l2cap_layer_; Loading @@ -247,10 +234,178 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { std::mutex channel_open_cv_mutex_; }; ::grpc::Status SetFixedChannel(::grpc::ServerContext* context, const SetEnableFixedChannelRequest* request, ::google::protobuf::Empty* response) override { if (request->enable()) { fixed_channel_helper_map_.emplace(request->cid(), std::make_unique<L2capFixedChannelHelper>( this, l2cap_layer_, facade_handler_, request->cid())); return ::grpc::Status::OK; } else { auto service_helper = fixed_channel_helper_map_.find(request->cid()); if (service_helper == fixed_channel_helper_map_.end()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Cid not registered"); } service_helper->second->channel_->Release(); service_helper->second->service_->Unregister(common::BindOnce([] {}), facade_handler_); return ::grpc::Status::OK; } } ::grpc::Status SendFixedChannelPacket(::grpc::ServerContext* context, const FixedChannelPacket* request, ::google::protobuf::Empty* response) override { std::unique_lock<std::mutex> lock(channel_map_mutex_); if (fixed_channel_helper_map_.find(request->cid()) == fixed_channel_helper_map_.end()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Cid not registered"); } std::vector<uint8_t> packet(request->payload().begin(), request->payload().end()); if (!fixed_channel_helper_map_[request->cid()]->SendPacket(packet)) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not open"); } return ::grpc::Status::OK; } class L2capFixedChannelHelper { public: L2capFixedChannelHelper(L2capLeModuleFacadeService* service, L2capLeModule* l2cap_layer, os::Handler* handler, Cid cid) : facade_service_(service), l2cap_layer_(l2cap_layer), handler_(handler), cid_(cid) { fixed_channel_manager_ = l2cap_layer_->GetFixedChannelManager(); fixed_channel_manager_->RegisterService( cid_, {}, common::BindOnce(&L2capFixedChannelHelper::on_l2cap_service_registration_complete, common::Unretained(this)), common::Bind(&L2capFixedChannelHelper::on_connection_open, common::Unretained(this)), handler_); } ~L2capFixedChannelHelper() { if (channel_ != nullptr) { channel_->GetQueueUpEnd()->UnregisterDequeue(); channel_->Release(); channel_ = nullptr; } } void Connect(hci::AddressWithType address) { fixed_channel_manager_->ConnectServices( address, common::BindOnce(&L2capFixedChannelHelper::on_connect_fail, common::Unretained(this)), handler_); std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); if (!channel_open_cv_.wait_for(lock, std::chrono::seconds(2), [this] { return channel_ != nullptr; })) { LOG_WARN("Channel is not open for cid %d", cid_); } } void disconnect() { channel_->Release(); } void on_l2cap_service_registration_complete(FixedChannelManager::RegistrationResult registration_result, std::unique_ptr<FixedChannelService> service) { if (registration_result != FixedChannelManager::RegistrationResult::SUCCESS) { LOG_ERROR("Service registration failed"); } else { service_ = std::move(service); } } // invoked from Facade Handler void on_connection_open(std::unique_ptr<FixedChannel> channel) { { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); LOG_INFO(); channel_ = std::move(channel); channel_->RegisterOnCloseCallback( handler_, common::BindOnce(&L2capFixedChannelHelper::on_close_callback, common::Unretained(this))); channel_->Acquire(); } channel_open_cv_.notify_all(); channel_->GetQueueUpEnd()->RegisterDequeue( facade_service_->facade_handler_, common::Bind(&L2capFixedChannelHelper::on_incoming_packet, common::Unretained(this))); } void on_close_callback(hci::ErrorCode error_code) { { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); channel_->GetQueueUpEnd()->UnregisterDequeue(); } channel_ = nullptr; } void on_connect_fail(FixedChannelManager::ConnectionResult result) { { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); channel_ = nullptr; } channel_open_cv_.notify_all(); } void on_incoming_packet() { auto packet = channel_->GetQueueUpEnd()->TryDequeue(); std::string data = std::string(packet->begin(), packet->end()); L2capPacket l2cap_data; l2cap_data.set_fixed_cid(cid_); l2cap_data.set_payload(data); facade_service_->pending_l2cap_data_.OnIncomingEvent(l2cap_data); } bool SendPacket(std::vector<uint8_t> packet) { if (channel_ == nullptr) { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); if (!channel_open_cv_.wait_for(lock, std::chrono::seconds(2), [this] { return channel_ != nullptr; })) { LOG_WARN("Channel is not open for cid %d", cid_); return false; } } std::promise<void> promise; auto future = promise.get_future(); channel_->GetQueueUpEnd()->RegisterEnqueue( handler_, common::Bind(&L2capFixedChannelHelper::enqueue_callback, common::Unretained(this), packet, common::Passed(std::move(promise)))); auto status = future.wait_for(std::chrono::milliseconds(500)); if (status != std::future_status::ready) { LOG_ERROR("Can't send packet because the previous packet wasn't sent yet"); return false; } return true; } std::unique_ptr<packet::BasePacketBuilder> enqueue_callback(std::vector<uint8_t> packet, std::promise<void> promise) { auto packet_one = std::make_unique<packet::RawBuilder>(2000); packet_one->AddOctets(packet); channel_->GetQueueUpEnd()->UnregisterEnqueue(); promise.set_value(); return packet_one; } L2capLeModuleFacadeService* facade_service_; L2capLeModule* l2cap_layer_; os::Handler* handler_; std::unique_ptr<FixedChannelManager> fixed_channel_manager_; std::unique_ptr<FixedChannelService> service_; std::unique_ptr<FixedChannel> channel_ = nullptr; Cid cid_; std::condition_variable channel_open_cv_; std::mutex channel_open_cv_mutex_; }; ::grpc::Status SendConnectionParameterUpdate(::grpc::ServerContext* context, const ConnectionParameter* request, ::google::protobuf::Empty* response) override { if (dynamic_channel_helper_map_.empty()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Need to open at least one dynamic channel first"); } auto& dynamic_channel_helper = dynamic_channel_helper_map_.begin()->second; dynamic_channel_helper->channel_->GetLinkOptions()->UpdateConnectionParameter( request->conn_interval_min(), request->conn_interval_max(), request->conn_latency(), request->supervision_timeout(), request->min_ce_length(), request->max_ce_length()); return ::grpc::Status::OK; } L2capLeModule* l2cap_layer_; os::Handler* facade_handler_; std::mutex channel_map_mutex_; std::map<Psm, std::unique_ptr<L2capDynamicChannelHelper>> dynamic_channel_helper_map_; std::map<Cid, std::unique_ptr<L2capFixedChannelHelper>> fixed_channel_helper_map_; ::bluetooth::grpc::GrpcEventQueue<L2capPacket> pending_l2cap_data_{"FetchL2capData"}; }; Loading Loading
system/gd/cert/matchers.py +5 −0 Original line number Diff line number Diff line Loading @@ -122,6 +122,11 @@ class L2capMatchers(object): def PacketPayloadWithMatchingPsm(psm): return lambda packet: None if psm != packet.psm else packet # this is a hack - should be removed @staticmethod def PacketPayloadWithMatchingCid(cid): return lambda packet: None if cid != packet.fixed_cid else packet @staticmethod def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) Loading
system/gd/cert/py_l2cap.py +39 −5 Original line number Diff line number Diff line Loading @@ -57,7 +57,31 @@ class PyL2cap(Closable): return PyL2capChannel(self._device, psm) class PyLeL2capChannel(IEventStream): class PyLeL2capFixedChannel(IEventStream): def __init__(self, device, cid, l2cap_stream): self._device = device self._cid = cid self._le_l2cap_stream = l2cap_stream self._our_le_l2cap_view = FilteringEventStream( self._le_l2cap_stream, L2capMatchers.PacketPayloadWithMatchingCid(self._cid)) def get_event_queue(self): return self._our_le_l2cap_view.get_event_queue() def send(self, payload): self._device.l2cap_le.SendFixedChannelPacket( l2cap_le_facade_pb2.FixedChannelPacket( cid=self._cid, payload=payload)) def close_channel(self): self._device.l2cap_le.SetFixedChannel( l2cap_le_facade_pb2.SetEnableFixedChannelRequest( cid=self._cid, enable=False)) class PyLeL2capDynamicChannel(IEventStream): def __init__(self, device, psm, l2cap_stream): self._device = device Loading @@ -72,7 +96,8 @@ class PyLeL2capChannel(IEventStream): def send(self, payload): self._device.l2cap_le.SendDynamicChannelPacket( l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload)) l2cap_le_facade_pb2.DynamicChannelPacket( psm=self._psm, payload=payload)) def close_channel(self): self._device.l2cap_le.CloseDynamicChannel( Loading @@ -86,7 +111,7 @@ class PyLeL2capChannel(IEventStream): class CreditBasedConnectionResponseFutureWrapper(object): """ The future object returned when we send a connection request from DUT. Can be used to get connection status and create the corresponding PyLeL2capChannel object later create the corresponding PyLeL2capDynamicChannel object later """ def __init__(self, grpc_response_future, device, psm, le_l2cap_stream): Loading @@ -102,7 +127,8 @@ class CreditBasedConnectionResponseFutureWrapper(object): def get_channel(self): assertThat(self.get_status()).isEqualTo( l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) return PyLeL2capChannel(self._device, self._psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, self._psm, self._le_l2cap_stream) class PyLeL2cap(Closable): Loading @@ -115,11 +141,19 @@ class PyLeL2cap(Closable): def close(self): safeClose(self._le_l2cap_stream) def enable_fixed_channel(self, cid=4): self._device.l2cap_le.SetFixedChannel( l2cap_le_facade_pb2.SetEnableFixedChannelRequest( cid=cid, enable=True)) def get_fixed_channel(self, cid=4): return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream) def register_coc(self, psm=0x33): self._device.l2cap_le.SetDynamicChannel( l2cap_le_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, enable=True)) return PyLeL2capChannel(self._device, psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, psm, self._le_l2cap_stream) def connect_coc_to_cert(self, psm=0x33): """ Loading
system/gd/l2cap/le/cert/cert_le_l2cap.py +6 −0 Original line number Diff line number Diff line Loading @@ -131,6 +131,12 @@ class CertLeL2cap(Closable): control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def open_fixed_channel(self, cid=4): channel = CertLeL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._le_acl, None, 0) return channel def open_channel(self, signal_id, psm, Loading
system/gd/l2cap/le/cert/le_l2cap_test.py +20 −0 Original line number Diff line number Diff line Loading @@ -146,6 +146,26 @@ class LeL2capTest(GdBaseTestClass): dut_channel = response_future.get_channel() return (dut_channel, cert_channel) def _open_fixed_channel(self, cid=4): dut_channel = self.dut_l2cap.get_fixed_channel(cid) cert_channel = self.cert_l2cap.open_fixed_channel(cid) return (dut_channel, cert_channel) def test_fixed_channel_send(self): self.dut_l2cap.enable_fixed_channel(4) self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_fixed_channel(4) dut_channel.send(b'hello' * 40) assertThat(cert_channel).emits(L2capMatchers.Data(b'hello' * 40)) def test_fixed_channel_receive(self): self.dut_l2cap.enable_fixed_channel(4) self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_fixed_channel(4) cert_channel.send(SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) def test_connect_from_dut_and_open_dynamic_channel(self): """ Internal test for GD stack only Loading
system/gd/l2cap/le/facade.cc +169 −14 Original line number Diff line number Diff line Loading @@ -108,19 +108,6 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { return ::grpc::Status::OK; } ::grpc::Status SendConnectionParameterUpdate(::grpc::ServerContext* context, const ConnectionParameter* request, ::google::protobuf::Empty* response) override { if (dynamic_channel_helper_map_.empty()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Need to open at least one dynamic channel first"); } auto& dynamic_channel_helper = dynamic_channel_helper_map_.begin()->second; dynamic_channel_helper->channel_->GetLinkOptions()->UpdateConnectionParameter( request->conn_interval_min(), request->conn_interval_max(), request->conn_latency(), request->supervision_timeout(), request->min_ce_length(), request->max_ce_length()); return ::grpc::Status::OK; } class L2capDynamicChannelHelper { public: L2capDynamicChannelHelper(L2capLeModuleFacadeService* service, L2capLeModule* l2cap_layer, os::Handler* handler, Loading Loading @@ -233,7 +220,7 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { channel_->GetQueueUpEnd()->UnregisterEnqueue(); promise.set_value(); return packet_one; }; } L2capLeModuleFacadeService* facade_service_; L2capLeModule* l2cap_layer_; Loading @@ -247,10 +234,178 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service { std::mutex channel_open_cv_mutex_; }; ::grpc::Status SetFixedChannel(::grpc::ServerContext* context, const SetEnableFixedChannelRequest* request, ::google::protobuf::Empty* response) override { if (request->enable()) { fixed_channel_helper_map_.emplace(request->cid(), std::make_unique<L2capFixedChannelHelper>( this, l2cap_layer_, facade_handler_, request->cid())); return ::grpc::Status::OK; } else { auto service_helper = fixed_channel_helper_map_.find(request->cid()); if (service_helper == fixed_channel_helper_map_.end()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Cid not registered"); } service_helper->second->channel_->Release(); service_helper->second->service_->Unregister(common::BindOnce([] {}), facade_handler_); return ::grpc::Status::OK; } } ::grpc::Status SendFixedChannelPacket(::grpc::ServerContext* context, const FixedChannelPacket* request, ::google::protobuf::Empty* response) override { std::unique_lock<std::mutex> lock(channel_map_mutex_); if (fixed_channel_helper_map_.find(request->cid()) == fixed_channel_helper_map_.end()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Cid not registered"); } std::vector<uint8_t> packet(request->payload().begin(), request->payload().end()); if (!fixed_channel_helper_map_[request->cid()]->SendPacket(packet)) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not open"); } return ::grpc::Status::OK; } class L2capFixedChannelHelper { public: L2capFixedChannelHelper(L2capLeModuleFacadeService* service, L2capLeModule* l2cap_layer, os::Handler* handler, Cid cid) : facade_service_(service), l2cap_layer_(l2cap_layer), handler_(handler), cid_(cid) { fixed_channel_manager_ = l2cap_layer_->GetFixedChannelManager(); fixed_channel_manager_->RegisterService( cid_, {}, common::BindOnce(&L2capFixedChannelHelper::on_l2cap_service_registration_complete, common::Unretained(this)), common::Bind(&L2capFixedChannelHelper::on_connection_open, common::Unretained(this)), handler_); } ~L2capFixedChannelHelper() { if (channel_ != nullptr) { channel_->GetQueueUpEnd()->UnregisterDequeue(); channel_->Release(); channel_ = nullptr; } } void Connect(hci::AddressWithType address) { fixed_channel_manager_->ConnectServices( address, common::BindOnce(&L2capFixedChannelHelper::on_connect_fail, common::Unretained(this)), handler_); std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); if (!channel_open_cv_.wait_for(lock, std::chrono::seconds(2), [this] { return channel_ != nullptr; })) { LOG_WARN("Channel is not open for cid %d", cid_); } } void disconnect() { channel_->Release(); } void on_l2cap_service_registration_complete(FixedChannelManager::RegistrationResult registration_result, std::unique_ptr<FixedChannelService> service) { if (registration_result != FixedChannelManager::RegistrationResult::SUCCESS) { LOG_ERROR("Service registration failed"); } else { service_ = std::move(service); } } // invoked from Facade Handler void on_connection_open(std::unique_ptr<FixedChannel> channel) { { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); LOG_INFO(); channel_ = std::move(channel); channel_->RegisterOnCloseCallback( handler_, common::BindOnce(&L2capFixedChannelHelper::on_close_callback, common::Unretained(this))); channel_->Acquire(); } channel_open_cv_.notify_all(); channel_->GetQueueUpEnd()->RegisterDequeue( facade_service_->facade_handler_, common::Bind(&L2capFixedChannelHelper::on_incoming_packet, common::Unretained(this))); } void on_close_callback(hci::ErrorCode error_code) { { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); channel_->GetQueueUpEnd()->UnregisterDequeue(); } channel_ = nullptr; } void on_connect_fail(FixedChannelManager::ConnectionResult result) { { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); channel_ = nullptr; } channel_open_cv_.notify_all(); } void on_incoming_packet() { auto packet = channel_->GetQueueUpEnd()->TryDequeue(); std::string data = std::string(packet->begin(), packet->end()); L2capPacket l2cap_data; l2cap_data.set_fixed_cid(cid_); l2cap_data.set_payload(data); facade_service_->pending_l2cap_data_.OnIncomingEvent(l2cap_data); } bool SendPacket(std::vector<uint8_t> packet) { if (channel_ == nullptr) { std::unique_lock<std::mutex> lock(channel_open_cv_mutex_); if (!channel_open_cv_.wait_for(lock, std::chrono::seconds(2), [this] { return channel_ != nullptr; })) { LOG_WARN("Channel is not open for cid %d", cid_); return false; } } std::promise<void> promise; auto future = promise.get_future(); channel_->GetQueueUpEnd()->RegisterEnqueue( handler_, common::Bind(&L2capFixedChannelHelper::enqueue_callback, common::Unretained(this), packet, common::Passed(std::move(promise)))); auto status = future.wait_for(std::chrono::milliseconds(500)); if (status != std::future_status::ready) { LOG_ERROR("Can't send packet because the previous packet wasn't sent yet"); return false; } return true; } std::unique_ptr<packet::BasePacketBuilder> enqueue_callback(std::vector<uint8_t> packet, std::promise<void> promise) { auto packet_one = std::make_unique<packet::RawBuilder>(2000); packet_one->AddOctets(packet); channel_->GetQueueUpEnd()->UnregisterEnqueue(); promise.set_value(); return packet_one; } L2capLeModuleFacadeService* facade_service_; L2capLeModule* l2cap_layer_; os::Handler* handler_; std::unique_ptr<FixedChannelManager> fixed_channel_manager_; std::unique_ptr<FixedChannelService> service_; std::unique_ptr<FixedChannel> channel_ = nullptr; Cid cid_; std::condition_variable channel_open_cv_; std::mutex channel_open_cv_mutex_; }; ::grpc::Status SendConnectionParameterUpdate(::grpc::ServerContext* context, const ConnectionParameter* request, ::google::protobuf::Empty* response) override { if (dynamic_channel_helper_map_.empty()) { return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Need to open at least one dynamic channel first"); } auto& dynamic_channel_helper = dynamic_channel_helper_map_.begin()->second; dynamic_channel_helper->channel_->GetLinkOptions()->UpdateConnectionParameter( request->conn_interval_min(), request->conn_interval_max(), request->conn_latency(), request->supervision_timeout(), request->min_ce_length(), request->max_ce_length()); return ::grpc::Status::OK; } L2capLeModule* l2cap_layer_; os::Handler* facade_handler_; std::mutex channel_map_mutex_; std::map<Psm, std::unique_ptr<L2capDynamicChannelHelper>> dynamic_channel_helper_map_; std::map<Cid, std::unique_ptr<L2capFixedChannelHelper>> fixed_channel_helper_map_; ::bluetooth::grpc::GrpcEventQueue<L2capPacket> pending_l2cap_data_{"FetchL2capData"}; }; Loading