Loading system/gd/cert/matchers.py +40 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType import logging Loading Loading @@ -72,6 +73,13 @@ class L2capMatchers(object): def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) @staticmethod def InformationResponseExtendedFeatures(supports_ertm=None, supports_streaming=None, supports_fcs=None, supports_fixed_channels=None): return lambda packet: L2capMatchers._is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels) @staticmethod def _basic_frame(packet): if packet is None: Loading Loading @@ -163,3 +171,35 @@ class L2capMatchers(object): response = l2cap_packets.DisconnectionResponseView(frame) return response.GetSourceCid() == scid and response.GetDestinationCid( ) == dcid @staticmethod def _information_response_with_type(packet, info_type): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.INFORMATION_RESPONSE) if frame is None: return None response = l2cap_packets.InformationResponseView(frame) if response.GetInfoType() != info_type: return None return response @staticmethod def _is_matching_information_response_extended_features( packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels): frame = L2capMatchers._information_response_with_type( packet, InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) if frame is None: return False features = l2cap_packets.InformationResponseExtendedFeaturesView(frame) if supports_ertm is not None and features.GetEnhancedRetransmissionMode( ) != supports_ertm: return False if supports_streaming is not None and features.GetStreamingMode != supports_streaming: return False if supports_fcs is not None and features.GetFcsOption() != supports_fcs: return False if supports_fixed_channels is not None and features.GetFixedChannels( ) != supports_fixed_channels: return False return True system/gd/l2cap/classic/cert/l2cap_test.py +12 −86 Original line number Diff line number Diff line Loading @@ -37,6 +37,7 @@ from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction from bluetooth_packets_python3.l2cap_packets import Poll from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType from cert_l2cap import CertL2cap # Assemble a sample packet. TODO: Use RawBuilder Loading Loading @@ -271,19 +272,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): l2cap_control_view = l2cap_packets.ControlView( l2cap_packet.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) return information_response_view.GetInfoType( ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED assertThat(self.cert_l2cap.get_control_channel()).emits( is_correct_information_response) L2capMatchers.InformationResponseExtendedFeatures()) def test_extended_feature_info_response_ertm(self): """ Loading @@ -301,23 +291,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): l2cap_control_view = l2cap_packets.ControlView( l2cap_packet.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetEnhancedRetransmissionMode() assertThat(self.cert_l2cap.get_control_channel()).emits( is_correct_information_response) L2capMatchers.InformationResponseExtendedFeatures( supports_ertm=True)) def test_extended_feature_info_response_streaming(self): """ Loading @@ -335,27 +311,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetStreamingMode() assertThat(self.cert_acl).emits(is_correct_information_response) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.InformationResponseExtendedFeatures( supports_streaming=True)) def test_extended_feature_info_response_fcs(self): """ Loading @@ -373,23 +331,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): l2cap_control_view = l2cap_packets.ControlView( l2cap_packet.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetFcsOption() assertThat(self.cert_l2cap.get_control_channel()).emits( is_correct_information_response) L2capMatchers.InformationResponseExtendedFeatures( supports_fcs=True)) def test_extended_feature_info_response_fixed_channels(self): """ Loading @@ -406,27 +350,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetFixedChannels() assertThat(self.cert_acl).emits(is_correct_information_response) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.InformationResponseExtendedFeatures( supports_fixed_channels=True)) def test_config_channel_not_use_FCS(self): """ Loading Loading
system/gd/cert/matchers.py +40 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType import logging Loading Loading @@ -72,6 +73,13 @@ class L2capMatchers(object): def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) @staticmethod def InformationResponseExtendedFeatures(supports_ertm=None, supports_streaming=None, supports_fcs=None, supports_fixed_channels=None): return lambda packet: L2capMatchers._is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels) @staticmethod def _basic_frame(packet): if packet is None: Loading Loading @@ -163,3 +171,35 @@ class L2capMatchers(object): response = l2cap_packets.DisconnectionResponseView(frame) return response.GetSourceCid() == scid and response.GetDestinationCid( ) == dcid @staticmethod def _information_response_with_type(packet, info_type): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.INFORMATION_RESPONSE) if frame is None: return None response = l2cap_packets.InformationResponseView(frame) if response.GetInfoType() != info_type: return None return response @staticmethod def _is_matching_information_response_extended_features( packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels): frame = L2capMatchers._information_response_with_type( packet, InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) if frame is None: return False features = l2cap_packets.InformationResponseExtendedFeaturesView(frame) if supports_ertm is not None and features.GetEnhancedRetransmissionMode( ) != supports_ertm: return False if supports_streaming is not None and features.GetStreamingMode != supports_streaming: return False if supports_fcs is not None and features.GetFcsOption() != supports_fcs: return False if supports_fixed_channels is not None and features.GetFixedChannels( ) != supports_fixed_channels: return False return True
system/gd/l2cap/classic/cert/l2cap_test.py +12 −86 Original line number Diff line number Diff line Loading @@ -37,6 +37,7 @@ from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction from bluetooth_packets_python3.l2cap_packets import Poll from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType from cert_l2cap import CertL2cap # Assemble a sample packet. TODO: Use RawBuilder Loading Loading @@ -271,19 +272,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): l2cap_control_view = l2cap_packets.ControlView( l2cap_packet.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) return information_response_view.GetInfoType( ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED assertThat(self.cert_l2cap.get_control_channel()).emits( is_correct_information_response) L2capMatchers.InformationResponseExtendedFeatures()) def test_extended_feature_info_response_ertm(self): """ Loading @@ -301,23 +291,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): l2cap_control_view = l2cap_packets.ControlView( l2cap_packet.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetEnhancedRetransmissionMode() assertThat(self.cert_l2cap.get_control_channel()).emits( is_correct_information_response) L2capMatchers.InformationResponseExtendedFeatures( supports_ertm=True)) def test_extended_feature_info_response_streaming(self): """ Loading @@ -335,27 +311,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetStreamingMode() assertThat(self.cert_acl).emits(is_correct_information_response) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.InformationResponseExtendedFeatures( supports_streaming=True)) def test_extended_feature_info_response_fcs(self): """ Loading @@ -373,23 +331,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): l2cap_control_view = l2cap_packets.ControlView( l2cap_packet.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetFcsOption() assertThat(self.cert_l2cap.get_control_channel()).emits( is_correct_information_response) L2capMatchers.InformationResponseExtendedFeatures( supports_fcs=True)) def test_extended_feature_info_response_fixed_channels(self): """ Loading @@ -406,27 +350,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): 1, information_request) self.cert_send_b_frame(information_request_l2cap) def is_correct_information_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: return False information_response_view = l2cap_packets.InformationResponseView( l2cap_control_view) if information_response_view.GetInfoType( ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: return False extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( information_response_view) return extended_features_view.GetFixedChannels() assertThat(self.cert_acl).emits(is_correct_information_response) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.InformationResponseExtendedFeatures( supports_fixed_channels=True)) def test_config_channel_not_use_FCS(self): """ Loading