Loading system/gd/l2cap/classic/cert/api.proto +8 −1 Original line number Diff line number Diff line Loading @@ -54,6 +54,7 @@ message SFrame { uint32 f = 4; uint32 p = 5; uint32 s = 6; bool with_fcs = 7; } message SendSFrameResult {} Loading Loading @@ -99,12 +100,18 @@ message ChannelRetransmissionFlowControlConfig { uint32 mps = 6; } enum FcsConfig { NO_FCS = 0; DEFAULT = 1; NON = 2; } message ConfigurationRequest { uint32 dcid = 1; uint32 signal_id = 2; uint32 mtu = 3; ChannelRetransmissionFlowControlConfig retransmission_config = 4; bool fcs = 5; FcsConfig fcs_config = 5; } enum ConfigurationResult { Loading system/gd/l2cap/classic/cert/cert.cc +30 −5 Original line number Diff line number Diff line Loading @@ -100,7 +100,12 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { auto f = static_cast<Final>(request->f()); auto p = static_cast<Poll>(request->p()); auto s = static_cast<SupervisoryFunction>(request->s()); auto builder = EnhancedSupervisoryFrameBuilder::Create(request->channel(), s, p, f, request->req_seq()); std::unique_ptr<packet::BasePacketBuilder> builder; if (request->with_fcs()) { builder = EnhancedSupervisoryFrameWithFcsBuilder::Create(request->channel(), s, p, f, request->req_seq()); } else { builder = EnhancedSupervisoryFrameBuilder::Create(request->channel(), s, p, f, request->req_seq()); } outgoing_packet_queue_.push(std::move(builder)); send_packet_from_queue(); return ::grpc::Status::OK; Loading Loading @@ -140,9 +145,11 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { option->monitor_time_out_ = 12000; option->maximum_pdu_size_ = 1010; config.push_back(std::move(option)); auto no_fcs = std::make_unique<FrameCheckSequenceOption>(); no_fcs->fcs_type_ = FcsType::NO_FCS; config.push_back(std::move(no_fcs)); auto fcs = std::make_unique<FrameCheckSequenceOption>(); if (request->fcs_config() != FcsConfig::NON) { fcs->fcs_type_ = request->fcs_config() ? FcsType::DEFAULT : FcsType::NO_FCS; config.push_back(std::move(fcs)); } } auto builder = ConfigurationRequestBuilder::Create(request->signal_id(), request->dcid(), Continuation::END, std::move(config)); Loading Loading @@ -242,7 +249,8 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { } case InformationRequestType::EXTENDED_FEATURES: { auto builder = InformationResponseExtendedFeaturesBuilder::Create( request->signal_id(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0); request->signal_id(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, request->information_value() & (1 << 5) ? 1 : 0, 0, 0, 0, 0); auto l2cap_builder = BasicFrameBuilder::Create(kClassicSignallingCid, std::move(builder)); outgoing_packet_queue_.push(std::move(l2cap_builder)); send_packet_from_queue(); Loading Loading @@ -349,6 +357,7 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { response.mutable_configuration_request()->set_signal_id(control_view.GetIdentifier()); response.mutable_configuration_request()->set_dcid(view.GetDestinationCid()); bool fcs_set = false; for (auto& option : view.GetConfig()) { if (option->type_ == ConfigurationOptionType::RETRANSMISSION_AND_FLOW_CONTROL) { auto config = RetransmissionAndFlowControlConfigurationOption::Specialize(option.get()); Loading @@ -365,6 +374,22 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { response.mutable_configuration_request()->mutable_retransmission_config()->set_mps( config->maximum_pdu_size_); } if (option->type_ == ConfigurationOptionType::FRAME_CHECK_SEQUENCE) { auto fcs_type = FrameCheckSequenceOption::Specialize(option.get())->fcs_type_; LOG_INFO("fce_type=%hhu", fcs_type); switch (fcs_type) { case FcsType::NO_FCS: response.mutable_configuration_request()->set_fcs_config(FcsConfig::NO_FCS); break; case FcsType::DEFAULT: response.mutable_configuration_request()->set_fcs_config(FcsConfig::DEFAULT); break; } fcs_set = true; } } if (!fcs_set) { response.mutable_configuration_request()->set_fcs_config(FcsConfig::NON); } LogEvent(response); break; Loading system/gd/l2cap/classic/cert/pts_l2cap_test.py +20 −0 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ from facade import common_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from google.protobuf import empty_pb2 from neighbor.facade import facade_pb2 as neighbor_facade class PTSL2capTest(PTSBaseTestClass): Loading @@ -47,6 +48,9 @@ class PTSL2capTest(PTSBaseTestClass): self.pts_address = common_pb2.BluetoothAddress( address=str.encode(pts_address)) self.device_under_test.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) def teardown_test(self): self.device_under_test.rootservice.StopStack( facade_rootservice_pb2.StopStackRequest()) Loading @@ -72,6 +76,22 @@ class PTSL2capTest(PTSBaseTestClass): lambda device: device.remote.address == self.pts_address.address, timeout=timedelta(seconds=timeout)) def test_L2CAP_EXF_BV_03_C(self): """ L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option] Verify the IUT can format an Information Response for the information type of Extended Features that correctly identifies that the FCS Option is locally supported. """ with self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_close_asserts = EventAsserts( dut_connection_close_stream) psm = 1 retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, retransmission_mode=retransmission_mode)) time.sleep(5) def test_L2CAP_COS_CED_BV_01_C(self): """ L2CAP/COS/CED/BV-01-C [Request Connection] Loading system/gd/l2cap/classic/cert/simple_l2cap_test.py +219 −1 Original line number Diff line number Diff line Loading @@ -485,7 +485,225 @@ class SimpleL2capTest(GdBaseTestClass): lambda log : is_information_response(log) and \ log.information_response.signal_id == signal_id and \ log.information_response.type == expected_log_type and \ log.information_response.information_value | expected_mask == expected_mask) log.information_response.information_value & expected_mask == expected_mask) def test_extended_feature_info_response_fcs(self): """ L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) signal_id = 3 self.cert_device.l2cap.SendInformationRequest( l2cap_cert_pb2.InformationRequest( type=l2cap_cert_pb2.InformationRequestType. EXTENDED_FEATURES, signal_id=signal_id)) l2cap_event_asserts_alt.assert_event_occurs_at_most( is_information_response, 1) expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES expected_mask = 1 << 5 l2cap_event_asserts.assert_event_occurs( lambda log : is_information_response(log) and \ log.information_response.signal_id == signal_id and \ log.information_response.type == expected_log_type and \ log.information_response.information_value & expected_mask == expected_mask) def test_config_channel_not_use_FCS(self): """ L2CAP/FOC/BV-01-C [IUT Initiated Configuration of the FCS Option] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM scid = 0x0101 self._setup_link(l2cap_event_asserts) self._open_channel( l2cap_event_asserts, scid=scid, mode=self.retransmission_mode) self.device_under_test.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and \ log.data_packet.channel == scid and \ log.data_packet.payload == b'\x00\x00abc' ) def test_explicitly_request_use_FCS(self): """ L2CAP/FOC/BV-02-C [Lower Tester Explicitly Requests FCS should be Used] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM scid = 0x0101 information_value = 1 << 3 | 1 << 5 def handle_information_request(log): log = log.information_request self.cert_device.l2cap.SendInformationResponse( l2cap_cert_pb2.InformationResponse( type=log.type, signal_id=log.signal_id, information_value=information_value)) l2cap_log_stream.unregister_callback( self.handle_information_request, matcher_fn=is_information_request) l2cap_log_stream.register_callback( handle_information_request, matcher_fn=is_information_request) def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid l2cap_log_stream.unregister_callback( self.handle_connection_response, matcher_fn=is_connection_response) l2cap_log_stream.register_callback( handle_connection_response, matcher_fn=is_connection_response) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: return dcid = self.scid_dcid_map[log.dcid] self.cert_device.l2cap.SendConfigurationResponse( l2cap_cert_pb2.ConfigurationResponse( scid=dcid, signal_id=log.signal_id, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode))) self.cert_device.l2cap.SendConfigurationRequest( l2cap_cert_pb2.ConfigurationRequest( dcid=dcid, signal_id=log.signal_id + 1, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode), fcs_config=l2cap_cert_pb2.FcsConfig.DEFAULT)) l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) l2cap_log_stream.register_callback( handle_configuration_request, matcher_fn=is_configuration_request) self._setup_link(l2cap_event_asserts) self._open_channel( l2cap_event_asserts, scid=scid, mode=self.retransmission_mode) l2cap_event_asserts_alt.assert_event_occurs( lambda log: is_configuration_request(log) and \ log.configuration_request.fcs_config == l2cap_cert_pb2.FcsConfig.NO_FCS) self.device_under_test.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and \ log.data_packet.channel == scid and \ basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc\x0f\xb6' ) def test_implicitly_request_use_FCS(self): """ L2CAP/FOC/BV-03-C [Lower Tester Implicitly Requests FCS should be Used] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM scid = 0x41 psm = 0x41 information_value = 1 << 3 | 1 << 5 def handle_information_request(log): log = log.information_request self.cert_device.l2cap.SendInformationResponse( l2cap_cert_pb2.InformationResponse( type=log.type, signal_id=log.signal_id, information_value=information_value)) l2cap_log_stream.unregister_callback( self.handle_information_request, matcher_fn=is_information_request) l2cap_log_stream.register_callback( handle_information_request, matcher_fn=is_information_request) def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid l2cap_log_stream.unregister_callback( self.handle_connection_response, matcher_fn=is_connection_response) l2cap_log_stream.register_callback( handle_connection_response, matcher_fn=is_connection_response) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: return dcid = self.scid_dcid_map[log.dcid] self.cert_device.l2cap.SendConfigurationResponse( l2cap_cert_pb2.ConfigurationResponse( scid=dcid, signal_id=log.signal_id, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode))) self.cert_device.l2cap.SendConfigurationRequest( l2cap_cert_pb2.ConfigurationRequest( dcid=dcid, signal_id=log.signal_id + 1, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode), fcs_config=l2cap_cert_pb2.FcsConfig.NON)) l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) l2cap_log_stream.register_callback( handle_configuration_request, matcher_fn=is_configuration_request) self._setup_link(l2cap_event_asserts) self._open_channel( l2cap_event_asserts, scid=scid, psm=psm, mode=self.retransmission_mode) l2cap_event_asserts_alt.assert_event_occurs( lambda log: is_configuration_request(log) and \ log.configuration_request.fcs_config == l2cap_cert_pb2.FcsConfig.NO_FCS) self.cert_device.l2cap.SendSFrame( l2cap_cert_pb2.SFrame( channel=self.scid_dcid_map[scid], p=1, withFcs=True)) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and \ log.data_packet.channel == scid and \ log.data_packet.payload == b'\x81\x00\x75\xe8' ) def test_transmit_i_frames(self): """ Loading system/gd/l2cap/classic/internal/signalling_manager.cc +5 −5 Original line number Diff line number Diff line Loading @@ -159,10 +159,10 @@ void ClassicSignallingManager::OnConnectionRequest(SignalId signal_id, Psm psm, configuration_state.incoming_mtu_ = initial_config.incoming_mtu; auto fcs_option = std::make_unique<FrameCheckSequenceOption>(); fcs_option->fcs_type_ = FcsType::DEFAULT; if (!link_->GetRemoteSupportsFcs()) { fcs_option->fcs_type_ = FcsType::NO_FCS; configuration_state.fcs_type_ = FcsType::NO_FCS; if (link_->GetRemoteSupportsFcs()) { fcs_option->fcs_type_ = FcsType::DEFAULT; configuration_state.fcs_type_ = FcsType::DEFAULT; } auto retransmission_flow_control_configuration = std::make_unique<RetransmissionAndFlowControlConfigurationOption>(); Loading Loading @@ -485,7 +485,7 @@ void ClassicSignallingManager::OnInformationRequest(SignalId signal_id, Informat case InformationRequestInfoType::EXTENDED_FEATURES_SUPPORTED: { // TODO: implement this response auto response = InformationResponseExtendedFeaturesBuilder::Create( signal_id.Value(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0); signal_id.Value(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0); enqueue_buffer_->Enqueue(std::move(response), handler_); break; } Loading Loading
system/gd/l2cap/classic/cert/api.proto +8 −1 Original line number Diff line number Diff line Loading @@ -54,6 +54,7 @@ message SFrame { uint32 f = 4; uint32 p = 5; uint32 s = 6; bool with_fcs = 7; } message SendSFrameResult {} Loading Loading @@ -99,12 +100,18 @@ message ChannelRetransmissionFlowControlConfig { uint32 mps = 6; } enum FcsConfig { NO_FCS = 0; DEFAULT = 1; NON = 2; } message ConfigurationRequest { uint32 dcid = 1; uint32 signal_id = 2; uint32 mtu = 3; ChannelRetransmissionFlowControlConfig retransmission_config = 4; bool fcs = 5; FcsConfig fcs_config = 5; } enum ConfigurationResult { Loading
system/gd/l2cap/classic/cert/cert.cc +30 −5 Original line number Diff line number Diff line Loading @@ -100,7 +100,12 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { auto f = static_cast<Final>(request->f()); auto p = static_cast<Poll>(request->p()); auto s = static_cast<SupervisoryFunction>(request->s()); auto builder = EnhancedSupervisoryFrameBuilder::Create(request->channel(), s, p, f, request->req_seq()); std::unique_ptr<packet::BasePacketBuilder> builder; if (request->with_fcs()) { builder = EnhancedSupervisoryFrameWithFcsBuilder::Create(request->channel(), s, p, f, request->req_seq()); } else { builder = EnhancedSupervisoryFrameBuilder::Create(request->channel(), s, p, f, request->req_seq()); } outgoing_packet_queue_.push(std::move(builder)); send_packet_from_queue(); return ::grpc::Status::OK; Loading Loading @@ -140,9 +145,11 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { option->monitor_time_out_ = 12000; option->maximum_pdu_size_ = 1010; config.push_back(std::move(option)); auto no_fcs = std::make_unique<FrameCheckSequenceOption>(); no_fcs->fcs_type_ = FcsType::NO_FCS; config.push_back(std::move(no_fcs)); auto fcs = std::make_unique<FrameCheckSequenceOption>(); if (request->fcs_config() != FcsConfig::NON) { fcs->fcs_type_ = request->fcs_config() ? FcsType::DEFAULT : FcsType::NO_FCS; config.push_back(std::move(fcs)); } } auto builder = ConfigurationRequestBuilder::Create(request->signal_id(), request->dcid(), Continuation::END, std::move(config)); Loading Loading @@ -242,7 +249,8 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { } case InformationRequestType::EXTENDED_FEATURES: { auto builder = InformationResponseExtendedFeaturesBuilder::Create( request->signal_id(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0); request->signal_id(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, request->information_value() & (1 << 5) ? 1 : 0, 0, 0, 0, 0); auto l2cap_builder = BasicFrameBuilder::Create(kClassicSignallingCid, std::move(builder)); outgoing_packet_queue_.push(std::move(l2cap_builder)); send_packet_from_queue(); Loading Loading @@ -349,6 +357,7 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { response.mutable_configuration_request()->set_signal_id(control_view.GetIdentifier()); response.mutable_configuration_request()->set_dcid(view.GetDestinationCid()); bool fcs_set = false; for (auto& option : view.GetConfig()) { if (option->type_ == ConfigurationOptionType::RETRANSMISSION_AND_FLOW_CONTROL) { auto config = RetransmissionAndFlowControlConfigurationOption::Specialize(option.get()); Loading @@ -365,6 +374,22 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { response.mutable_configuration_request()->mutable_retransmission_config()->set_mps( config->maximum_pdu_size_); } if (option->type_ == ConfigurationOptionType::FRAME_CHECK_SEQUENCE) { auto fcs_type = FrameCheckSequenceOption::Specialize(option.get())->fcs_type_; LOG_INFO("fce_type=%hhu", fcs_type); switch (fcs_type) { case FcsType::NO_FCS: response.mutable_configuration_request()->set_fcs_config(FcsConfig::NO_FCS); break; case FcsType::DEFAULT: response.mutable_configuration_request()->set_fcs_config(FcsConfig::DEFAULT); break; } fcs_set = true; } } if (!fcs_set) { response.mutable_configuration_request()->set_fcs_config(FcsConfig::NON); } LogEvent(response); break; Loading
system/gd/l2cap/classic/cert/pts_l2cap_test.py +20 −0 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ from facade import common_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from google.protobuf import empty_pb2 from neighbor.facade import facade_pb2 as neighbor_facade class PTSL2capTest(PTSBaseTestClass): Loading @@ -47,6 +48,9 @@ class PTSL2capTest(PTSBaseTestClass): self.pts_address = common_pb2.BluetoothAddress( address=str.encode(pts_address)) self.device_under_test.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) def teardown_test(self): self.device_under_test.rootservice.StopStack( facade_rootservice_pb2.StopStackRequest()) Loading @@ -72,6 +76,22 @@ class PTSL2capTest(PTSBaseTestClass): lambda device: device.remote.address == self.pts_address.address, timeout=timedelta(seconds=timeout)) def test_L2CAP_EXF_BV_03_C(self): """ L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option] Verify the IUT can format an Information Response for the information type of Extended Features that correctly identifies that the FCS Option is locally supported. """ with self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_close_asserts = EventAsserts( dut_connection_close_stream) psm = 1 retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, retransmission_mode=retransmission_mode)) time.sleep(5) def test_L2CAP_COS_CED_BV_01_C(self): """ L2CAP/COS/CED/BV-01-C [Request Connection] Loading
system/gd/l2cap/classic/cert/simple_l2cap_test.py +219 −1 Original line number Diff line number Diff line Loading @@ -485,7 +485,225 @@ class SimpleL2capTest(GdBaseTestClass): lambda log : is_information_response(log) and \ log.information_response.signal_id == signal_id and \ log.information_response.type == expected_log_type and \ log.information_response.information_value | expected_mask == expected_mask) log.information_response.information_value & expected_mask == expected_mask) def test_extended_feature_info_response_fcs(self): """ L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) signal_id = 3 self.cert_device.l2cap.SendInformationRequest( l2cap_cert_pb2.InformationRequest( type=l2cap_cert_pb2.InformationRequestType. EXTENDED_FEATURES, signal_id=signal_id)) l2cap_event_asserts_alt.assert_event_occurs_at_most( is_information_response, 1) expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES expected_mask = 1 << 5 l2cap_event_asserts.assert_event_occurs( lambda log : is_information_response(log) and \ log.information_response.signal_id == signal_id and \ log.information_response.type == expected_log_type and \ log.information_response.information_value & expected_mask == expected_mask) def test_config_channel_not_use_FCS(self): """ L2CAP/FOC/BV-01-C [IUT Initiated Configuration of the FCS Option] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM scid = 0x0101 self._setup_link(l2cap_event_asserts) self._open_channel( l2cap_event_asserts, scid=scid, mode=self.retransmission_mode) self.device_under_test.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and \ log.data_packet.channel == scid and \ log.data_packet.payload == b'\x00\x00abc' ) def test_explicitly_request_use_FCS(self): """ L2CAP/FOC/BV-02-C [Lower Tester Explicitly Requests FCS should be Used] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM scid = 0x0101 information_value = 1 << 3 | 1 << 5 def handle_information_request(log): log = log.information_request self.cert_device.l2cap.SendInformationResponse( l2cap_cert_pb2.InformationResponse( type=log.type, signal_id=log.signal_id, information_value=information_value)) l2cap_log_stream.unregister_callback( self.handle_information_request, matcher_fn=is_information_request) l2cap_log_stream.register_callback( handle_information_request, matcher_fn=is_information_request) def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid l2cap_log_stream.unregister_callback( self.handle_connection_response, matcher_fn=is_connection_response) l2cap_log_stream.register_callback( handle_connection_response, matcher_fn=is_connection_response) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: return dcid = self.scid_dcid_map[log.dcid] self.cert_device.l2cap.SendConfigurationResponse( l2cap_cert_pb2.ConfigurationResponse( scid=dcid, signal_id=log.signal_id, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode))) self.cert_device.l2cap.SendConfigurationRequest( l2cap_cert_pb2.ConfigurationRequest( dcid=dcid, signal_id=log.signal_id + 1, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode), fcs_config=l2cap_cert_pb2.FcsConfig.DEFAULT)) l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) l2cap_log_stream.register_callback( handle_configuration_request, matcher_fn=is_configuration_request) self._setup_link(l2cap_event_asserts) self._open_channel( l2cap_event_asserts, scid=scid, mode=self.retransmission_mode) l2cap_event_asserts_alt.assert_event_occurs( lambda log: is_configuration_request(log) and \ log.configuration_request.fcs_config == l2cap_cert_pb2.FcsConfig.NO_FCS) self.device_under_test.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and \ log.data_packet.channel == scid and \ basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc\x0f\xb6' ) def test_implicitly_request_use_FCS(self): """ L2CAP/FOC/BV-03-C [Lower Tester Implicitly Requests FCS should be Used] """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM scid = 0x41 psm = 0x41 information_value = 1 << 3 | 1 << 5 def handle_information_request(log): log = log.information_request self.cert_device.l2cap.SendInformationResponse( l2cap_cert_pb2.InformationResponse( type=log.type, signal_id=log.signal_id, information_value=information_value)) l2cap_log_stream.unregister_callback( self.handle_information_request, matcher_fn=is_information_request) l2cap_log_stream.register_callback( handle_information_request, matcher_fn=is_information_request) def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid l2cap_log_stream.unregister_callback( self.handle_connection_response, matcher_fn=is_connection_response) l2cap_log_stream.register_callback( handle_connection_response, matcher_fn=is_connection_response) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: return dcid = self.scid_dcid_map[log.dcid] self.cert_device.l2cap.SendConfigurationResponse( l2cap_cert_pb2.ConfigurationResponse( scid=dcid, signal_id=log.signal_id, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode))) self.cert_device.l2cap.SendConfigurationRequest( l2cap_cert_pb2.ConfigurationRequest( dcid=dcid, signal_id=log.signal_id + 1, retransmission_config=l2cap_cert_pb2. ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode), fcs_config=l2cap_cert_pb2.FcsConfig.NON)) l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) l2cap_log_stream.register_callback( handle_configuration_request, matcher_fn=is_configuration_request) self._setup_link(l2cap_event_asserts) self._open_channel( l2cap_event_asserts, scid=scid, psm=psm, mode=self.retransmission_mode) l2cap_event_asserts_alt.assert_event_occurs( lambda log: is_configuration_request(log) and \ log.configuration_request.fcs_config == l2cap_cert_pb2.FcsConfig.NO_FCS) self.cert_device.l2cap.SendSFrame( l2cap_cert_pb2.SFrame( channel=self.scid_dcid_map[scid], p=1, withFcs=True)) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and \ log.data_packet.channel == scid and \ log.data_packet.payload == b'\x81\x00\x75\xe8' ) def test_transmit_i_frames(self): """ Loading
system/gd/l2cap/classic/internal/signalling_manager.cc +5 −5 Original line number Diff line number Diff line Loading @@ -159,10 +159,10 @@ void ClassicSignallingManager::OnConnectionRequest(SignalId signal_id, Psm psm, configuration_state.incoming_mtu_ = initial_config.incoming_mtu; auto fcs_option = std::make_unique<FrameCheckSequenceOption>(); fcs_option->fcs_type_ = FcsType::DEFAULT; if (!link_->GetRemoteSupportsFcs()) { fcs_option->fcs_type_ = FcsType::NO_FCS; configuration_state.fcs_type_ = FcsType::NO_FCS; if (link_->GetRemoteSupportsFcs()) { fcs_option->fcs_type_ = FcsType::DEFAULT; configuration_state.fcs_type_ = FcsType::DEFAULT; } auto retransmission_flow_control_configuration = std::make_unique<RetransmissionAndFlowControlConfigurationOption>(); Loading Loading @@ -485,7 +485,7 @@ void ClassicSignallingManager::OnInformationRequest(SignalId signal_id, Informat case InformationRequestInfoType::EXTENDED_FEATURES_SUPPORTED: { // TODO: implement this response auto response = InformationResponseExtendedFeaturesBuilder::Create( signal_id.Value(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0); signal_id.Value(), InformationRequestResult::SUCCESS, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0); enqueue_buffer_->Enqueue(std::move(response), handler_); break; } Loading