Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 96b86320 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge changes from topic "oob_sl4a_cert_test"

* changes:
  OobSl4aPairingTest: remove bond and generate local oob after pair
  OobSl4aPairingTest: add advertising start/stop functions
parents bff9e091 a72e4404
Loading
Loading
Loading
Loading
+79 −36
Original line number Diff line number Diff line
@@ -64,6 +64,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
    SL4A_EVENT_GENERATED = "GeneratedOobData"
    SL4A_EVENT_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"
    SL4A_EVENT_UNBONDED = "Unbonded"

    # Matches tBT_TRANSPORT
    # Used Strings because ints were causing gRPC problems
@@ -83,6 +84,10 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        self.cert_security.close()
        super().teardown_test()

    def __get_test_irk(self):
        return bytes(
            bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]))

    def _generate_sl4a_oob_data(self, transport):
        logging.info("Fetching OOB data...")
        self.dut.sl4a.bluetoothGenerateLocalOobData(transport)
@@ -109,7 +114,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
            return oob_data
        return None

    def set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
    def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
        # Random static address below, no random resolvable address at this point
        random_address_bytes = "DD:34:02:05:5C:EE".encode()
        private_policy = le_initiator_address_facade.PrivacyPolicy(
@@ -121,7 +126,50 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        # Bluetooth MAC address must be upper case
        return random_address_bytes.decode('utf-8').upper()

    def wait_for_own_address(self):
    def __advertise_rpa_random_policy(self, legacy_pdus, irk):
        DEVICE_NAME = 'Im_The_CERT!'
        logging.info("Getting random address")
        ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)
        logging.info("Done %s" % ADDRESS)

        # Setup cert side to advertise
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            include_tx_power=True,
            connectable=True,
            legacy_pdus=legacy_pdus,
            advertising_config=config,
            secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
        return (ADDRESS, create_response)

    def _advertise_rpa_random_legacy_pdu(self, irk):
        return self.__advertise_rpa_random_policy(True, irk)

    def _advertise_rpa_random_extended_pdu(self, irk):
        return self.__advertise_rpa_random_policy(False, irk)

    def _stop_advertising(self, advertiser_id):
        logging.info("Stop advertising")
        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id)
        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
        logging.info("Stopped advertising")

    def _wait_for_own_address(self):
        own_address = common.BluetoothAddress()

        def get_address(event):
@@ -134,7 +182,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_address)
        return own_address

    def wait_for_advertising_set_started(self):
    def _wait_for_advertising_set_started(self):
        advertising_started = False

        def get_advertising_set_started(event):
@@ -148,7 +196,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_advertising_set_started)
        return advertising_started

    def wait_for_yes_no_dialog(self):
    def _wait_for_yes_no_dialog(self):
        address_with_type = common.BluetoothAddressWithType()

        def get_address_with_type(event):
@@ -179,45 +227,19 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()

    def test_sl4a_create_bond_out_of_band(self):
    def _bond_sl4a_cert_oob(self):
        self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
        self.cert.security.SetLeOobDataPresent(OOB_NOT_PRESENT)
        self.cert.security.SetLeAuthRequirements(LeAuthRequirementsMessage(bond=1, mitm=1, secure_connections=1))

        data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]
        byteArrayObject = bytearray(data)
        irk = bytes(byteArrayObject)

        DEVICE_NAME = 'Im_The_CERT!'
        self.set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            include_tx_power=True,
            connectable=True,
            legacy_pdus=True,
            advertising_config=config,
            secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())

        self.wait_for_advertising_set_started()
        self._wait_for_advertising_set_started()

        get_own_address_request = le_advertising_facade.GetOwnAddressRequest(
            advertiser_id=create_response.advertiser_id)
        self.cert.hci_le_advertising_manager.GetOwnAddress(get_own_address_request)
        advertising_address = self.wait_for_own_address()
        advertising_address = self._wait_for_own_address()

        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()
@@ -226,7 +248,7 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
            advertising_address.decode("utf-8").upper(), self.TRANSPORT_LE, oob_data.confirmation_value.hex(),
            oob_data.random_value.hex())

        address_with_type = self.wait_for_yes_no_dialog()
        address_with_type = self._wait_for_yes_no_dialog()
        self.cert.security.SendUiCallback(
            UiCallbackMsg(
                message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=address_with_type))
@@ -236,7 +258,28 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
        try:
            bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_BONDED, self.default_timeout)
        except queue.Empty as error:
            logging.error("Failed to generate OOB data!")
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)
        self._stop_advertising(create_response.advertiser_id)
        return (RANDOM_ADDRESS, advertising_address)

    def test_sl4a_create_bond_out_of_band(self):
        self._bond_sl4a_cert_oob()

    def test_generate_oob_after_pairing(self):
        self._bond_sl4a_cert_oob()
        assertThat(self._generate_sl4a_oob_data(self.TRANSPORT_LE)).isNotNone()

    def test_remove_bond(self):
        self.dut.sl4a.bluetoothUnbond(self._bond_sl4a_cert_oob()[1].decode().upper())
        bond_state = None
        try:
            bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.default_timeout)
        except queue.Empty as error:
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)