Loading system/gd/l2cap/classic/cert/l2cap_test.py +238 −2 Original line number Diff line number Diff line Loading @@ -56,12 +56,157 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.device_under_test.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) self.cert_acl_handle = 0 def _on_connection_request(self, l2cap_control_view): connection_request_view = l2cap_packets.ConnectionRequestView( l2cap_control_view) sid = connection_request_view.GetIdentifier() cid = connection_request_view.GetSourceCid() connection_response = l2cap_packets.ConnectionResponseBuilder( id, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS, l2cap_packets.ConnectionResponseStatus. NO_FURTHER_INFORMATION_AVAILABLE) connection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, connection_response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(connection_response_l2cap.Serialize()))) return True def _on_connection_response(self, l2cap_control_view): connection_request_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_request_view.GetIdentifier() cid = connection_request_view.GetDestinationCid() config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, cid, l2cap_packets.Continuation.END, []) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(config_request_l2cap.Serialize()))) return True def _on_configuration_request(self, l2cap_control_view): configuration_request = l2cap_packets.ConfigurationRequestView( l2cap_control_view) sid = configuration_request.GetIdentifier() cid = configuration_request.GetSourceCid() config_response = l2cap_packets.ConfigurationResponseBuilder( sid, cid, l2cap_packets.Continuation.END, l2cap_packets.ConfigurationResponseResult.SUCCESS, []) config_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(config_response_l2cap.Serialize()))) def _on_configuration_response(self, l2cap_control_view): configuration_response = l2cap_packets.ConfigurationResponseView( l2cap_control_view) sid = configuration_response.GetIdentifier() def _on_disconnection_request(self, l2cap_control_view): disconnection_request = l2cap_packets.DisconnectionRequestView( l2cap_control_view) sid = disconnection_request.GetIdentifier() scid = disconnection_request.GetSourceCid() dcid = disconnection_request.GetDestinationCid() disconnection_response = l2cap_packets.DisconnectionResponseBuilder( sid, dcid, scid) disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, disconnection_response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(disconnection_response_l2cap.Serialize()))) def _on_disconnection_response(self, l2cap_control_view): disconnection_response = l2cap_packets.DisconnectionResponseView( l2cap_control_view) def _on_information_request(self, l2cap_control_view): information_request = l2cap_packets.InformationRequestView( l2cap_control_view) sid = information_request.GetIdentifier() information_type = information_request.GetInfoType() if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU: response = l2cap_packets.InformationResponseConnectionlessMtuBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 100) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(response_l2cap.Serialize()))) return if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: response = l2cap_packets.InformationResponseExtendedFeaturesBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(response_l2cap.Serialize()))) return if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED: response = l2cap_packets.InformationResponseFixedChannelsBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 2) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(response_l2cap.Serialize()))) return def _on_information_response(self, l2cap_control_view): information_response = l2cap_packets.InformationResponseView( l2cap_control_view) def teardown_test(self): self.device_under_test.rootservice.StopStack( facade_rootservice.StopStackRequest()) self.cert_device.rootservice.StopStack( facade_rootservice.StopStackRequest()) def _handle_control_packet(self, l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONNECTION_REQUEST: return self._on_connection_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONNECTION_RESPONSE: return self._on_connection_response(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_REQUEST: return self._on_configuration_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE: return self._on_configuration_response(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.DISCONNECTION_REQUEST: return self._on_disconnection_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.DISCONNECTION_RESPONSE: return self._on_disconnection_response(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.INFORMATION_REQUEST: return self._on_information_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.INFORMATION_RESPONSE: return self._on_information_response(l2cap_control_view) def _setup_link_from_cert(self): self.device_under_test.neighbor.EnablePageScan( Loading Loading @@ -94,6 +239,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): connection_event_asserts.assert_event_occurs(get_handle) self.cert_acl_handle = handle return handle def _open_channel( Loading Loading @@ -160,10 +306,12 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_stream.register_callback(self._handle_control_packet) scid = 0x41 dcid = self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid) psm = 0x01 dcid = self._open_channel(cert_acl_data_stream, psm, cert_acl_handle, scid) close_channel = l2cap_packets.DisconnectionRequestBuilder( 1, dcid, scid) Loading Loading @@ -191,3 +339,91 @@ class L2capTest(GdFacadeOnlyBaseTestClass): cert_acl_data_asserts.assert_event_occurs( verify_disconnection_response) def test_disconnect_on_timeout(self): """ L2CAP/COS/CED/BV-08-C """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) scid = 0x41 psm = 0x01 self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) # Don't send configuration response back self._on_configuration_request = lambda _: True dcid = self._open_channel(cert_acl_data_stream, psm, cert_acl_handle, scid) def is_configuration_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE cert_acl_data_asserts.assert_none_matching( is_configuration_response) def test_respond_to_echo_request(self): """ L2CAP/COS/ECH/BV-01-C [Respond to Echo Request] Verify that the IUT responds to an echo request. """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) echo_request = l2cap_packets.EchoRequestBuilder( 100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3)) echo_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, echo_request) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=cert_acl_handle, payload=bytes(echo_request_l2cap.Serialize()))) cert_acl_data_asserts.assert_event_occurs( lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload ) def test_reject_unknown_command(self): """ L2CAP/COS/CED/BI-01-C """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00" self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=cert_acl_handle, payload=bytes(invalid_command_packet))) def is_command_reject(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.COMMAND_REJECT cert_acl_data_asserts.assert_event_occurs(is_command_reject) system/gd/l2cap/classic/cert/simple_l2cap_test.py +0 −66 Original line number Diff line number Diff line Loading @@ -338,32 +338,6 @@ class SimpleL2capTest(GdBaseTestClass): self._open_channel(l2cap_event_asserts, scid=0x0101, psm=0x1) self._open_channel(l2cap_event_asserts, scid=0x0102, psm=0x3) def test_disconnect_on_timeout(self): """ L2CAP/COS/CED/BV-08-C """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) scid = 0x0101 psm = 1 self._open_channel(l2cap_event_asserts, scid=0x0101, psm=0x1) self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) # Don't send configuration response back l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_event_asserts.assert_none_matching(is_configuration_response) def test_basic_operation_request_connection(self): """ L2CAP/COS/CED/BV-01-C [Request Connection] Loading @@ -384,46 +358,6 @@ class SimpleL2capTest(GdBaseTestClass): lambda log: is_connection_request(log) and log.connection_request.psm == psm ) def test_respond_to_echo_request(self): """ L2CAP/COS/ECH/BV-01-C [Respond to Echo Request] Verify that the IUT responds to an echo request. """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) # TODO: Replace with constructed packets when PDL is available echo_request_packet = b"\x08\x01\x00\x00" self.cert_device.l2cap.SendL2capPacket( l2cap_facade_pb2.L2capPacket( channel=1, payload=echo_request_packet)) l2cap_event_asserts.assert_event_occurs( lambda log: is_echo_response(log) and log.echo_response.signal_id == 0x01 ) def test_reject_unknown_command(self): """ L2CAP/COS/CED/BI-01-C """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) # TODO: Replace with constructed packets when PDL is available invalid_command_packet = b"\xff\x01\x00\x00" self.cert_device.l2cap.SendL2capPacket( l2cap_facade_pb2.L2capPacket( channel=1, payload=invalid_command_packet)) # command_reject_packet = b"\x01\x01\x02\x00\x00\x00" l2cap_event_asserts.assert_event_occurs( lambda log: is_command_reject(log) and log.command_reject.signal_id == 0x01 ) def test_query_for_1_2_features(self): """ L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features] Loading Loading
system/gd/l2cap/classic/cert/l2cap_test.py +238 −2 Original line number Diff line number Diff line Loading @@ -56,12 +56,157 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.device_under_test.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) self.cert_acl_handle = 0 def _on_connection_request(self, l2cap_control_view): connection_request_view = l2cap_packets.ConnectionRequestView( l2cap_control_view) sid = connection_request_view.GetIdentifier() cid = connection_request_view.GetSourceCid() connection_response = l2cap_packets.ConnectionResponseBuilder( id, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS, l2cap_packets.ConnectionResponseStatus. NO_FURTHER_INFORMATION_AVAILABLE) connection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, connection_response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(connection_response_l2cap.Serialize()))) return True def _on_connection_response(self, l2cap_control_view): connection_request_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_request_view.GetIdentifier() cid = connection_request_view.GetDestinationCid() config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, cid, l2cap_packets.Continuation.END, []) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(config_request_l2cap.Serialize()))) return True def _on_configuration_request(self, l2cap_control_view): configuration_request = l2cap_packets.ConfigurationRequestView( l2cap_control_view) sid = configuration_request.GetIdentifier() cid = configuration_request.GetSourceCid() config_response = l2cap_packets.ConfigurationResponseBuilder( sid, cid, l2cap_packets.Continuation.END, l2cap_packets.ConfigurationResponseResult.SUCCESS, []) config_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(config_response_l2cap.Serialize()))) def _on_configuration_response(self, l2cap_control_view): configuration_response = l2cap_packets.ConfigurationResponseView( l2cap_control_view) sid = configuration_response.GetIdentifier() def _on_disconnection_request(self, l2cap_control_view): disconnection_request = l2cap_packets.DisconnectionRequestView( l2cap_control_view) sid = disconnection_request.GetIdentifier() scid = disconnection_request.GetSourceCid() dcid = disconnection_request.GetDestinationCid() disconnection_response = l2cap_packets.DisconnectionResponseBuilder( sid, dcid, scid) disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, disconnection_response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(disconnection_response_l2cap.Serialize()))) def _on_disconnection_response(self, l2cap_control_view): disconnection_response = l2cap_packets.DisconnectionResponseView( l2cap_control_view) def _on_information_request(self, l2cap_control_view): information_request = l2cap_packets.InformationRequestView( l2cap_control_view) sid = information_request.GetIdentifier() information_type = information_request.GetInfoType() if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU: response = l2cap_packets.InformationResponseConnectionlessMtuBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 100) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(response_l2cap.Serialize()))) return if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: response = l2cap_packets.InformationResponseExtendedFeaturesBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(response_l2cap.Serialize()))) return if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED: response = l2cap_packets.InformationResponseFixedChannelsBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 2) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=self.cert_acl_handle, payload=bytes(response_l2cap.Serialize()))) return def _on_information_response(self, l2cap_control_view): information_response = l2cap_packets.InformationResponseView( l2cap_control_view) def teardown_test(self): self.device_under_test.rootservice.StopStack( facade_rootservice.StopStackRequest()) self.cert_device.rootservice.StopStack( facade_rootservice.StopStackRequest()) def _handle_control_packet(self, l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONNECTION_REQUEST: return self._on_connection_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONNECTION_RESPONSE: return self._on_connection_response(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_REQUEST: return self._on_configuration_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE: return self._on_configuration_response(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.DISCONNECTION_REQUEST: return self._on_disconnection_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.DISCONNECTION_RESPONSE: return self._on_disconnection_response(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.INFORMATION_REQUEST: return self._on_information_request(l2cap_control_view) if l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.INFORMATION_RESPONSE: return self._on_information_response(l2cap_control_view) def _setup_link_from_cert(self): self.device_under_test.neighbor.EnablePageScan( Loading Loading @@ -94,6 +239,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): connection_event_asserts.assert_event_occurs(get_handle) self.cert_acl_handle = handle return handle def _open_channel( Loading Loading @@ -160,10 +306,12 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_stream.register_callback(self._handle_control_packet) scid = 0x41 dcid = self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid) psm = 0x01 dcid = self._open_channel(cert_acl_data_stream, psm, cert_acl_handle, scid) close_channel = l2cap_packets.DisconnectionRequestBuilder( 1, dcid, scid) Loading Loading @@ -191,3 +339,91 @@ class L2capTest(GdFacadeOnlyBaseTestClass): cert_acl_data_asserts.assert_event_occurs( verify_disconnection_response) def test_disconnect_on_timeout(self): """ L2CAP/COS/CED/BV-08-C """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) scid = 0x41 psm = 0x01 self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) # Don't send configuration response back self._on_configuration_request = lambda _: True dcid = self._open_channel(cert_acl_data_stream, psm, cert_acl_handle, scid) def is_configuration_response(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE cert_acl_data_asserts.assert_none_matching( is_configuration_response) def test_respond_to_echo_request(self): """ L2CAP/COS/ECH/BV-01-C [Respond to Echo Request] Verify that the IUT responds to an echo request. """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) echo_request = l2cap_packets.EchoRequestBuilder( 100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3)) echo_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, echo_request) self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=cert_acl_handle, payload=bytes(echo_request_l2cap.Serialize()))) cert_acl_data_asserts.assert_event_occurs( lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload ) def test_reject_unknown_command(self): """ L2CAP/COS/CED/BI-01-C """ cert_acl_handle = self._setup_link_from_cert() with EventCallbackStream( self.cert_device.hci_acl_manager.FetchAclData( empty_proto.Empty())) as cert_acl_data_stream: cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00" self.cert_device.hci_acl_manager.SendAclData( acl_manager_facade.AclData( handle=cert_acl_handle, payload=bytes(invalid_command_packet))) def is_command_reject(l2cap_packet): packet_bytes = l2cap_packet.payload l2cap_view = l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet_bytes))) if l2cap_view.GetChannelId() != 1: return False l2cap_control_view = l2cap_packets.ControlView( l2cap_view.GetPayload()) return l2cap_control_view.GetCode( ) == l2cap_packets.CommandCode.COMMAND_REJECT cert_acl_data_asserts.assert_event_occurs(is_command_reject)
system/gd/l2cap/classic/cert/simple_l2cap_test.py +0 −66 Original line number Diff line number Diff line Loading @@ -338,32 +338,6 @@ class SimpleL2capTest(GdBaseTestClass): self._open_channel(l2cap_event_asserts, scid=0x0101, psm=0x1) self._open_channel(l2cap_event_asserts, scid=0x0102, psm=0x3) def test_disconnect_on_timeout(self): """ L2CAP/COS/CED/BV-08-C """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) scid = 0x0101 psm = 1 self._open_channel(l2cap_event_asserts, scid=0x0101, psm=0x1) self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) # Don't send configuration response back l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_event_asserts.assert_none_matching(is_configuration_response) def test_basic_operation_request_connection(self): """ L2CAP/COS/CED/BV-01-C [Request Connection] Loading @@ -384,46 +358,6 @@ class SimpleL2capTest(GdBaseTestClass): lambda log: is_connection_request(log) and log.connection_request.psm == psm ) def test_respond_to_echo_request(self): """ L2CAP/COS/ECH/BV-01-C [Respond to Echo Request] Verify that the IUT responds to an echo request. """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) # TODO: Replace with constructed packets when PDL is available echo_request_packet = b"\x08\x01\x00\x00" self.cert_device.l2cap.SendL2capPacket( l2cap_facade_pb2.L2capPacket( channel=1, payload=echo_request_packet)) l2cap_event_asserts.assert_event_occurs( lambda log: is_echo_response(log) and log.echo_response.signal_id == 0x01 ) def test_reject_unknown_command(self): """ L2CAP/COS/CED/BI-01-C """ with EventCallbackStream( self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) # TODO: Replace with constructed packets when PDL is available invalid_command_packet = b"\xff\x01\x00\x00" self.cert_device.l2cap.SendL2capPacket( l2cap_facade_pb2.L2capPacket( channel=1, payload=invalid_command_packet)) # command_reject_packet = b"\x01\x01\x02\x00\x00\x00" l2cap_event_asserts.assert_event_occurs( lambda log: is_command_reject(log) and log.command_reject.signal_id == 0x01 ) def test_query_for_1_2_features(self): """ L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features] Loading