Loading system/blueberry/tests/gd/cert/gd_device.py +10 −9 Original line number Diff line number Diff line Loading @@ -557,29 +557,30 @@ class GdAndroidDevice(GdDeviceBase): logging.error("logcat_process %s_%s stopped with code: %d" % (self.label, self.serial_number, return_code)) self.logcat_logger.stop() self.cleanup_port_forwarding() self.pull_logs(self.log_path_base) def pull_logs(self, base_dir): try: self.adb.pull([ "/data/misc/bluetooth/logs/btsnoop_hci.log", str(os.path.join(self.log_path_base, "%s_btsnoop_hci.log" % self.label)) str(os.path.join(base_dir, "%s_btsnoop_hci.log" % self.label)) ]) except AdbError as error: # Some tests have no snoop logs, and that's OK if ADB_FILE_NOT_EXIST_ERROR not in str(error): logging.error(PULL_LOG_FILE_ERROR_MSG_PREFIX + str(error)) try: self.adb.pull([ "/data/misc/bluedroid/bt_config.conf", str(os.path.join(self.log_path_base, "%s_bt_config.conf" % self.label)) ]) self.adb.pull( ["/data/misc/bluedroid/bt_config.conf", str(os.path.join(base_dir, "%s_bt_config.conf" % self.label))]) except AdbError as error: # Some tests have no config file, and that's OK if ADB_FILE_NOT_EXIST_ERROR not in str(error): logging.error(PULL_LOG_FILE_ERROR_MSG_PREFIX + str(error)) try: self.adb.pull([ "/data/misc/bluedroid/bt_config.bak", str(os.path.join(self.log_path_base, "%s_bt_config.bak" % self.label)) ]) self.adb.pull( ["/data/misc/bluedroid/bt_config.bak", str(os.path.join(base_dir, "%s_bt_config.bak" % self.label))]) except AdbError as error: # Some tests have no config.bak file, and that's OK if ADB_FILE_NOT_EXIST_ERROR not in str(error): Loading system/blueberry/tests/gd_sl4a/gd_sl4a_device_config.yaml 0 → 100644 +35 −0 Original line number Diff line number Diff line _description: Bluetooth cert testing with SL4A TestBeds: - Name: AndroidDeviceCert Controllers: GdDevice: - grpc_port: '8898' grpc_root_server_port: '8896' signal_port: '8894' label: cert serial_number: 'CERT' name: Cert Device cmd: - "adb" - "-s" - "$(serial_number)" - "shell" - "ASAN_OPTIONS=detect_container_overflow=0" - "/system/bin/bluetooth_stack_with_facade" - "--grpc-port=$(grpc_port)" - "--root-server-port=$(grpc_root_server_port)" - "--btsnoop=/data/misc/bluetooth/logs/btsnoop_hci.log" - "--btsnooz=/data/misc/bluetooth/logs/btsnooz_hci.log" - "--btconfig=/data/misc/bluedroid/bt_config.conf" - "--signal-port=$(signal_port)" AndroidDevice: - label: dut # Preferred client port number on the PC host side for SL4A client_port: '8895' # Preferred server port number forwarded from Android to the host PC # via adb for SL4A connections forwarded_port: '8899' # Preferred server port used by SL4A on Android device server_port: '8899' serial: 'DUT' logpath: "/tmp/logs" No newline at end of file system/blueberry/tests/gd_sl4a/gd_sl4a_test_runner.py 0 → 100644 +62 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2021 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from blueberry.tests.gd_sl4a.hci.le_advanced_scanning_test import LeAdvancedScanningTest from mobly import suite_runner import argparse ALL_TESTS = [LeAdvancedScanningTest] def main(): """ Local test runner that allows to specify list of tests to and customize test config file location """ parser = argparse.ArgumentParser(description="Run local GD SL4A tests.") parser.add_argument( '-c', '--config', type=str, required=True, metavar='<PATH>', help='Path to the test configuration file.') parser.add_argument( '--tests', '--test_case', nargs='+', type=str, metavar='[ClassA[.test_a] ClassB[.test_b] ...]', help='A list of test classes and optional tests to execute.') parser.add_argument("--all_tests", "-A", type=bool, dest="all_tests", default=False, nargs="?") parser.add_argument("--presubmit", type=bool, dest="presubmit", default=False, nargs="?") parser.add_argument("--postsubmit", type=bool, dest="postsubmit", default=False, nargs="?") args = parser.parse_args() test_list = ALL_TESTS if args.all_tests: test_list = ALL_TESTS elif args.presubmit: test_list = ALL_TESTS elif args.postsubmit: test_list = ALL_TESTS # Do not pass this layer's cmd line argument to next layer argv = ["--config", args.config] if args.tests: argv.append("--tests") for test in args.tests: argv.append(test) suite_runner.run_suite(test_list, argv=argv) if __name__ == "__main__": main() system/blueberry/tests/gd_sl4a/hci/le_advanced_scanning_test.py 0 → 100644 +249 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2021 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import queue import logging from google.protobuf import empty_pb2 as empty_proto from bluetooth_packets_python3 import hci_packets from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects from blueberry.tests.gd_sl4a.lib.gd_sl4a_base_test import GdSl4aBaseTestClass from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade from blueberry.facade import common_pb2 as common class LeAdvancedScanningTest(GdSl4aBaseTestClass): def setup_class(self): super().setup_class(cert_module='HCI_INTERFACES') self.default_timeout = 10 # seconds def setup_test(self): super().setup_test() def teardown_test(self): super().teardown_test() def set_cert_privacy_policy_with_random_address(self, random_address): private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')), type=common.RANDOM_DEVICE_ADDRESS)) self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) def set_cert_privacy_policy_with_public_address(self): public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS)) self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) # Bluetooth MAC address must be upper case return public_address_bytes.decode('utf-8').upper() def test_scan_filter_device_name_legacy_pdu(self): # Use public address on cert side logging.info("Setting public address") DEVICE_NAME = 'Im_The_CERT!' public_address = self.set_cert_privacy_policy_with_public_address() logging.info("Set public address") # Setup cert side to advertise gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES, tx_power=20) request = le_advertising_facade.CreateAdvertiserRequest(config=config) logging.info("Creating advertiser") create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request) logging.info("Created advertiser") # Setup SL4A DUT side to scan logging.info("Start scanning with public address %s" % public_address) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Setup SL4A DUT filter self.dut.sl4a.bleSetScanFilterDeviceName(DEVICE_NAME) self.dut.sl4a.bleBuildScanFilter(filter_list) # Start scanning on SL4A DUT side self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") try: # Verify if there is scan result event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find initial advertisement.") return False # Print out scan result mac_address = event_info['data']['Result']['deviceInfo']['address'] logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning logging.info("Stop scanning") self.dut.sl4a.bleStopBleScan(scan_callback) logging.info("Stopped scanning") # Stop advertising logging.info("Stop advertising") remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request) logging.info("Stopped advertising") return True def test_scan_filter_device_random_address_legacy_pdu(self): # Use random address on cert side logging.info("Setting random address") RANDOM_ADDRESS = 'D0:05:04:03:02:01' DEVICE_NAME = 'Im_The_CERT!' self.set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS) logging.info("Set random address") # Setup cert side to advertise gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) request = le_advertising_facade.CreateAdvertiserRequest(config=config) logging.info("Creating advertiser") create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request) logging.info("Created advertiser") # Setup SL4A DUT side to scan addr_type = ble_address_types["random"] logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d" % (RANDOM_ADDRESS, addr_type)) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Setup SL4A DUT filter self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type)) self.dut.sl4a.bleBuildScanFilter(filter_list) # Start scanning on SL4A DUT side self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") try: # Verify if there is scan result event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find initial advertisement.") return False # Print out scan result mac_address = event_info['data']['Result']['deviceInfo']['address'] logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning logging.info("Stop scanning") self.dut.sl4a.bleStopBleScan(scan_callback) logging.info("Stopped scanning") # Stop advertising logging.info("Stop advertising") remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request) logging.info("Stopped advertising") return True def test_scan_filter_device_public_address_extended_pdu(self): # Use public address on cert side logging.info("Setting public address") DEVICE_NAME = 'Im_The_CERT!' public_address = self.set_cert_privacy_policy_with_public_address() logging.info("Set public address") # Setup cert side to advertise gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) extended_config = le_advertising_facade.ExtendedAdvertisingConfig( advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"]) request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) logging.info("Creating advertiser") create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) logging.info("Created advertiser") # Setup SL4A DUT side to scan addr_type = ble_address_types["public"] logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d" % (public_address, addr_type)) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.dut.sl4a.bleSetScanSettingsLegacy(False) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Setup SL4A DUT filter self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type)) self.dut.sl4a.bleBuildScanFilter(filter_list) # Start scanning on SL4A DUT side self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") try: # Verify if there is scan result event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find initial advertisement.") return False # Print out scan result mac_address = event_info['data']['Result']['deviceInfo']['address'] logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning logging.info("Stop scanning") self.dut.sl4a.bleStopBleScan(scan_callback) logging.info("Stopped scanning") # Stop advertising logging.info("Stop advertising") remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request) logging.info("Stopped advertising") return True Loading
system/blueberry/tests/gd/cert/gd_device.py +10 −9 Original line number Diff line number Diff line Loading @@ -557,29 +557,30 @@ class GdAndroidDevice(GdDeviceBase): logging.error("logcat_process %s_%s stopped with code: %d" % (self.label, self.serial_number, return_code)) self.logcat_logger.stop() self.cleanup_port_forwarding() self.pull_logs(self.log_path_base) def pull_logs(self, base_dir): try: self.adb.pull([ "/data/misc/bluetooth/logs/btsnoop_hci.log", str(os.path.join(self.log_path_base, "%s_btsnoop_hci.log" % self.label)) str(os.path.join(base_dir, "%s_btsnoop_hci.log" % self.label)) ]) except AdbError as error: # Some tests have no snoop logs, and that's OK if ADB_FILE_NOT_EXIST_ERROR not in str(error): logging.error(PULL_LOG_FILE_ERROR_MSG_PREFIX + str(error)) try: self.adb.pull([ "/data/misc/bluedroid/bt_config.conf", str(os.path.join(self.log_path_base, "%s_bt_config.conf" % self.label)) ]) self.adb.pull( ["/data/misc/bluedroid/bt_config.conf", str(os.path.join(base_dir, "%s_bt_config.conf" % self.label))]) except AdbError as error: # Some tests have no config file, and that's OK if ADB_FILE_NOT_EXIST_ERROR not in str(error): logging.error(PULL_LOG_FILE_ERROR_MSG_PREFIX + str(error)) try: self.adb.pull([ "/data/misc/bluedroid/bt_config.bak", str(os.path.join(self.log_path_base, "%s_bt_config.bak" % self.label)) ]) self.adb.pull( ["/data/misc/bluedroid/bt_config.bak", str(os.path.join(base_dir, "%s_bt_config.bak" % self.label))]) except AdbError as error: # Some tests have no config.bak file, and that's OK if ADB_FILE_NOT_EXIST_ERROR not in str(error): Loading
system/blueberry/tests/gd_sl4a/gd_sl4a_device_config.yaml 0 → 100644 +35 −0 Original line number Diff line number Diff line _description: Bluetooth cert testing with SL4A TestBeds: - Name: AndroidDeviceCert Controllers: GdDevice: - grpc_port: '8898' grpc_root_server_port: '8896' signal_port: '8894' label: cert serial_number: 'CERT' name: Cert Device cmd: - "adb" - "-s" - "$(serial_number)" - "shell" - "ASAN_OPTIONS=detect_container_overflow=0" - "/system/bin/bluetooth_stack_with_facade" - "--grpc-port=$(grpc_port)" - "--root-server-port=$(grpc_root_server_port)" - "--btsnoop=/data/misc/bluetooth/logs/btsnoop_hci.log" - "--btsnooz=/data/misc/bluetooth/logs/btsnooz_hci.log" - "--btconfig=/data/misc/bluedroid/bt_config.conf" - "--signal-port=$(signal_port)" AndroidDevice: - label: dut # Preferred client port number on the PC host side for SL4A client_port: '8895' # Preferred server port number forwarded from Android to the host PC # via adb for SL4A connections forwarded_port: '8899' # Preferred server port used by SL4A on Android device server_port: '8899' serial: 'DUT' logpath: "/tmp/logs" No newline at end of file
system/blueberry/tests/gd_sl4a/gd_sl4a_test_runner.py 0 → 100644 +62 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2021 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from blueberry.tests.gd_sl4a.hci.le_advanced_scanning_test import LeAdvancedScanningTest from mobly import suite_runner import argparse ALL_TESTS = [LeAdvancedScanningTest] def main(): """ Local test runner that allows to specify list of tests to and customize test config file location """ parser = argparse.ArgumentParser(description="Run local GD SL4A tests.") parser.add_argument( '-c', '--config', type=str, required=True, metavar='<PATH>', help='Path to the test configuration file.') parser.add_argument( '--tests', '--test_case', nargs='+', type=str, metavar='[ClassA[.test_a] ClassB[.test_b] ...]', help='A list of test classes and optional tests to execute.') parser.add_argument("--all_tests", "-A", type=bool, dest="all_tests", default=False, nargs="?") parser.add_argument("--presubmit", type=bool, dest="presubmit", default=False, nargs="?") parser.add_argument("--postsubmit", type=bool, dest="postsubmit", default=False, nargs="?") args = parser.parse_args() test_list = ALL_TESTS if args.all_tests: test_list = ALL_TESTS elif args.presubmit: test_list = ALL_TESTS elif args.postsubmit: test_list = ALL_TESTS # Do not pass this layer's cmd line argument to next layer argv = ["--config", args.config] if args.tests: argv.append("--tests") for test in args.tests: argv.append(test) suite_runner.run_suite(test_list, argv=argv) if __name__ == "__main__": main()
system/blueberry/tests/gd_sl4a/hci/le_advanced_scanning_test.py 0 → 100644 +249 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2021 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import queue import logging from google.protobuf import empty_pb2 as empty_proto from bluetooth_packets_python3 import hci_packets from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects from blueberry.tests.gd_sl4a.lib.gd_sl4a_base_test import GdSl4aBaseTestClass from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade from blueberry.facade import common_pb2 as common class LeAdvancedScanningTest(GdSl4aBaseTestClass): def setup_class(self): super().setup_class(cert_module='HCI_INTERFACES') self.default_timeout = 10 # seconds def setup_test(self): super().setup_test() def teardown_test(self): super().teardown_test() def set_cert_privacy_policy_with_random_address(self, random_address): private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')), type=common.RANDOM_DEVICE_ADDRESS)) self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) def set_cert_privacy_policy_with_public_address(self): public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS)) self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) # Bluetooth MAC address must be upper case return public_address_bytes.decode('utf-8').upper() def test_scan_filter_device_name_legacy_pdu(self): # Use public address on cert side logging.info("Setting public address") DEVICE_NAME = 'Im_The_CERT!' public_address = self.set_cert_privacy_policy_with_public_address() logging.info("Set public address") # Setup cert side to advertise gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES, tx_power=20) request = le_advertising_facade.CreateAdvertiserRequest(config=config) logging.info("Creating advertiser") create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request) logging.info("Created advertiser") # Setup SL4A DUT side to scan logging.info("Start scanning with public address %s" % public_address) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Setup SL4A DUT filter self.dut.sl4a.bleSetScanFilterDeviceName(DEVICE_NAME) self.dut.sl4a.bleBuildScanFilter(filter_list) # Start scanning on SL4A DUT side self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") try: # Verify if there is scan result event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find initial advertisement.") return False # Print out scan result mac_address = event_info['data']['Result']['deviceInfo']['address'] logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning logging.info("Stop scanning") self.dut.sl4a.bleStopBleScan(scan_callback) logging.info("Stopped scanning") # Stop advertising logging.info("Stop advertising") remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request) logging.info("Stopped advertising") return True def test_scan_filter_device_random_address_legacy_pdu(self): # Use random address on cert side logging.info("Setting random address") RANDOM_ADDRESS = 'D0:05:04:03:02:01' DEVICE_NAME = 'Im_The_CERT!' self.set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS) logging.info("Set random address") # Setup cert side to advertise gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) request = le_advertising_facade.CreateAdvertiserRequest(config=config) logging.info("Creating advertiser") create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request) logging.info("Created advertiser") # Setup SL4A DUT side to scan addr_type = ble_address_types["random"] logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d" % (RANDOM_ADDRESS, addr_type)) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Setup SL4A DUT filter self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type)) self.dut.sl4a.bleBuildScanFilter(filter_list) # Start scanning on SL4A DUT side self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") try: # Verify if there is scan result event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find initial advertisement.") return False # Print out scan result mac_address = event_info['data']['Result']['deviceInfo']['address'] logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning logging.info("Stop scanning") self.dut.sl4a.bleStopBleScan(scan_callback) logging.info("Stopped scanning") # Stop advertising logging.info("Stop advertising") remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request) logging.info("Stopped advertising") return True def test_scan_filter_device_public_address_extended_pdu(self): # Use public address on cert side logging.info("Setting public address") DEVICE_NAME = 'Im_The_CERT!' public_address = self.set_cert_privacy_policy_with_public_address() logging.info("Set public address") # Setup cert side to advertise gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) extended_config = le_advertising_facade.ExtendedAdvertisingConfig( advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"]) request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) logging.info("Creating advertiser") create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) logging.info("Created advertiser") # Setup SL4A DUT side to scan addr_type = ble_address_types["public"] logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d" % (public_address, addr_type)) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.dut.sl4a.bleSetScanSettingsLegacy(False) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Setup SL4A DUT filter self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type)) self.dut.sl4a.bleBuildScanFilter(filter_list) # Start scanning on SL4A DUT side self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") try: # Verify if there is scan result event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find initial advertisement.") return False # Print out scan result mac_address = event_info['data']['Result']['deviceInfo']['address'] logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning logging.info("Stop scanning") self.dut.sl4a.bleStopBleScan(scan_callback) logging.info("Stopped scanning") # Stop advertising logging.info("Stop advertising") remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request) logging.info("Stopped advertising") return True