Loading system/blueberry/tests/gd_sl4a/lib/bt_constants.py +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ adv_succ = "BleAdvertise{}onSuccess" bluetooth_off = "BluetoothStateChangedOff" bluetooth_on = "BluetoothStateChangedOn" mtu_changed = "GattConnect{}onMtuChanged" gatt_connection_state_change = "GattConnect{}onConnectionStateChange" advertising_set_started = "AdvertisingSet{}onAdvertisingSetStarted" advertising_set_stopped = "AdvertisingSet{}onAdvertisingSetStopped" advertising_set_on_own_address_read = "AdvertisingSet{}onOwnAddressRead" Loading system/blueberry/tests/sl4a_sl4a/gatt/gatt_connect_test.py 0 → 100644 +137 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2021 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import io import os import queue import logging from blueberry.tests.gd.cert.context import get_current_context from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ, ble_advertise_settings_modes, ble_scan_settings_modes, ble_address_types, ble_scan_settings_phys, gatt_connection_state_change, gatt_transport, scan_result from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects, generate_ble_advertise_objects from blueberry.tests.sl4a_sl4a.lib.sl4a_sl4a_base_test import Sl4aSl4aBaseTestClass from blueberry.facade import common_pb2 as common from mobly.controllers.android_device_lib.adb import AdbError class GattConnectTest(Sl4aSl4aBaseTestClass): def setup_class(self): super().setup_class() self.default_timeout = 10 # seconds def setup_test(self): super().setup_test() def teardown_test(self): super().teardown_test() def _wait_for_event(self, expected_event_name, device): try: event_info = device.ed.pop_event(expected_event_name, self.default_timeout) logging.info(event_info) except queue.Empty as error: logging.error("Failed to find event: %s", expected_event_name) return False return True def _wait_for_scan_result_event(self, expected_event_name, device): try: event_info = device.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find scan result event: %s", expected_event_name) return None return event_info['data']['Result']['deviceInfo']['address'] def _get_cert_public_address_and_irk_from_bt_config(self): # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning bt_config_file_path = os.path.join(get_current_context().get_full_output_path(), "DUT_%s_bt_config.conf" % self.cert.serial) try: self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path]) except AdbError as error: logging.error("Failed to pull SL4A cert BT config") return False logging.debug("Reading SL4A cert BT config") with io.open(bt_config_file_path) as f: for line in f.readlines(): stripped_line = line.strip() if (stripped_line.startswith("Address")): address_fields = stripped_line.split(' ') # API currently requires public address to be capitalized address = address_fields[2].upper() logging.debug("Found cert address: %s" % address) continue if (stripped_line.startswith("LE_LOCAL_KEY_IRK")): irk_fields = stripped_line.split(' ') irk = irk_fields[2] logging.debug("Found cert IRK: %s" % irk) continue return address, irk def test_scan_connect_unbonded_device_public_address_with_irk(self): # Set up SL4A cert side to advertise logging.info("Starting advertising") self.cert.sl4a.bleSetAdvertiseSettingsIsConnectable(True) self.cert.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) self.cert.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) self.cert.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS) advertise_callback, advertise_data, advertise_settings = generate_ble_advertise_objects(self.cert.sl4a) self.cert.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) # Wait for SL4A cert to start advertising assertThat(self._wait_for_event(adv_succ.format(advertise_callback), self.cert)).isTrue() logging.info("Advertising started") # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning cert_public_address, irk = self._get_cert_public_address_and_irk_from_bt_config() # Set up SL4A DUT side to scan addr_type = ble_address_types["public"] logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" % (cert_public_address, addr_type, irk)) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.dut.sl4a.bleSetScanSettingsLegacy(False) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Start scanning on SL4A DUT self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(cert_public_address, int(addr_type), irk) self.dut.sl4a.bleBuildScanFilter(filter_list) self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") # Verify that scan result is received on SL4A DUT mac_address = self._wait_for_scan_result_event(expected_event_name, self.dut) assertThat(mac_address).isNotNone() logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning and try to connect GATT self.dut.sl4a.bleStopBleScan(scan_callback) gatt_callback = self.dut.sl4a.gattCreateGattCallback() bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, mac_address, False, gatt_transport['le'], False, None) assertThat(bluetooth_gatt).isNotNone() # Verify that GATT connect event occurs on SL4A DUT expected_event_name = gatt_connection_state_change.format(gatt_callback) assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue() # Test over self.cert.sl4a.bleStopBleAdvertising(advertise_callback) system/blueberry/tests/sl4a_sl4a/sl4a_sl4a_test_runner.py +3 −1 Original line number Diff line number Diff line Loading @@ -14,10 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. from blueberry.tests.sl4a_sl4a.gatt.gatt_connect_test import GattConnectTest from mobly import suite_runner import argparse ALL_TESTS = [] ALL_TESTS = [GattConnectTest] def main(): Loading system/gd/hci/acl_manager/le_impl.h +11 −2 Original line number Diff line number Diff line Loading @@ -292,7 +292,6 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { on_le_connection_canceled_on_pause(); return; } // TODO: find out which address and type was used to initiate the connection AddressWithType remote_address(address, peer_address_type); AddressWithType local_address = le_address_manager_->GetCurrentAddress(); on_common_le_connection_complete(remote_address); Loading Loading @@ -363,9 +362,15 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { return; } AddressWithType remote_address(address, peer_address_type); if (!peer_resolvable_address.IsEmpty()) { // The address added to the connect list is the one that callbacks // should be sent for, given that is the address passed in // call to BluetoothDevice#connectGatt and tied to app layer callbacks. if (!peer_resolvable_address.IsEmpty() && is_device_in_connect_list(AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS))) { LOG_INFO("Using resolvable address"); remote_address = AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS); } on_common_le_connection_complete(remote_address); if (status == ErrorCode::UNKNOWN_CONNECTION) { if (remote_address.GetAddress() != Address::kEmpty) { Loading Loading @@ -556,6 +561,10 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { address_with_type.ToFilterAcceptListAddressType(), address_with_type.GetAddress()); } bool is_device_in_connect_list(AddressWithType address_with_type) { return (connect_list.find(address_with_type) != connect_list.end()); } void remove_device_from_connect_list(AddressWithType address_with_type) { if (connect_list.find(address_with_type) == connect_list.end()) { LOG_WARN("Device not in acceptlist and cannot be removed:%s", PRIVATE_ADDRESS_WITH_TYPE(address_with_type)); Loading system/main/shim/acl.cc +1 −0 Original line number Diff line number Diff line Loading @@ -1516,6 +1516,7 @@ void shim::legacy::Acl::OnLeConnectSuccess( // Once an le connection has successfully been established // the device address is removed from the controller accept list. if (IsRpa(address_with_type)) { LOG_DEBUG("Connection address is rpa:%s identity_addr:%s", PRIVATE_ADDRESS(address_with_type), Loading system/blueberry/tests/sl4a_sl4a/lib/sl4a_sl4a_base_test.py +7 −7 File changed.Contains only whitespace changes. Show changes Loading
system/blueberry/tests/gd_sl4a/lib/bt_constants.py +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ adv_succ = "BleAdvertise{}onSuccess" bluetooth_off = "BluetoothStateChangedOff" bluetooth_on = "BluetoothStateChangedOn" mtu_changed = "GattConnect{}onMtuChanged" gatt_connection_state_change = "GattConnect{}onConnectionStateChange" advertising_set_started = "AdvertisingSet{}onAdvertisingSetStarted" advertising_set_stopped = "AdvertisingSet{}onAdvertisingSetStopped" advertising_set_on_own_address_read = "AdvertisingSet{}onOwnAddressRead" Loading
system/blueberry/tests/sl4a_sl4a/gatt/gatt_connect_test.py 0 → 100644 +137 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2021 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import io import os import queue import logging from blueberry.tests.gd.cert.context import get_current_context from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ, ble_advertise_settings_modes, ble_scan_settings_modes, ble_address_types, ble_scan_settings_phys, gatt_connection_state_change, gatt_transport, scan_result from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects, generate_ble_advertise_objects from blueberry.tests.sl4a_sl4a.lib.sl4a_sl4a_base_test import Sl4aSl4aBaseTestClass from blueberry.facade import common_pb2 as common from mobly.controllers.android_device_lib.adb import AdbError class GattConnectTest(Sl4aSl4aBaseTestClass): def setup_class(self): super().setup_class() self.default_timeout = 10 # seconds def setup_test(self): super().setup_test() def teardown_test(self): super().teardown_test() def _wait_for_event(self, expected_event_name, device): try: event_info = device.ed.pop_event(expected_event_name, self.default_timeout) logging.info(event_info) except queue.Empty as error: logging.error("Failed to find event: %s", expected_event_name) return False return True def _wait_for_scan_result_event(self, expected_event_name, device): try: event_info = device.ed.pop_event(expected_event_name, self.default_timeout) except queue.Empty as error: logging.error("Could not find scan result event: %s", expected_event_name) return None return event_info['data']['Result']['deviceInfo']['address'] def _get_cert_public_address_and_irk_from_bt_config(self): # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning bt_config_file_path = os.path.join(get_current_context().get_full_output_path(), "DUT_%s_bt_config.conf" % self.cert.serial) try: self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path]) except AdbError as error: logging.error("Failed to pull SL4A cert BT config") return False logging.debug("Reading SL4A cert BT config") with io.open(bt_config_file_path) as f: for line in f.readlines(): stripped_line = line.strip() if (stripped_line.startswith("Address")): address_fields = stripped_line.split(' ') # API currently requires public address to be capitalized address = address_fields[2].upper() logging.debug("Found cert address: %s" % address) continue if (stripped_line.startswith("LE_LOCAL_KEY_IRK")): irk_fields = stripped_line.split(' ') irk = irk_fields[2] logging.debug("Found cert IRK: %s" % irk) continue return address, irk def test_scan_connect_unbonded_device_public_address_with_irk(self): # Set up SL4A cert side to advertise logging.info("Starting advertising") self.cert.sl4a.bleSetAdvertiseSettingsIsConnectable(True) self.cert.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) self.cert.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) self.cert.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS) advertise_callback, advertise_data, advertise_settings = generate_ble_advertise_objects(self.cert.sl4a) self.cert.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) # Wait for SL4A cert to start advertising assertThat(self._wait_for_event(adv_succ.format(advertise_callback), self.cert)).isTrue() logging.info("Advertising started") # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning cert_public_address, irk = self._get_cert_public_address_and_irk_from_bt_config() # Set up SL4A DUT side to scan addr_type = ble_address_types["public"] logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" % (cert_public_address, addr_type, irk)) self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) self.dut.sl4a.bleSetScanSettingsLegacy(False) filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) expected_event_name = scan_result.format(scan_callback) # Start scanning on SL4A DUT self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(cert_public_address, int(addr_type), irk) self.dut.sl4a.bleBuildScanFilter(filter_list) self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) logging.info("Started scanning") # Verify that scan result is received on SL4A DUT mac_address = self._wait_for_scan_result_event(expected_event_name, self.dut) assertThat(mac_address).isNotNone() logging.info("Filter advertisement with address {}".format(mac_address)) # Stop scanning and try to connect GATT self.dut.sl4a.bleStopBleScan(scan_callback) gatt_callback = self.dut.sl4a.gattCreateGattCallback() bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, mac_address, False, gatt_transport['le'], False, None) assertThat(bluetooth_gatt).isNotNone() # Verify that GATT connect event occurs on SL4A DUT expected_event_name = gatt_connection_state_change.format(gatt_callback) assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue() # Test over self.cert.sl4a.bleStopBleAdvertising(advertise_callback)
system/blueberry/tests/sl4a_sl4a/sl4a_sl4a_test_runner.py +3 −1 Original line number Diff line number Diff line Loading @@ -14,10 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. from blueberry.tests.sl4a_sl4a.gatt.gatt_connect_test import GattConnectTest from mobly import suite_runner import argparse ALL_TESTS = [] ALL_TESTS = [GattConnectTest] def main(): Loading
system/gd/hci/acl_manager/le_impl.h +11 −2 Original line number Diff line number Diff line Loading @@ -292,7 +292,6 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { on_le_connection_canceled_on_pause(); return; } // TODO: find out which address and type was used to initiate the connection AddressWithType remote_address(address, peer_address_type); AddressWithType local_address = le_address_manager_->GetCurrentAddress(); on_common_le_connection_complete(remote_address); Loading Loading @@ -363,9 +362,15 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { return; } AddressWithType remote_address(address, peer_address_type); if (!peer_resolvable_address.IsEmpty()) { // The address added to the connect list is the one that callbacks // should be sent for, given that is the address passed in // call to BluetoothDevice#connectGatt and tied to app layer callbacks. if (!peer_resolvable_address.IsEmpty() && is_device_in_connect_list(AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS))) { LOG_INFO("Using resolvable address"); remote_address = AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS); } on_common_le_connection_complete(remote_address); if (status == ErrorCode::UNKNOWN_CONNECTION) { if (remote_address.GetAddress() != Address::kEmpty) { Loading Loading @@ -556,6 +561,10 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { address_with_type.ToFilterAcceptListAddressType(), address_with_type.GetAddress()); } bool is_device_in_connect_list(AddressWithType address_with_type) { return (connect_list.find(address_with_type) != connect_list.end()); } void remove_device_from_connect_list(AddressWithType address_with_type) { if (connect_list.find(address_with_type) == connect_list.end()) { LOG_WARN("Device not in acceptlist and cannot be removed:%s", PRIVATE_ADDRESS_WITH_TYPE(address_with_type)); Loading
system/main/shim/acl.cc +1 −0 Original line number Diff line number Diff line Loading @@ -1516,6 +1516,7 @@ void shim::legacy::Acl::OnLeConnectSuccess( // Once an le connection has successfully been established // the device address is removed from the controller accept list. if (IsRpa(address_with_type)) { LOG_DEBUG("Connection address is rpa:%s identity_addr:%s", PRIVATE_ADDRESS(address_with_type), Loading
system/blueberry/tests/sl4a_sl4a/lib/sl4a_sl4a_base_test.py +7 −7 File changed.Contains only whitespace changes. Show changes