Loading android/app/src/com/android/bluetooth/btservice/AdapterService.java +52 −7 Original line number Diff line number Diff line Loading @@ -3679,9 +3679,17 @@ public class AdapterService extends Service { receiver.propagateException(e); } } @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private boolean allowLowLatencyAudio(boolean allowed, BluetoothDevice device) { AdapterService service = getService(); if (service == null) { if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, Utils.getCallingAttributionSource(service), "AdapterService allowLowLatencyAudio")) { return false; } enforceBluetoothPrivilegedPermission(service); Loading @@ -3697,12 +3705,24 @@ public class AdapterService extends Service { receiver.propagateException(e); } } public int startRfcommListener( @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private int startRfcommListener( String name, ParcelUuid uuid, PendingIntent pendingIntent, AttributionSource attributionSource) { return mService.startRfcommListener(name, uuid, pendingIntent, attributionSource); AdapterService service = getService(); if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, attributionSource, "AdapterService startRfcommListener")) { return BluetoothStatusCodes.ERROR_BLUETOOTH_NOT_ALLOWED; } enforceBluetoothPrivilegedPermission(service); return service.startRfcommListener(name, uuid, pendingIntent, attributionSource); } @Override Loading @@ -3714,8 +3734,20 @@ public class AdapterService extends Service { receiver.propagateException(e); } } public int stopRfcommListener(ParcelUuid uuid, AttributionSource attributionSource) { return mService.stopRfcommListener(uuid, attributionSource); @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private int stopRfcommListener(ParcelUuid uuid, AttributionSource attributionSource) { AdapterService service = getService(); if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, attributionSource, "AdapterService stopRfcommListener")) { return BluetoothStatusCodes.ERROR_BLUETOOTH_NOT_ALLOWED; } enforceBluetoothPrivilegedPermission(service); return service.stopRfcommListener(uuid, attributionSource); } @Override Loading @@ -3727,9 +3759,22 @@ public class AdapterService extends Service { receiver.propagateException(e); } } public IncomingRfcommSocketInfo retrievePendingSocketForServiceRecord( @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private IncomingRfcommSocketInfo retrievePendingSocketForServiceRecord( ParcelUuid uuid, AttributionSource attributionSource) { return mService.retrievePendingSocketForServiceRecord(uuid, attributionSource); AdapterService service = getService(); if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, attributionSource, "AdapterService retrievePendingSocketForServiceRecord")) { return null; } enforceBluetoothPrivilegedPermission(service); return service.retrievePendingSocketForServiceRecord(uuid, attributionSource); } @Override Loading system/blueberry/tests/gd/cert/os_utils.py +0 −1 Original line number Diff line number Diff line Loading @@ -77,7 +77,6 @@ def make_ports_available(ports: Container[int], timeout_seconds=10): logging.warning("Freeing port %d used by %s" % (conn.laddr.port, str(conn))) if not conn.pid: logging.error("Failed to kill process occupying port %d due to lack of pid" % conn.laddr.port) success = False continue logging.warning("Killing pid %d that is using port port %d" % (conn.pid, conn.laddr.port)) if conn.pid in killed_pids: Loading system/blueberry/tests/sl4a_sl4a/advertising/le_advertising.py 0 → 100644 +57 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import binascii import io import logging import os import queue from blueberry.tests.gd.cert.context import get_current_context from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types class LeAdvertisingTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass): def setup_class(self): super().setup_class() def setup_test(self): super().setup_test() def teardown_test(self): super().teardown_test() def test_advertise_name(self): rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu() self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name()) self.dut_scanner_.stop_scanning() self.cert_advertiser_.stop_advertising() def test_advertise_name_stress(self): for i in range(0, 10): self.test_advertise_name() def test_advertise_name_twice_no_stop(self): rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu() self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name()) self.dut_scanner_.stop_scanning() rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu() self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name()) self.dut_scanner_.stop_scanning() self.cert_advertiser_.stop_advertising() system/blueberry/tests/sl4a_sl4a/lib/le_advertiser.py 0 → 100644 +81 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import binascii import logging import queue from blueberry.facade import common_pb2 as common from blueberry.tests.gd.cert.closable import Closable from blueberry.tests.gd.cert.closable import safeClose from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_advertise_objects from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test class LeAdvertiser(Closable): is_advertising = False device = None default_timeout = 10 # seconds advertise_callback = None advertise_data = None advertise_settings = None def __init__(self, device): self.device = device def __wait_for_event(self, expected_event_name): try: event_info = self.device.ed.pop_event(expected_event_name, self.default_timeout) logging.info(event_info) except queue.Empty as error: logging.error("Failed to find event: %s", expected_event_name) return False return True def advertise_rpa_public_extended_pdu(self, name="SL4A Device"): if self.is_advertising: logging.info("Already advertising!") return self.is_advertising = True self.device.sl4a.bleSetScanSettingsLegacy(False) self.device.sl4a.bleSetAdvertiseSettingsIsConnectable(True) self.device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) self.device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) self.device.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS) self.advertise_callback, self.advertise_data, self.advertise_settings = generate_ble_advertise_objects( self.device.sl4a) self.device.sl4a.bleStartBleAdvertising(self.advertise_callback, self.advertise_data, self.advertise_settings) # Wait for SL4A cert to start advertising assertThat(self.__wait_for_event(adv_succ.format(self.advertise_callback))).isTrue() logging.info("Advertising started") def get_local_advertising_name(self): return self.device.sl4a.bluetoothGetLocalName() def stop_advertising(self): if self.is_advertising: logging.info("Stopping advertisement") self.device.sl4a.bleStopBleAdvertising(self.advertise_callback) self.is_advertising = False def close(self): self.stop_advertising() self.device = None system/blueberry/tests/sl4a_sl4a/lib/le_scanner.py 0 → 100644 +168 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import queue from blueberry.facade import common_pb2 as common from blueberry.tests.gd.cert.closable import Closable from blueberry.tests.gd.cert.closable import safeClose from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test class LeScanner(Closable): is_scanning = False device = None filter_list = None scan_settings = None scan_callback = None def __init__(self, device): self.device = device def __wait_for_scan_result_event(self, expected_event_name, timeout=60): try: event_info = self.device.ed.pop_event(expected_event_name, timeout) except queue.Empty as error: logging.error("Could not find scan result event: %s", expected_event_name) return None return event_info['data']['Result']['deviceInfo']['address'] def scan_for_address_expect_none(self, address, addr_type): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {}".format(address, addr_type)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(self.scan_callback) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name, 1) assertThat(advertising_address).isNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_address(self, address, addr_type): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {}".format(address, addr_type)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(self.scan_callback) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_address_with_irk(self, address, addr_type, irk): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(self.scan_callback) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_address_with_irk_pending_intent(self, address, addr_type, irk): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) # Hard code here since callback index iterates and will cause this to fail if ran # Second as the impl in SL4A sends this since it's a single callback for broadcast. expected_event_name = "BleScan1onScanResults" # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_name(self, name): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for name {}".format(name)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(1) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceName(name) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) return advertising_address def stop_scanning(self): """ Warning: no java callback registered for this """ if self.is_scanning: logging.info("Stopping scan") self.device.sl4a.bleStopBleScan(self.scan_callback) self.is_scanning = False def close(self): self.stop_scanning() self.device = None Loading
android/app/src/com/android/bluetooth/btservice/AdapterService.java +52 −7 Original line number Diff line number Diff line Loading @@ -3679,9 +3679,17 @@ public class AdapterService extends Service { receiver.propagateException(e); } } @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private boolean allowLowLatencyAudio(boolean allowed, BluetoothDevice device) { AdapterService service = getService(); if (service == null) { if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, Utils.getCallingAttributionSource(service), "AdapterService allowLowLatencyAudio")) { return false; } enforceBluetoothPrivilegedPermission(service); Loading @@ -3697,12 +3705,24 @@ public class AdapterService extends Service { receiver.propagateException(e); } } public int startRfcommListener( @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private int startRfcommListener( String name, ParcelUuid uuid, PendingIntent pendingIntent, AttributionSource attributionSource) { return mService.startRfcommListener(name, uuid, pendingIntent, attributionSource); AdapterService service = getService(); if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, attributionSource, "AdapterService startRfcommListener")) { return BluetoothStatusCodes.ERROR_BLUETOOTH_NOT_ALLOWED; } enforceBluetoothPrivilegedPermission(service); return service.startRfcommListener(name, uuid, pendingIntent, attributionSource); } @Override Loading @@ -3714,8 +3734,20 @@ public class AdapterService extends Service { receiver.propagateException(e); } } public int stopRfcommListener(ParcelUuid uuid, AttributionSource attributionSource) { return mService.stopRfcommListener(uuid, attributionSource); @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private int stopRfcommListener(ParcelUuid uuid, AttributionSource attributionSource) { AdapterService service = getService(); if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, attributionSource, "AdapterService stopRfcommListener")) { return BluetoothStatusCodes.ERROR_BLUETOOTH_NOT_ALLOWED; } enforceBluetoothPrivilegedPermission(service); return service.stopRfcommListener(uuid, attributionSource); } @Override Loading @@ -3727,9 +3759,22 @@ public class AdapterService extends Service { receiver.propagateException(e); } } public IncomingRfcommSocketInfo retrievePendingSocketForServiceRecord( @RequiresPermission(allOf = { android.Manifest.permission.BLUETOOTH_CONNECT, android.Manifest.permission.BLUETOOTH_PRIVILEGED, }) private IncomingRfcommSocketInfo retrievePendingSocketForServiceRecord( ParcelUuid uuid, AttributionSource attributionSource) { return mService.retrievePendingSocketForServiceRecord(uuid, attributionSource); AdapterService service = getService(); if (service == null || !Utils.checkCallerIsSystemOrActiveUser(TAG) || !Utils.checkConnectPermissionForDataDelivery( service, attributionSource, "AdapterService retrievePendingSocketForServiceRecord")) { return null; } enforceBluetoothPrivilegedPermission(service); return service.retrievePendingSocketForServiceRecord(uuid, attributionSource); } @Override Loading
system/blueberry/tests/gd/cert/os_utils.py +0 −1 Original line number Diff line number Diff line Loading @@ -77,7 +77,6 @@ def make_ports_available(ports: Container[int], timeout_seconds=10): logging.warning("Freeing port %d used by %s" % (conn.laddr.port, str(conn))) if not conn.pid: logging.error("Failed to kill process occupying port %d due to lack of pid" % conn.laddr.port) success = False continue logging.warning("Killing pid %d that is using port port %d" % (conn.pid, conn.laddr.port)) if conn.pid in killed_pids: Loading
system/blueberry/tests/sl4a_sl4a/advertising/le_advertising.py 0 → 100644 +57 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import binascii import io import logging import os import queue from blueberry.tests.gd.cert.context import get_current_context from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types class LeAdvertisingTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass): def setup_class(self): super().setup_class() def setup_test(self): super().setup_test() def teardown_test(self): super().teardown_test() def test_advertise_name(self): rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu() self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name()) self.dut_scanner_.stop_scanning() self.cert_advertiser_.stop_advertising() def test_advertise_name_stress(self): for i in range(0, 10): self.test_advertise_name() def test_advertise_name_twice_no_stop(self): rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu() self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name()) self.dut_scanner_.stop_scanning() rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu() self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name()) self.dut_scanner_.stop_scanning() self.cert_advertiser_.stop_advertising()
system/blueberry/tests/sl4a_sl4a/lib/le_advertiser.py 0 → 100644 +81 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import binascii import logging import queue from blueberry.facade import common_pb2 as common from blueberry.tests.gd.cert.closable import Closable from blueberry.tests.gd.cert.closable import safeClose from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_advertise_objects from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test class LeAdvertiser(Closable): is_advertising = False device = None default_timeout = 10 # seconds advertise_callback = None advertise_data = None advertise_settings = None def __init__(self, device): self.device = device def __wait_for_event(self, expected_event_name): try: event_info = self.device.ed.pop_event(expected_event_name, self.default_timeout) logging.info(event_info) except queue.Empty as error: logging.error("Failed to find event: %s", expected_event_name) return False return True def advertise_rpa_public_extended_pdu(self, name="SL4A Device"): if self.is_advertising: logging.info("Already advertising!") return self.is_advertising = True self.device.sl4a.bleSetScanSettingsLegacy(False) self.device.sl4a.bleSetAdvertiseSettingsIsConnectable(True) self.device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) self.device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) self.device.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS) self.advertise_callback, self.advertise_data, self.advertise_settings = generate_ble_advertise_objects( self.device.sl4a) self.device.sl4a.bleStartBleAdvertising(self.advertise_callback, self.advertise_data, self.advertise_settings) # Wait for SL4A cert to start advertising assertThat(self.__wait_for_event(adv_succ.format(self.advertise_callback))).isTrue() logging.info("Advertising started") def get_local_advertising_name(self): return self.device.sl4a.bluetoothGetLocalName() def stop_advertising(self): if self.is_advertising: logging.info("Stopping advertisement") self.device.sl4a.bleStopBleAdvertising(self.advertise_callback) self.is_advertising = False def close(self): self.stop_advertising() self.device = None
system/blueberry/tests/sl4a_sl4a/lib/le_scanner.py 0 → 100644 +168 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import queue from blueberry.facade import common_pb2 as common from blueberry.tests.gd.cert.closable import Closable from blueberry.tests.gd.cert.closable import safeClose from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test class LeScanner(Closable): is_scanning = False device = None filter_list = None scan_settings = None scan_callback = None def __init__(self, device): self.device = device def __wait_for_scan_result_event(self, expected_event_name, timeout=60): try: event_info = self.device.ed.pop_event(expected_event_name, timeout) except queue.Empty as error: logging.error("Could not find scan result event: %s", expected_event_name) return None return event_info['data']['Result']['deviceInfo']['address'] def scan_for_address_expect_none(self, address, addr_type): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {}".format(address, addr_type)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(self.scan_callback) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name, 1) assertThat(advertising_address).isNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_address(self, address, addr_type): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {}".format(address, addr_type)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(self.scan_callback) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_address_with_irk(self, address, addr_type, irk): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(self.scan_callback) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_address_with_irk_pending_intent(self, address, addr_type, irk): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) # Hard code here since callback index iterates and will cause this to fail if ran # Second as the impl in SL4A sends this since it's a single callback for broadcast. expected_event_name = "BleScan1onScanResults" # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) def scan_for_name(self, name): if self.is_scanning: print("Already scanning!") return self.is_scanning = True logging.info("Start scanning for name {}".format(name)) self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.device.sl4a.bleSetScanSettingsLegacy(False) self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) expected_event_name = scan_result.format(1) # Start scanning on SL4A DUT self.device.sl4a.bleSetScanFilterDeviceName(name) self.device.sl4a.bleBuildScanFilter(self.filter_list) self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings) # Verify that scan result is received on SL4A DUT advertising_address = self.__wait_for_scan_result_event(expected_event_name) assertThat(advertising_address).isNotNone() logging.info("Filter advertisement with address {}".format(advertising_address)) return advertising_address def stop_scanning(self): """ Warning: no java callback registered for this """ if self.is_scanning: logging.info("Stopping scan") self.device.sl4a.bleStopBleScan(self.scan_callback) self.is_scanning = False def close(self): self.stop_scanning() self.device = None