Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1b0322b9 authored by Martin Brabham's avatar Martin Brabham
Browse files

OobSl4aPairingTest: add advertising start/stop functions

Add '_' prefix to non test methods

Also, stop advertising after pairing success.

Bug: 229093268
Test: gd/cert/run --device --sl4a OobSl4aPairingTest
Tag: #stability
Change-Id: Ied56ae44ed6660caee022eab3b2e2c764cf92c87
parent 77c7617e
Loading
Loading
Loading
Loading
+56 −34
Original line number Diff line number Diff line
@@ -83,6 +83,10 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        self.cert_security.close()
        super().teardown_test()

    def __get_test_irk(self):
        return bytes(
            bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]))

    def _generate_sl4a_oob_data(self, transport):
        logging.info("Fetching OOB data...")
        self.dut.sl4a.bluetoothGenerateLocalOobData(transport)
@@ -109,7 +113,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
            return oob_data
        return None

    def set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
    def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
        # Random static address below, no random resolvable address at this point
        random_address_bytes = "DD:34:02:05:5C:EE".encode()
        private_policy = le_initiator_address_facade.PrivacyPolicy(
@@ -121,7 +125,50 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        # Bluetooth MAC address must be upper case
        return random_address_bytes.decode('utf-8').upper()

    def wait_for_own_address(self):
    def __advertise_rpa_random_policy(self, legacy_pdus, irk):
        DEVICE_NAME = 'Im_The_CERT!'
        logging.info("Getting public address")
        ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)
        logging.info("Done %s" % ADDRESS)

        # Setup cert side to advertise
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            include_tx_power=True,
            connectable=True,
            legacy_pdus=legacy_pdus,
            advertising_config=config,
            secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
        return (ADDRESS, create_response)

    def _advertise_rpa_random_legacy_pdu(self, irk):
        return self.__advertise_rpa_random_policy(True, irk)

    def _advertise_rpa_random_extended_pdu(self, irk):
        return self.__advertise_rpa_random_policy(False, irk)

    def _stop_advertising(self, advertiser_id):
        logging.info("Stop advertising")
        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id)
        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
        logging.info("Stopped advertising")

    def _wait_for_own_address(self):
        own_address = common.BluetoothAddress()

        def get_address(event):
@@ -134,7 +181,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_address)
        return own_address

    def wait_for_advertising_set_started(self):
    def _wait_for_advertising_set_started(self):
        advertising_started = False

        def get_advertising_set_started(event):
@@ -148,7 +195,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_advertising_set_started)
        return advertising_started

    def wait_for_yes_no_dialog(self):
    def _wait_for_yes_no_dialog(self):
        address_with_type = common.BluetoothAddressWithType()

        def get_address_with_type(event):
@@ -184,40 +231,14 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        self.cert.security.SetLeOobDataPresent(OOB_NOT_PRESENT)
        self.cert.security.SetLeAuthRequirements(LeAuthRequirementsMessage(bond=1, mitm=1, secure_connections=1))

        data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]
        byteArrayObject = bytearray(data)
        irk = bytes(byteArrayObject)

        DEVICE_NAME = 'Im_The_CERT!'
        self.set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            include_tx_power=True,
            connectable=True,
            legacy_pdus=True,
            advertising_config=config,
            secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())

        self.wait_for_advertising_set_started()
        self._wait_for_advertising_set_started()

        get_own_address_request = le_advertising_facade.GetOwnAddressRequest(
            advertiser_id=create_response.advertiser_id)
        self.cert.hci_le_advertising_manager.GetOwnAddress(get_own_address_request)
        advertising_address = self.wait_for_own_address()
        advertising_address = self._wait_for_own_address()

        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()
@@ -226,7 +247,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
            advertising_address.decode("utf-8").upper(), self.TRANSPORT_LE, oob_data.confirmation_value.hex(),
            oob_data.random_value.hex())

        address_with_type = self.wait_for_yes_no_dialog()
        address_with_type = self._wait_for_yes_no_dialog()
        self.cert.security.SendUiCallback(
            UiCallbackMsg(
                message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=address_with_type))
@@ -240,3 +261,4 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):

        assertThat(bond_state).isNotNone()
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)
        self._stop_advertising(create_response.advertiser_id)