Loading system/gd/l2cap/classic/cert/l2cap_test.py +52 −0 Original line number Diff line number Diff line Loading @@ -68,6 +68,28 @@ class L2capTest(GdFacadeOnlyBaseTestClass): def cert_send_b_frame(self, b_frame): self.cert_l2cap.send_acl(b_frame) def _send_configuration_request(self, sid, dcid, continuation=l2cap_packets.Continuation.END, options=[], payload=[]): config_request = l2cap_packets.ConfigurationRequestBuilder( sid, dcid, continuation, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) config_request_l2cap = config_request_l2cap.Serialize() config_request_l2cap.extend(payload) config_request_l2cap[0] += len(payload) config_request_l2cap[6] += len(payload) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(config_request_l2cap))) def _setup_link_from_cert(self): self.dut.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) Loading Loading @@ -185,6 +207,36 @@ class L2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_l2cap.get_control_channel()).emitsNone( L2capMatchers.ConfigurationResponse()) def test_continuation_flag(self): """ L2CAP/COS/CFD/BV-01-C [Continuation Flag] Verify the IUT is able to receive configuration requests that have the continuation flag set. """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) scid = 0x41 psm = 0x33 cert_acl_data_stream.register_callback(self._handle_control_packet) # Send configuration request with CONTINUE self.on_connection_response = lambda log: self._on_connection_response_use_mtu(log, continuation=l2cap_packets.Continuation.CONTINUE, mtu_value=48) self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm) cert_acl_data_asserts.assert_event_occurs( self.is_correct_configuration_response) flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption( ) flush_timeout_option.flush_timeout = 65535 option = [flush_timeout_option] self._send_configuration_request( 3, self.scid_to_dcid[scid], options=option) cert_acl_data_asserts.assert_event_occurs( self.is_correct_configuration_response) def test_retry_config_after_rejection(self): """ L2CAP/COS/CFD/BV-02-C Loading system/gd/l2cap/classic/internal/signalling_manager.cc +10 −3 Original line number Diff line number Diff line Loading @@ -288,12 +288,19 @@ void ClassicSignallingManager::OnConfigurationRequest(SignalId signal_id, Cid ci auto& configuration_state = channel_configuration_[cid]; std::vector<std::unique_ptr<ConfigurationOption>> rsp_options; ConfigurationResponseResult result = ConfigurationResponseResult::SUCCESS; for (auto& option : options) { switch (option->type_) { case ConfigurationOptionType::MTU: { configuration_state.outgoing_mtu_ = MtuConfigurationOption::Specialize(option.get())->mtu_; // TODO: If less than minimum (required by spec), reject auto* config = MtuConfigurationOption::Specialize(option.get()); if (config->mtu_ < kMinimumClassicMtu) { LOG_WARN("Configuration request with Invalid MTU"); config->mtu_ = kDefaultClassicMtu; rsp_options.emplace_back(std::make_unique<MtuConfigurationOption>(*config)); result = ConfigurationResponseResult::UNACCEPTABLE_PARAMETERS; } configuration_state.outgoing_mtu_ = config->mtu_; break; } case ConfigurationOptionType::FLUSH_TIMEOUT: { Loading Loading @@ -345,7 +352,7 @@ void ClassicSignallingManager::OnConfigurationRequest(SignalId signal_id, Cid ci } auto response = ConfigurationResponseBuilder::Create(signal_id.Value(), channel->GetRemoteCid(), is_continuation, ConfigurationResponseResult::SUCCESS, std::move(rsp_options)); result, std::move(rsp_options)); enqueue_buffer_->Enqueue(std::move(response), handler_); } Loading Loading
system/gd/l2cap/classic/cert/l2cap_test.py +52 −0 Original line number Diff line number Diff line Loading @@ -68,6 +68,28 @@ class L2capTest(GdFacadeOnlyBaseTestClass): def cert_send_b_frame(self, b_frame): self.cert_l2cap.send_acl(b_frame) def _send_configuration_request(self, sid, dcid, continuation=l2cap_packets.Continuation.END, options=[], payload=[]): config_request = l2cap_packets.ConfigurationRequestBuilder( sid, dcid, continuation, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) config_request_l2cap = config_request_l2cap.Serialize() config_request_l2cap.extend(payload) config_request_l2cap[0] += len(payload) config_request_l2cap[6] += len(payload) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(config_request_l2cap))) def _setup_link_from_cert(self): self.dut.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) Loading Loading @@ -185,6 +207,36 @@ class L2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_l2cap.get_control_channel()).emitsNone( L2capMatchers.ConfigurationResponse()) def test_continuation_flag(self): """ L2CAP/COS/CFD/BV-01-C [Continuation Flag] Verify the IUT is able to receive configuration requests that have the continuation flag set. """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) scid = 0x41 psm = 0x33 cert_acl_data_stream.register_callback(self._handle_control_packet) # Send configuration request with CONTINUE self.on_connection_response = lambda log: self._on_connection_response_use_mtu(log, continuation=l2cap_packets.Continuation.CONTINUE, mtu_value=48) self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm) cert_acl_data_asserts.assert_event_occurs( self.is_correct_configuration_response) flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption( ) flush_timeout_option.flush_timeout = 65535 option = [flush_timeout_option] self._send_configuration_request( 3, self.scid_to_dcid[scid], options=option) cert_acl_data_asserts.assert_event_occurs( self.is_correct_configuration_response) def test_retry_config_after_rejection(self): """ L2CAP/COS/CFD/BV-02-C Loading
system/gd/l2cap/classic/internal/signalling_manager.cc +10 −3 Original line number Diff line number Diff line Loading @@ -288,12 +288,19 @@ void ClassicSignallingManager::OnConfigurationRequest(SignalId signal_id, Cid ci auto& configuration_state = channel_configuration_[cid]; std::vector<std::unique_ptr<ConfigurationOption>> rsp_options; ConfigurationResponseResult result = ConfigurationResponseResult::SUCCESS; for (auto& option : options) { switch (option->type_) { case ConfigurationOptionType::MTU: { configuration_state.outgoing_mtu_ = MtuConfigurationOption::Specialize(option.get())->mtu_; // TODO: If less than minimum (required by spec), reject auto* config = MtuConfigurationOption::Specialize(option.get()); if (config->mtu_ < kMinimumClassicMtu) { LOG_WARN("Configuration request with Invalid MTU"); config->mtu_ = kDefaultClassicMtu; rsp_options.emplace_back(std::make_unique<MtuConfigurationOption>(*config)); result = ConfigurationResponseResult::UNACCEPTABLE_PARAMETERS; } configuration_state.outgoing_mtu_ = config->mtu_; break; } case ConfigurationOptionType::FLUSH_TIMEOUT: { Loading Loading @@ -345,7 +352,7 @@ void ClassicSignallingManager::OnConfigurationRequest(SignalId signal_id, Cid ci } auto response = ConfigurationResponseBuilder::Create(signal_id.Value(), channel->GetRemoteCid(), is_continuation, ConfigurationResponseResult::SUCCESS, std::move(rsp_options)); result, std::move(rsp_options)); enqueue_buffer_->Enqueue(std::move(response), handler_); } Loading