Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e0dbe91d authored by Rahul Sabnis's avatar Rahul Sabnis Committed by Android Build Coastguard Worker
Browse files

Add an IRK rotation sl4a_sl4a test

This test verifies that GATT reconnections succeed after an IRK
rotation.

Bug: 237600923
Test: system/gd/cert/run --clean --sl4a_sl4a IrkRotationTest
Merged-In: Idc54f21f366074070f245fccd9fdda14a50e1645
Change-Id: Idc54f21f366074070f245fccd9fdda14a50e1645
(cherry picked from commit c55ac6b8)
Merged-In: Idc54f21f366074070f245fccd9fdda14a50e1645
parent 49e3d437
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -38,7 +38,7 @@ class LeAdvertisingTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
        super().teardown_test()

    def test_advertise_name(self):
        rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu()
        rpa_address = self.cert_advertiser_.advertise_public_extended_pdu()
        self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name())
        self.dut_scanner_.stop_scanning()
        self.cert_advertiser_.stop_advertising()
@@ -48,10 +48,10 @@ class LeAdvertisingTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
            self.test_advertise_name()

    def test_advertise_name_twice_no_stop(self):
        rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu()
        rpa_address = self.cert_advertiser_.advertise_public_extended_pdu()
        self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name())
        self.dut_scanner_.stop_scanning()
        rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu()
        rpa_address = self.cert_advertiser_.advertise_public_extended_pdu()
        self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name())
        self.dut_scanner_.stop_scanning()
        self.cert_advertiser_.stop_advertising()
+3 −2
Original line number Diff line number Diff line
@@ -49,16 +49,17 @@ class LeAdvertiser(Closable):
            return False
        return True

    def advertise_rpa_public_extended_pdu(self, name="SL4A Device"):
    def advertise_public_extended_pdu(self, address_type=common.RANDOM_DEVICE_ADDRESS, name="SL4A Device"):
        if self.is_advertising:
            logging.info("Already advertising!")
            return
        logging.info("Configuring advertisement with address type %d", address_type)
        self.is_advertising = True
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.device.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
        self.device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
        self.device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency'])
        self.device.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS)
        self.device.sl4a.bleSetAdvertiseSettingsOwnAddressType(address_type)
        self.advertise_callback, self.advertise_data, self.advertise_settings = generate_ble_advertise_objects(
            self.device.sl4a)
        self.device.sl4a.bleStartBleAdvertising(self.advertise_callback, self.advertise_data, self.advertise_settings)
+1 −0
Original line number Diff line number Diff line
@@ -142,6 +142,7 @@ class LeScanner(Closable):
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(1)
        self.device.ed.clear_events(expected_event_name)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceName(name)
+1 −0
Original line number Diff line number Diff line
@@ -83,6 +83,7 @@ class Security:
    def remove_all_bonded_devices(self):
        bonded_devices = self.__device.sl4a.bluetoothGetBondedDevices()
        for device in bonded_devices:
            logging.info(device)
            self.remove_bond(device["address"])

    def remove_bond(self, address):
+168 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import binascii
import io
import logging
import os
import queue

from blueberry.facade import common_pb2 as common
from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.ble_lib import disable_bluetooth
from blueberry.tests.gd_sl4a.lib.ble_lib import enable_bluetooth
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types
from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
from blueberry.tests.sl4a_sl4a.lib.security import Security
from blueberry.utils.bt_gatt_constants import GattCallbackString
from blueberry.utils.bt_gatt_constants import GattTransport


class IrkRotationTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):

    def setup_class(self):
        super().setup_class()
        self.default_timeout = 10  # seconds

    def setup_test(self):
        assertThat(super().setup_test()).isTrue()

    def teardown_test(self):
        current_test_dir = get_current_context().get_full_output_path()
        self.cert.adb.pull([
            "/data/misc/bluetooth/logs/btsnoop_hci.log",
            os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log" % self.cert.serial)
        ])
        self.cert.adb.pull([
            "/data/misc/bluetooth/logs/btsnoop_hci.log.last",
            os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log.last" % self.cert.serial)
        ])
        super().teardown_test()
        self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled \'\'")

    def _wait_for_event(self, expected_event_name, device):
        try:
            event_info = device.ed.pop_event(expected_event_name, self.default_timeout)
            logging.info(event_info)
        except queue.Empty as error:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def __get_cert_public_address_and_irk_from_bt_config(self):
        # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning
        bt_config_file_path = os.path.join(get_current_context().get_full_output_path(),
                                           "DUT_%s_bt_config.conf" % self.cert.serial)
        try:
            self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path])
        except AdbError as error:
            logging.error("Failed to pull SL4A cert BT config")
            return False
        logging.debug("Reading SL4A cert BT config")
        with io.open(bt_config_file_path) as f:
            for line in f.readlines():
                stripped_line = line.strip()
                if (stripped_line.startswith("Address")):
                    address_fields = stripped_line.split(' ')
                    # API currently requires public address to be capitalized
                    address = address_fields[2].upper()
                    logging.debug("Found cert address: %s" % address)
                    continue
                if (stripped_line.startswith("LE_LOCAL_KEY_IRK")):
                    irk_fields = stripped_line.split(' ')
                    irk = irk_fields[2]
                    logging.debug("Found cert IRK: %s" % irk)
                    continue

        return address, irk

    def test_le_reconnect_after_irk_rotation_cert_privacy_enabled(self):
        self._test_le_reconnect_after_irk_rotation(True)

    def test_le_reconnect_after_irk_rotation_cert_privacy_disabled(self):
        self.cert.sl4a.bluetoothDisableBLE()
        disable_bluetooth(self.cert.sl4a, self.cert.ed)
        self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled false")
        self.cert.adb.shell("device_config put bluetooth INIT_logging_debug_enabled_for_all true")
        enable_bluetooth(self.cert.sl4a, self.cert.ed)
        self.cert.sl4a.bluetoothDisableBLE()
        self._test_le_reconnect_after_irk_rotation(False)

    def _bond_remote_device(self, cert_privacy_enabled, cert_public_address):
        if cert_privacy_enabled:
            self.cert_advertiser_.advertise_public_extended_pdu()
        else:
            self.cert_advertiser_.advertise_public_extended_pdu(common.PUBLIC_DEVICE_ADDRESS)

        advertising_device_name = self.cert_advertiser_.get_local_advertising_name()
        connect_address = self.dut_scanner_.scan_for_name(advertising_device_name)

        # Bond
        logging.info("Bonding with %s", connect_address)
        self.dut_security_.create_bond_numeric_comparison(connect_address)
        self.dut_scanner_.stop_scanning()
        self.cert_advertiser_.stop_advertising()

        return connect_address

    def _test_le_reconnect_after_irk_rotation(self, cert_privacy_enabled):

        cert_public_address, irk = self.__get_cert_public_address_and_irk_from_bt_config()
        self._bond_remote_device(cert_privacy_enabled, cert_public_address)

        # Remove all bonded devices to rotate the IRK
        logging.info("Unbonding all devices")
        self.dut_security_.remove_all_bonded_devices()
        self.cert_security_.remove_all_bonded_devices()

        # Bond again
        logging.info("Rebonding remote device")
        connect_address = self._bond_remote_device(cert_privacy_enabled, cert_public_address)

        # Connect GATT
        logging.info("Connecting GATT to %s", connect_address)
        gatt_callback = self.dut.sl4a.gattCreateGattCallback()
        bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False,
                                                             GattTransport.TRANSPORT_LE, False, None)
        assertThat(bluetooth_gatt).isNotNone()
        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()

        # Close GATT connection
        logging.info("Closing GATT connection")
        self.dut.sl4a.gattClientClose(bluetooth_gatt)

        # Reconnect GATT
        logging.info("Reconnecting GATT")
        gatt_callback = self.dut.sl4a.gattCreateGattCallback()
        bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False,
                                                             GattTransport.TRANSPORT_LE, False, None)
        assertThat(bluetooth_gatt).isNotNone()
        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()

        # Disconnect GATT
        logging.info("Disconnecting GATT")
        self.dut.sl4a.gattClientDisconnect(gatt_callback)
        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()

        # Reconnect GATT
        logging.info("Reconnecting GATT")
        self.dut.sl4a.gattClientReconnect(gatt_callback)
        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
Loading