Loading system/gd/cert/matchers.py +18 −0 Original line number Diff line number Diff line Loading @@ -42,6 +42,14 @@ class L2capMatchers(object): def DisconnectionRequest(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST) @staticmethod def DisconnectionResponse(scid, dcid): return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid) @staticmethod def CommandReject(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) @staticmethod def _basic_frame(packet): if packet is None: Loading Loading @@ -77,3 +85,13 @@ class L2capMatchers(object): return response.GetSourceCid() == scid and response.GetResult( ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid( ) != 0 @staticmethod def _is_matching_disconnection_response(packet, scid, dcid): frame = L2capMatchers._control_frame_with_code( packet, CommandCode.DISCONNECTION_RESPONSE) if frame is None: return False response = l2cap_packets.DisconnectionResponseView(frame) return response.GetSourceCid() == scid and response.GetDestinationCid( ) == dcid system/gd/l2cap/classic/cert/l2cap_test.py +5 −39 Original line number Diff line number Diff line Loading @@ -470,21 +470,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): close_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, close_channel) self.cert_send_b_frame(close_channel_l2cap) def verify_disconnection_response(packet): packet_bytes = packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE: return False disconnection_response_view = l2cap_packets.DisconnectionResponseView( l2cap_control_view) return disconnection_response_view.GetSourceCid( ) == scid and disconnection_response_view.GetDestinationCid() == dcid assertThat(self.cert_acl).emits(verify_disconnection_response) assertThat(self.cert_acl).emits( L2capMatchers.DisconnectionResponse(scid, dcid)) def test_disconnect_on_timeout(self): """ Loading @@ -502,18 +489,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self._open_channel(1, scid, psm) def is_configuration_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE cert_acl_data_stream.assert_none_matching(is_configuration_response) cert_acl_data_stream.assert_none_matching( L2capMatchers.ConfigurationResponse()) def test_retry_config_after_rejection(self): """ Loading Loading @@ -563,18 +540,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00" self.cert_acl.send(invalid_command_packet) def is_command_reject(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.COMMAND_REJECT assertThat(self.cert_acl).emits(is_command_reject) assertThat(self.cert_acl).emits(L2capMatchers.CommandReject()) def test_query_for_1_2_features(self): """ Loading Loading
system/gd/cert/matchers.py +18 −0 Original line number Diff line number Diff line Loading @@ -42,6 +42,14 @@ class L2capMatchers(object): def DisconnectionRequest(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST) @staticmethod def DisconnectionResponse(scid, dcid): return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid) @staticmethod def CommandReject(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) @staticmethod def _basic_frame(packet): if packet is None: Loading Loading @@ -77,3 +85,13 @@ class L2capMatchers(object): return response.GetSourceCid() == scid and response.GetResult( ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid( ) != 0 @staticmethod def _is_matching_disconnection_response(packet, scid, dcid): frame = L2capMatchers._control_frame_with_code( packet, CommandCode.DISCONNECTION_RESPONSE) if frame is None: return False response = l2cap_packets.DisconnectionResponseView(frame) return response.GetSourceCid() == scid and response.GetDestinationCid( ) == dcid
system/gd/l2cap/classic/cert/l2cap_test.py +5 −39 Original line number Diff line number Diff line Loading @@ -470,21 +470,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): close_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, close_channel) self.cert_send_b_frame(close_channel_l2cap) def verify_disconnection_response(packet): packet_bytes = packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE: return False disconnection_response_view = l2cap_packets.DisconnectionResponseView( l2cap_control_view) return disconnection_response_view.GetSourceCid( ) == scid and disconnection_response_view.GetDestinationCid() == dcid assertThat(self.cert_acl).emits(verify_disconnection_response) assertThat(self.cert_acl).emits( L2capMatchers.DisconnectionResponse(scid, dcid)) def test_disconnect_on_timeout(self): """ Loading @@ -502,18 +489,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self._open_channel(1, scid, psm) def is_configuration_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE cert_acl_data_stream.assert_none_matching(is_configuration_response) cert_acl_data_stream.assert_none_matching( L2capMatchers.ConfigurationResponse()) def test_retry_config_after_rejection(self): """ Loading Loading @@ -563,18 +540,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00" self.cert_acl.send(invalid_command_packet) def is_command_reject(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.COMMAND_REJECT assertThat(self.cert_acl).emits(is_command_reject) assertThat(self.cert_acl).emits(L2capMatchers.CommandReject()) def test_query_for_1_2_features(self): """ Loading