Loading system/gd/cert/cert_testcases +1 −2 Original line number Diff line number Diff line SimpleHciTest SimpleL2capTest system/gd/cert/run_cert.sh +0 −2 Original line number Diff line number Diff line Loading @@ -5,5 +5,3 @@ # Run normal facade to cert API tests PYTHONPATH=$PYTHONPATH:$ANDROID_BUILD_TOP/out/host/linux-x86/lib64 python3.8 `which act.py` -c $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/host_only_config.json -tf $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/cert_testcases -tp $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd # Run new facade to facade API tests PYTHONPATH=$PYTHONPATH:$ANDROID_BUILD_TOP/out/host/linux-x86/lib64 python3.8 `which act.py` -c $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/host_only_config_facade_only.json -tf $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/cert_testcases_facade_only -tp $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd No newline at end of file system/gd/hal/hci_hal_host_rootcanal.cc +1 −1 Original line number Diff line number Diff line Loading @@ -207,7 +207,7 @@ class HciHalHostRootcanal : public HciHal { RUN_NO_INTR(received_size = recv(sock_fd_, buf, kH4HeaderSize, 0)); ASSERT_LOG(received_size != -1, "Can't receive from socket: %s", strerror(errno)); if (received_size == 0) { LOG_WARN("Can't read H4 header."); LOG_WARN("Can't read H4 header. EOF received"); raise(SIGINT); return; } Loading system/gd/hci/cert/simple_hci_test.pydeleted 100644 → 0 +0 −357 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import logging import os import sys sys.path.append(os.environ['ANDROID_BUILD_TOP'] + '/packages/modules/Bluetooth/system/gd') from cert.gd_base_test import GdBaseTestClass from cert.event_callback_stream import EventCallbackStream from cert.event_asserts import EventAsserts from cert import rootservice_pb2 as cert_rootservice_pb2 from facade import common_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from google.protobuf import empty_pb2 from hci import facade_pb2 as hci_facade_pb2 from hci.cert import api_pb2 as hci_cert_pb2 from hci.cert import api_pb2_grpc as hci_cert_pb2_grpc class SimpleHciTest(GdBaseTestClass): def setup_test(self): self.device_under_test = self.gd_devices[0] self.cert_device = self.gd_cert_devices[0] self.device_under_test.rootservice.StartStack( facade_rootservice_pb2.StartStackRequest( module_under_test=facade_rootservice_pb2.BluetoothModule.Value( 'HCI'),)) self.cert_device.rootservice.StartStack( cert_rootservice_pb2.StartStackRequest( module_to_test=cert_rootservice_pb2.BluetoothModule.Value( 'HCI'),)) self.device_under_test.wait_channel_ready() self.cert_device.wait_channel_ready() self.device_under_test.hci.SetPageScanMode( hci_facade_pb2.PageScanMode(enabled=True)) self.cert_device.hci.SetPageScanMode( hci_cert_pb2.PageScanMode(enabled=True)) dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress( empty_pb2.Empty()).address self.device_under_test.address = dut_address cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress( empty_pb2.Empty()).address self.cert_device.address = cert_address self.dut_address = common_pb2.BluetoothAddress( address=self.device_under_test.address) self.cert_address = common_pb2.BluetoothAddress( address=self.cert_device.address) def teardown_test(self): self.device_under_test.rootservice.StopStack( facade_rootservice_pb2.StopStackRequest()) self.cert_device.rootservice.StopStack( cert_rootservice_pb2.StopStackRequest()) def test_none_event(self): with EventCallbackStream( self.device_under_test.hci.FetchConnectionComplete( empty_pb2.Empty())) as connection_complete_stream: connection_complete_asserts = EventAsserts( connection_complete_stream) connection_complete_asserts.assert_none() def _connect_from_dut(self): logging.debug("_connect_from_dut") policy = hci_cert_pb2.IncomingConnectionPolicy( remote=self.dut_address, accepted=True) self.cert_device.hci.SetIncomingConnectionPolicy(policy) with EventCallbackStream( self.device_under_test.hci.FetchConnectionComplete( empty_pb2.Empty())) as dut_connection_complete_stream: dut_connection_complete_asserts = EventAsserts( dut_connection_complete_stream) self.device_under_test.hci.Connect(self.cert_address) dut_connection_complete_asserts.assert_event_occurs( lambda event: self._get_handle(event)) def _disconnect_from_dut(self): logging.debug("_disconnect_from_dut") with EventCallbackStream( self.device_under_test.hci.FetchDisconnection( empty_pb2.Empty())) as dut_disconnection_stream: dut_disconnection_asserts = EventAsserts(dut_disconnection_stream) self.device_under_test.hci.Disconnect(self.cert_address) dut_disconnection_asserts.assert_event_occurs( lambda event: event.remote.address == self.cert_device.address) def _get_handle(self, event): if event.remote.address == self.cert_device.address: self.connection_handle = event.connection_handle return True return False def test_connect_disconnect_send_acl(self): self._connect_from_dut() with EventCallbackStream( self.cert_device.hci.FetchAclData( empty_pb2.Empty())) as cert_acl_stream: cert_acl_asserts = EventAsserts(cert_acl_stream) acl_data = hci_facade_pb2.AclData( remote=self.cert_address, payload=b'123') self.device_under_test.hci.SendAclData(acl_data) self.device_under_test.hci.SendAclData(acl_data) self.device_under_test.hci.SendAclData(acl_data) cert_acl_asserts.assert_event_occurs( lambda packet: b'123' in packet.payload and packet.remote == self.dut_address ) self._disconnect_from_dut() def test_connect_disconnect_receive_acl(self): self._connect_from_dut() with EventCallbackStream( self.device_under_test.hci.FetchAclData( empty_pb2.Empty())) as dut_acl_stream: dut_acl_asserts = EventAsserts(dut_acl_stream) acl_data = hci_cert_pb2.AclData( remote=self.dut_address, payload=b'123') self.cert_device.hci.SendAclData(acl_data) self.cert_device.hci.SendAclData(acl_data) self.cert_device.hci.SendAclData(acl_data) dut_acl_asserts.assert_event_occurs( lambda packet: b'123' in packet.payload and packet.remote == self.cert_address ) self._disconnect_from_dut() def test_reject_connection_request(self): with EventCallbackStream( self.device_under_test.hci.FetchConnectionFailed( empty_pb2.Empty())) as dut_connection_failed_stream: dut_connection_failed_asserts = EventAsserts( dut_connection_failed_stream) self.device_under_test.hci.Connect(self.cert_address) dut_connection_failed_asserts.assert_event_occurs( lambda event: event.remote == self.cert_address) def test_send_classic_security_command(self): self._connect_from_dut() with EventCallbackStream( self.device_under_test.hci_classic_security. FetchCommandCompleteEvent( empty_pb2.Empty())) as dut_command_complete_stream: dut_command_complete_asserts = EventAsserts( dut_command_complete_stream) self.device_under_test.hci.AuthenticationRequested( self.cert_address) # Link request self.device_under_test.hci_classic_security.LinkKeyRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040c) # Pin code request message = hci_facade_pb2.PinCodeRequestReplyMessage( remote=self.cert_address, len=4, pin_code=bytes("1234", encoding="ASCII")) self.device_under_test.hci_classic_security.PinCodeRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040d) self.device_under_test.hci_classic_security.PinCodeRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040e) # IO capability request message = hci_facade_pb2.IoCapabilityRequestReplyMessage( remote=self.cert_address, io_capability=0, oob_present=0, authentication_requirements=0) self.device_under_test.hci_classic_security.IoCapabilityRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042b) # message = hci_facade_pb2.IoCapabilityRequestNegativeReplyMessage( # remote=self.cert_address, # reason=1 # ) # # link_layer_controller.cc(447)] Check failed: security_manager_.GetAuthenticationAddress() == peer # self.device_under_test.hci_classic_security.IoCapabilityRequestNegativeReply(message) # User confirm request self.device_under_test.hci_classic_security.UserConfirmationRequestReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042c) message = hci_facade_pb2.LinkKeyRequestReplyMessage( remote=self.cert_address, link_key=bytes( "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII")) self.device_under_test.hci_classic_security.LinkKeyRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040b) self.device_under_test.hci_classic_security.UserConfirmationRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042d) # User passkey request message = hci_facade_pb2.UserPasskeyRequestReplyMessage( remote=self.cert_address, passkey=999999, ) self.device_under_test.hci_classic_security.UserPasskeyRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042e) self.device_under_test.hci_classic_security.UserPasskeyRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042f) # Remote OOB data request message = hci_facade_pb2.RemoteOobDataRequestReplyMessage( remote=self.cert_address, c= b'\x19\x20\x21\x22\x23\x24\x25\x26\x19\x20\x21\x22\x23\x24\x25\x26', r= b'\x30\x31\x32\x33\x34\x35\x36\x37\x30\x31\x32\x33\x34\x35\x36\x37', ) self.device_under_test.hci_classic_security.RemoteOobDataRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0430) self.device_under_test.hci_classic_security.RemoteOobDataRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0433) # Read/Write/Delete link key message = hci_facade_pb2.ReadStoredLinkKeyMessage( remote=self.cert_address, read_all_flag=0, ) self.device_under_test.hci_classic_security.ReadStoredLinkKey( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c0d) message = hci_facade_pb2.WriteStoredLinkKeyMessage( num_keys_to_write=1, remote=self.cert_address, link_keys=bytes( "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII"), ) self.device_under_test.hci_classic_security.WriteStoredLinkKey( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c11) message = hci_facade_pb2.DeleteStoredLinkKeyMessage( remote=self.cert_address, delete_all_flag=0, ) self.device_under_test.hci_classic_security.DeleteStoredLinkKey( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c12) # Refresh Encryption Key message = hci_facade_pb2.RefreshEncryptionKeyMessage( connection_handle=self.connection_handle,) self.device_under_test.hci_classic_security.RefreshEncryptionKey( message) # Read/Write Simple Pairing Mode self.device_under_test.hci_classic_security.ReadSimplePairingMode( empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c55) message = hci_facade_pb2.WriteSimplePairingModeMessage( simple_pairing_mode=1,) self.device_under_test.hci_classic_security.WriteSimplePairingMode( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c56) # Read local oob data self.device_under_test.hci_classic_security.ReadLocalOobData( empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c57) # Send keypress notification message = hci_facade_pb2.SendKeypressNotificationMessage( remote=self.cert_address, notification_type=1, ) self.device_under_test.hci_classic_security.SendKeypressNotification( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c60) # Read local oob extended data self.device_under_test.hci_classic_security.ReadLocalOobExtendedData( empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c7d) # Read Encryption key size message = hci_facade_pb2.ReadEncryptionKeySizeMessage( connection_handle=self.connection_handle,) self.device_under_test.hci_classic_security.ReadEncryptionKeySize( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x1408) self._disconnect_from_dut() def test_internal_hci_command(self): self._connect_from_dut() self.device_under_test.hci.TestInternalHciCommands(empty_pb2.Empty()) self.device_under_test.hci.TestInternalHciLeCommands(empty_pb2.Empty()) self._disconnect_from_dut() def test_classic_connection_management_command(self): self._connect_from_dut() self.device_under_test.hci.TestClassicConnectionManagementCommands( self.cert_address) self._disconnect_from_dut() system/gd/l2cap/classic/cert/simple_l2cap_test.py +33 −24 Original line number Diff line number Diff line Loading @@ -149,7 +149,7 @@ class SimpleL2capTest(GdBaseTestClass): ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode))) self.handle_connection_request = handle_connection_request self.handle_connection_response = handle_connection_response event_callback_stream.register_callback( self.handle_connection_response, matcher_fn=is_connection_response) Loading Loading @@ -204,7 +204,7 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.SetupLink( l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address)) event_asserts.assert_event_occurs( lambda log: log.HasField("link_up") and log.remote == self.dut_address lambda log: log.HasField("link_up") and log.link_up.remote == self.dut_address ) def _open_channel(self, event_asserts, scid=0x0101, psm=0x33): Loading @@ -213,7 +213,11 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) event_asserts.assert_event_occurs( lambda log: is_configuration_response(log) and scid == log.scid) lambda log: is_configuration_response(log) and scid == log.configuration_response.scid ) # Allow some time for channel creation on facade side after configuration response is received. time.sleep(0.5) def test_connect(self): with EventCallbackStream( Loading Loading @@ -260,7 +264,7 @@ class SimpleL2capTest(GdBaseTestClass): l2cap_event_asserts.assert_event_occurs( lambda log : log.HasField("data_packet") and \ log.data_packet.channel == 2 and \ basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b"123") log.data_packet.payload == b"123") self.device_under_test.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket( Loading Loading @@ -461,7 +465,7 @@ class SimpleL2capTest(GdBaseTestClass): EXTENDED_FEATURES, signal_id=signal_id)) l2cap_event_asserts_alt.assert_event_occurs_at_most_times( l2cap_event_asserts_alt.assert_event_occurs_at_most( is_information_response, 1) expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES Loading Loading @@ -508,14 +512,15 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.SendSFrame( l2cap_cert_pb2.SFrame( channel=self.scid_dcid_map[scid], req_seq=3, s=0)) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc' ) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc' ) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc' ) l2cap_event_asserts_alt.assert_event_occurs_at_most( lambda log: log.HasField("data_packet"), 3) Loading Loading @@ -559,7 +564,6 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) signal_id = 3 Loading @@ -570,12 +574,6 @@ class SimpleL2capTest(GdBaseTestClass): self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, retransmission_mode=mode)) self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_log_stream.unregister_callback( self.handle_Connection_response, matcher_fn=is_configuration_response) def handle_connection_response(log): log = log.connection_response Loading @@ -593,10 +591,6 @@ class SimpleL2capTest(GdBaseTestClass): l2cap_log_stream.register_callback( handle_connection_response, matcher_fn=is_connection_response) l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: Loading @@ -617,6 +611,23 @@ class SimpleL2capTest(GdBaseTestClass): handle_configuration_request, matcher_fn=is_configuration_request) def handle_information_request(log): log = log.information_request self.cert_device.l2cap.SendInformationResponse( l2cap_cert_pb2.InformationResponse( type=log.type, signal_id=log.signal_id)) l2cap_log_stream.register_callback( handle_information_request, matcher_fn=is_information_request) self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_event_asserts.assert_event_occurs( lambda log: is_configuration_response(log) and scid == log.configuration_response.scid ) time.sleep(0.5) self.cert_device.l2cap.SendIFrame( l2cap_cert_pb2.IFrame( channel=self.scid_dcid_map[scid], Loading @@ -630,9 +641,8 @@ class SimpleL2capTest(GdBaseTestClass): tx_seq=(self.tx_window - 1), sar=0)) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01' ) self.cert_device.l2cap.SendSFrame( l2cap_cert_pb2.SFrame( channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0)) Loading Loading @@ -674,9 +684,8 @@ class SimpleL2capTest(GdBaseTestClass): l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_event_asserts.assert_event_occurs( lambda log: is_configuration_request(log) and \ lambda log: is_configuration_response(log) and \ log.configuration_response.scid == scid and\ log.configuration_response.result == log.l2cap_cert_pb2.ConfigurationResult.SUCCESS and \ log.HasField("retransmission_config") and \ log.configuration_response.result == l2cap_cert_pb2.ConfigurationResult.SUCCESS and \ log.configuration_response.retransmission_config.mode == l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM) Loading
system/gd/cert/cert_testcases +1 −2 Original line number Diff line number Diff line SimpleHciTest SimpleL2capTest
system/gd/cert/run_cert.sh +0 −2 Original line number Diff line number Diff line Loading @@ -5,5 +5,3 @@ # Run normal facade to cert API tests PYTHONPATH=$PYTHONPATH:$ANDROID_BUILD_TOP/out/host/linux-x86/lib64 python3.8 `which act.py` -c $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/host_only_config.json -tf $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/cert_testcases -tp $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd # Run new facade to facade API tests PYTHONPATH=$PYTHONPATH:$ANDROID_BUILD_TOP/out/host/linux-x86/lib64 python3.8 `which act.py` -c $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/host_only_config_facade_only.json -tf $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/cert/cert_testcases_facade_only -tp $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd No newline at end of file
system/gd/hal/hci_hal_host_rootcanal.cc +1 −1 Original line number Diff line number Diff line Loading @@ -207,7 +207,7 @@ class HciHalHostRootcanal : public HciHal { RUN_NO_INTR(received_size = recv(sock_fd_, buf, kH4HeaderSize, 0)); ASSERT_LOG(received_size != -1, "Can't receive from socket: %s", strerror(errno)); if (received_size == 0) { LOG_WARN("Can't read H4 header."); LOG_WARN("Can't read H4 header. EOF received"); raise(SIGINT); return; } Loading
system/gd/hci/cert/simple_hci_test.pydeleted 100644 → 0 +0 −357 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import logging import os import sys sys.path.append(os.environ['ANDROID_BUILD_TOP'] + '/packages/modules/Bluetooth/system/gd') from cert.gd_base_test import GdBaseTestClass from cert.event_callback_stream import EventCallbackStream from cert.event_asserts import EventAsserts from cert import rootservice_pb2 as cert_rootservice_pb2 from facade import common_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from google.protobuf import empty_pb2 from hci import facade_pb2 as hci_facade_pb2 from hci.cert import api_pb2 as hci_cert_pb2 from hci.cert import api_pb2_grpc as hci_cert_pb2_grpc class SimpleHciTest(GdBaseTestClass): def setup_test(self): self.device_under_test = self.gd_devices[0] self.cert_device = self.gd_cert_devices[0] self.device_under_test.rootservice.StartStack( facade_rootservice_pb2.StartStackRequest( module_under_test=facade_rootservice_pb2.BluetoothModule.Value( 'HCI'),)) self.cert_device.rootservice.StartStack( cert_rootservice_pb2.StartStackRequest( module_to_test=cert_rootservice_pb2.BluetoothModule.Value( 'HCI'),)) self.device_under_test.wait_channel_ready() self.cert_device.wait_channel_ready() self.device_under_test.hci.SetPageScanMode( hci_facade_pb2.PageScanMode(enabled=True)) self.cert_device.hci.SetPageScanMode( hci_cert_pb2.PageScanMode(enabled=True)) dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress( empty_pb2.Empty()).address self.device_under_test.address = dut_address cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress( empty_pb2.Empty()).address self.cert_device.address = cert_address self.dut_address = common_pb2.BluetoothAddress( address=self.device_under_test.address) self.cert_address = common_pb2.BluetoothAddress( address=self.cert_device.address) def teardown_test(self): self.device_under_test.rootservice.StopStack( facade_rootservice_pb2.StopStackRequest()) self.cert_device.rootservice.StopStack( cert_rootservice_pb2.StopStackRequest()) def test_none_event(self): with EventCallbackStream( self.device_under_test.hci.FetchConnectionComplete( empty_pb2.Empty())) as connection_complete_stream: connection_complete_asserts = EventAsserts( connection_complete_stream) connection_complete_asserts.assert_none() def _connect_from_dut(self): logging.debug("_connect_from_dut") policy = hci_cert_pb2.IncomingConnectionPolicy( remote=self.dut_address, accepted=True) self.cert_device.hci.SetIncomingConnectionPolicy(policy) with EventCallbackStream( self.device_under_test.hci.FetchConnectionComplete( empty_pb2.Empty())) as dut_connection_complete_stream: dut_connection_complete_asserts = EventAsserts( dut_connection_complete_stream) self.device_under_test.hci.Connect(self.cert_address) dut_connection_complete_asserts.assert_event_occurs( lambda event: self._get_handle(event)) def _disconnect_from_dut(self): logging.debug("_disconnect_from_dut") with EventCallbackStream( self.device_under_test.hci.FetchDisconnection( empty_pb2.Empty())) as dut_disconnection_stream: dut_disconnection_asserts = EventAsserts(dut_disconnection_stream) self.device_under_test.hci.Disconnect(self.cert_address) dut_disconnection_asserts.assert_event_occurs( lambda event: event.remote.address == self.cert_device.address) def _get_handle(self, event): if event.remote.address == self.cert_device.address: self.connection_handle = event.connection_handle return True return False def test_connect_disconnect_send_acl(self): self._connect_from_dut() with EventCallbackStream( self.cert_device.hci.FetchAclData( empty_pb2.Empty())) as cert_acl_stream: cert_acl_asserts = EventAsserts(cert_acl_stream) acl_data = hci_facade_pb2.AclData( remote=self.cert_address, payload=b'123') self.device_under_test.hci.SendAclData(acl_data) self.device_under_test.hci.SendAclData(acl_data) self.device_under_test.hci.SendAclData(acl_data) cert_acl_asserts.assert_event_occurs( lambda packet: b'123' in packet.payload and packet.remote == self.dut_address ) self._disconnect_from_dut() def test_connect_disconnect_receive_acl(self): self._connect_from_dut() with EventCallbackStream( self.device_under_test.hci.FetchAclData( empty_pb2.Empty())) as dut_acl_stream: dut_acl_asserts = EventAsserts(dut_acl_stream) acl_data = hci_cert_pb2.AclData( remote=self.dut_address, payload=b'123') self.cert_device.hci.SendAclData(acl_data) self.cert_device.hci.SendAclData(acl_data) self.cert_device.hci.SendAclData(acl_data) dut_acl_asserts.assert_event_occurs( lambda packet: b'123' in packet.payload and packet.remote == self.cert_address ) self._disconnect_from_dut() def test_reject_connection_request(self): with EventCallbackStream( self.device_under_test.hci.FetchConnectionFailed( empty_pb2.Empty())) as dut_connection_failed_stream: dut_connection_failed_asserts = EventAsserts( dut_connection_failed_stream) self.device_under_test.hci.Connect(self.cert_address) dut_connection_failed_asserts.assert_event_occurs( lambda event: event.remote == self.cert_address) def test_send_classic_security_command(self): self._connect_from_dut() with EventCallbackStream( self.device_under_test.hci_classic_security. FetchCommandCompleteEvent( empty_pb2.Empty())) as dut_command_complete_stream: dut_command_complete_asserts = EventAsserts( dut_command_complete_stream) self.device_under_test.hci.AuthenticationRequested( self.cert_address) # Link request self.device_under_test.hci_classic_security.LinkKeyRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040c) # Pin code request message = hci_facade_pb2.PinCodeRequestReplyMessage( remote=self.cert_address, len=4, pin_code=bytes("1234", encoding="ASCII")) self.device_under_test.hci_classic_security.PinCodeRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040d) self.device_under_test.hci_classic_security.PinCodeRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040e) # IO capability request message = hci_facade_pb2.IoCapabilityRequestReplyMessage( remote=self.cert_address, io_capability=0, oob_present=0, authentication_requirements=0) self.device_under_test.hci_classic_security.IoCapabilityRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042b) # message = hci_facade_pb2.IoCapabilityRequestNegativeReplyMessage( # remote=self.cert_address, # reason=1 # ) # # link_layer_controller.cc(447)] Check failed: security_manager_.GetAuthenticationAddress() == peer # self.device_under_test.hci_classic_security.IoCapabilityRequestNegativeReply(message) # User confirm request self.device_under_test.hci_classic_security.UserConfirmationRequestReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042c) message = hci_facade_pb2.LinkKeyRequestReplyMessage( remote=self.cert_address, link_key=bytes( "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII")) self.device_under_test.hci_classic_security.LinkKeyRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x040b) self.device_under_test.hci_classic_security.UserConfirmationRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042d) # User passkey request message = hci_facade_pb2.UserPasskeyRequestReplyMessage( remote=self.cert_address, passkey=999999, ) self.device_under_test.hci_classic_security.UserPasskeyRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042e) self.device_under_test.hci_classic_security.UserPasskeyRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x042f) # Remote OOB data request message = hci_facade_pb2.RemoteOobDataRequestReplyMessage( remote=self.cert_address, c= b'\x19\x20\x21\x22\x23\x24\x25\x26\x19\x20\x21\x22\x23\x24\x25\x26', r= b'\x30\x31\x32\x33\x34\x35\x36\x37\x30\x31\x32\x33\x34\x35\x36\x37', ) self.device_under_test.hci_classic_security.RemoteOobDataRequestReply( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0430) self.device_under_test.hci_classic_security.RemoteOobDataRequestNegativeReply( self.cert_address) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0433) # Read/Write/Delete link key message = hci_facade_pb2.ReadStoredLinkKeyMessage( remote=self.cert_address, read_all_flag=0, ) self.device_under_test.hci_classic_security.ReadStoredLinkKey( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c0d) message = hci_facade_pb2.WriteStoredLinkKeyMessage( num_keys_to_write=1, remote=self.cert_address, link_keys=bytes( "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII"), ) self.device_under_test.hci_classic_security.WriteStoredLinkKey( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c11) message = hci_facade_pb2.DeleteStoredLinkKeyMessage( remote=self.cert_address, delete_all_flag=0, ) self.device_under_test.hci_classic_security.DeleteStoredLinkKey( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c12) # Refresh Encryption Key message = hci_facade_pb2.RefreshEncryptionKeyMessage( connection_handle=self.connection_handle,) self.device_under_test.hci_classic_security.RefreshEncryptionKey( message) # Read/Write Simple Pairing Mode self.device_under_test.hci_classic_security.ReadSimplePairingMode( empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c55) message = hci_facade_pb2.WriteSimplePairingModeMessage( simple_pairing_mode=1,) self.device_under_test.hci_classic_security.WriteSimplePairingMode( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c56) # Read local oob data self.device_under_test.hci_classic_security.ReadLocalOobData( empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c57) # Send keypress notification message = hci_facade_pb2.SendKeypressNotificationMessage( remote=self.cert_address, notification_type=1, ) self.device_under_test.hci_classic_security.SendKeypressNotification( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c60) # Read local oob extended data self.device_under_test.hci_classic_security.ReadLocalOobExtendedData( empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x0c7d) # Read Encryption key size message = hci_facade_pb2.ReadEncryptionKeySizeMessage( connection_handle=self.connection_handle,) self.device_under_test.hci_classic_security.ReadEncryptionKeySize( message) dut_command_complete_asserts.assert_event_occurs( lambda event: event.command_opcode == 0x1408) self._disconnect_from_dut() def test_internal_hci_command(self): self._connect_from_dut() self.device_under_test.hci.TestInternalHciCommands(empty_pb2.Empty()) self.device_under_test.hci.TestInternalHciLeCommands(empty_pb2.Empty()) self._disconnect_from_dut() def test_classic_connection_management_command(self): self._connect_from_dut() self.device_under_test.hci.TestClassicConnectionManagementCommands( self.cert_address) self._disconnect_from_dut()
system/gd/l2cap/classic/cert/simple_l2cap_test.py +33 −24 Original line number Diff line number Diff line Loading @@ -149,7 +149,7 @@ class SimpleL2capTest(GdBaseTestClass): ChannelRetransmissionFlowControlConfig( mode=self.retransmission_mode))) self.handle_connection_request = handle_connection_request self.handle_connection_response = handle_connection_response event_callback_stream.register_callback( self.handle_connection_response, matcher_fn=is_connection_response) Loading Loading @@ -204,7 +204,7 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.SetupLink( l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address)) event_asserts.assert_event_occurs( lambda log: log.HasField("link_up") and log.remote == self.dut_address lambda log: log.HasField("link_up") and log.link_up.remote == self.dut_address ) def _open_channel(self, event_asserts, scid=0x0101, psm=0x33): Loading @@ -213,7 +213,11 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) event_asserts.assert_event_occurs( lambda log: is_configuration_response(log) and scid == log.scid) lambda log: is_configuration_response(log) and scid == log.configuration_response.scid ) # Allow some time for channel creation on facade side after configuration response is received. time.sleep(0.5) def test_connect(self): with EventCallbackStream( Loading Loading @@ -260,7 +264,7 @@ class SimpleL2capTest(GdBaseTestClass): l2cap_event_asserts.assert_event_occurs( lambda log : log.HasField("data_packet") and \ log.data_packet.channel == 2 and \ basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b"123") log.data_packet.payload == b"123") self.device_under_test.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket( Loading Loading @@ -461,7 +465,7 @@ class SimpleL2capTest(GdBaseTestClass): EXTENDED_FEATURES, signal_id=signal_id)) l2cap_event_asserts_alt.assert_event_occurs_at_most_times( l2cap_event_asserts_alt.assert_event_occurs_at_most( is_information_response, 1) expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES Loading Loading @@ -508,14 +512,15 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.SendSFrame( l2cap_cert_pb2.SFrame( channel=self.scid_dcid_map[scid], req_seq=3, s=0)) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc' ) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc' ) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b'abc' ) l2cap_event_asserts_alt.assert_event_occurs_at_most( lambda log: log.HasField("data_packet"), 3) Loading Loading @@ -559,7 +564,6 @@ class SimpleL2capTest(GdBaseTestClass): self.cert_device.l2cap.FetchL2capLog( empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) signal_id = 3 Loading @@ -570,12 +574,6 @@ class SimpleL2capTest(GdBaseTestClass): self.device_under_test.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, retransmission_mode=mode)) self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_log_stream.unregister_callback( self.handle_Connection_response, matcher_fn=is_configuration_response) def handle_connection_response(log): log = log.connection_response Loading @@ -593,10 +591,6 @@ class SimpleL2capTest(GdBaseTestClass): l2cap_log_stream.register_callback( handle_connection_response, matcher_fn=is_connection_response) l2cap_log_stream.unregister_callback( self.handle_configuration_request, matcher_fn=is_configuration_request) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: Loading @@ -617,6 +611,23 @@ class SimpleL2capTest(GdBaseTestClass): handle_configuration_request, matcher_fn=is_configuration_request) def handle_information_request(log): log = log.information_request self.cert_device.l2cap.SendInformationResponse( l2cap_cert_pb2.InformationResponse( type=log.type, signal_id=log.signal_id)) l2cap_log_stream.register_callback( handle_information_request, matcher_fn=is_information_request) self.cert_device.l2cap.SendConnectionRequest( l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_event_asserts.assert_event_occurs( lambda log: is_configuration_response(log) and scid == log.configuration_response.scid ) time.sleep(0.5) self.cert_device.l2cap.SendIFrame( l2cap_cert_pb2.IFrame( channel=self.scid_dcid_map[scid], Loading @@ -630,9 +641,8 @@ class SimpleL2capTest(GdBaseTestClass): tx_seq=(self.tx_window - 1), sar=0)) l2cap_event_asserts.assert_event_occurs( lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01' lambda log: print(log) or log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01' ) self.cert_device.l2cap.SendSFrame( l2cap_cert_pb2.SFrame( channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0)) Loading Loading @@ -674,9 +684,8 @@ class SimpleL2capTest(GdBaseTestClass): l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_event_asserts.assert_event_occurs( lambda log: is_configuration_request(log) and \ lambda log: is_configuration_response(log) and \ log.configuration_response.scid == scid and\ log.configuration_response.result == log.l2cap_cert_pb2.ConfigurationResult.SUCCESS and \ log.HasField("retransmission_config") and \ log.configuration_response.result == l2cap_cert_pb2.ConfigurationResult.SUCCESS and \ log.configuration_response.retransmission_config.mode == l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM)