Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6f89f10e authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Avatar: Add ASHA dual device test - test_pairing_dual_device Test:...

Merge "Avatar: Add ASHA dual device test - test_pairing_dual_device Test: avatar run --mobly-std-log --include-filter 'ASHATest' Bug: 254077091"
parents 7688c8d8 7a854c6d
Loading
Loading
Loading
Loading
+145 −52
Original line number Diff line number Diff line
@@ -33,7 +33,6 @@ from typing import List, Optional, Tuple

ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str()
HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
CAPABILITY: int = 0x0
COMPLETE_LOCAL_NAME: str = "Bumble"


@@ -84,7 +83,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        setattr(self.ref_right.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT)

    async def ref_advertise_asha(
        self, ref_device: PandoraDevice, ref_address_type: OwnAddressType
        self, ref_device: PandoraDevice, ref_address_type: OwnAddressType, ear: Ear
    ) -> AioStream[AdvertiseResponse]:
        """
        Ref device starts to advertise with service data in advertisement data.
@@ -92,7 +91,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        """
        # Ref starts advertising with ASHA service data
        asha = AioAsha(ref_device.aio.channel)
        await asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)
        await asha.Register(capability=ear, hisyncid=HISYCNID)
        return ref_device.aio.host.Advertise(
            legacy=True,
            connectable=True,
@@ -103,13 +102,23 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            ),
        )

    async def dut_scan_for_asha(self, dut_address_type: OwnAddressType) -> ScanningResponse:
    async def dut_scan_for_asha(self, dut_address_type: OwnAddressType, ear: Ear) -> ScanningResponse:
        """
        DUT starts to scan for the Ref device.
        :return: ScanningResponse for ASHA
        """
        dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type)
        ref = await anext((x async for x in dut_scan if ASHA_UUID in x.data.incomplete_service_class_uuids16))
        expected_advertisement_data = self.get_expected_advertisement_data(ear)
        ref = await anext(
            (
                x
                async for x in dut_scan
                if (
                    ASHA_UUID in x.data.incomplete_service_class_uuids16
                    and expected_advertisement_data == (x.data.service_data_uuid16[ASHA_UUID]).hex()
                )
            )
        )
        dut_scan.cancel()
        assert ref
        return ref
@@ -140,30 +149,40 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            assert e.code() == grpc.StatusCode.DEADLINE_EXCEEDED  # type: ignore
            return True

    def get_expected_advertisement_data(self, ear: Ear):
        protocol_version = 0x01
        truncated_hisyncid = HISYCNID[:4]
        return (
            "{:02x}".format(protocol_version)
            + "{:02x}".format(ear)
            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
        )

    @avatar.parameterized(
        (RANDOM, Ear.LEFT),
        (RANDOM, Ear.RIGHT),
    )  # type: ignore[misc]
    @asynchronous
    async def test_advertising_advertisement_data(self) -> None:
    async def test_advertising_advertisement_data(
        self,
        ref_address_type: OwnAddressType,
        ear: Ear,
    ) -> None:
        """
        Ref starts ASHA advertisements with service data in advertisement data.
        DUT starts a service discovery.
        Verify Ref is correctly discovered by DUT as a hearing aid device.
        """
        protocol_version = 0x01
        truncated_hisyncid = HISYCNID[:4]

        advertisement = await self.ref_advertise_asha(self.ref_left, RANDOM)
        advertisement = await self.ref_advertise_asha(self.ref_left, ref_address_type, ear)

        # DUT starts a service discovery
        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM)
        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
        advertisement.cancel()

        # Verify Ref is correctly discovered by DUT as a hearing aid device
        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
        assert_equal(type(scan_result.data.complete_local_name), str)
        expected_advertisement_data = (
            "{:02x}".format(protocol_version)
            + "{:02x}".format(CAPABILITY)
            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
        )
        expected_advertisement_data = self.get_expected_advertisement_data(ear)
        assert_equal(
            expected_advertisement_data,
            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
@@ -176,11 +195,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT starts a service discovery.
        Verify Ref is correctly discovered by DUT as a hearing aid device.
        """
        protocol_version = 0x01
        truncated_hisyncid = HISYCNID[:4]

        asha = AioAsha(self.ref_left.aio.channel)
        await asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)
        await asha.Register(capability=Ear.LEFT, hisyncid=HISYCNID)

        # advertise with ASHA service data in scan response
        advertisement = self.ref_left.aio.host.Advertise(
@@ -191,16 +207,12 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            ),
        )

        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM)
        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)
        advertisement.cancel()

        # Verify Ref is correctly discovered by DUT as a hearing aid device.
        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
        expected_advertisement_data = (
            "{:02x}".format(protocol_version)
            + "{:02x}".format(CAPABILITY)
            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
        )
        expected_advertisement_data = self.get_expected_advertisement_data(Ear.LEFT)
        assert_equal(
            expected_advertisement_data,
            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
@@ -221,19 +233,84 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT initiates connection to Ref.
        Verify that DUT and Ref are bonded and connected.
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        advertisement = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )

        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)

        # DUT initiates connection to Ref.
        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut

        # DUT starts pairing with the Ref.
        # FIXME: assert the security Level on ref side
        secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
        (secure, wait_security) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
        )

        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

    @avatar.parameterized(
        (RANDOM, PUBLIC),
        (RANDOM, RANDOM),
    )  # type: ignore[misc]
    @asynchronous
    async def test_pairing_dual_device(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
    ) -> None:
        """
        DUT discovers Ref.
        DUT initiates connection to Ref.
        Verify that DUT and Ref are bonded and connected.
        """

        async def ref_device_connect(ref_device, ear):
            advertisement = await self.ref_advertise_asha(
                ref_device=ref_device, ref_address_type=ref_address_type, ear=ear
            )
            expected_data = self.get_expected_advertisement_data(ear)
            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear)
            # DUT initiates connection to ref_device.
            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
            assert dut_ref, ref_dut
            advertisement.cancel()

            return dut_ref, ref_dut

        ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather(
            ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT)
        )

        # DUT starts pairing with the ref_left
        (secure_left, wait_security_left) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
        )

        assert_equal(secure_left.result_variant(), 'success')
        assert_equal(wait_security_left.result_variant(), 'success')

        # DUT starts pairing with the ref_right
        (secure_right, wait_security_right) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
        )

        assert_equal(secure_right.result_variant(), 'success')
        assert_equal(wait_security_right.result_variant(), 'success')

        await asyncio.gather(
            self.ref_left.aio.host.Disconnect(connection=ref_left_dut),
            self.dut.aio.host.WaitDisconnection(connection=dut_ref_left),
        )
        await asyncio.gather(
            self.ref_right.aio.host.Disconnect(connection=ref_right_dut),
            self.dut.aio.host.WaitDisconnection(connection=dut_ref_right),
        )

    @avatar.parameterized(
        (RANDOM, PUBLIC),
@@ -251,8 +328,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        """
        raise signals.TestSkip("TODO: update rootcanal to retry")

        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)

        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)

@@ -293,8 +372,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT initiates connection to Ref.
        Verify that DUT and Ref are connected.
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)

        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut
@@ -313,8 +394,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT initiates disconnection to Ref.
        Verify that DUT and Ref are disconnected.
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)

        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut
@@ -337,8 +420,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        Ref initiates disconnection to DUT (typically when put back in its box).
        Verify that Ref is disconnected.
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)

        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut
@@ -368,8 +453,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        """

        async def connect_and_disconnect() -> None:
            advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
            advertisement = await self.ref_advertise_asha(
                ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
            )
            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
            dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
            await self.dut.aio.host.Disconnect(connection=dut_ref)

@@ -393,8 +480,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        Ref starts sending ASHA advertisements.
        Verify that DUT auto-connects to Ref.
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)

        # manually connect and not cancel advertisement
        dut_ref_res, ref_dut_res = await asyncio.gather(
@@ -437,8 +526,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
           2. Verify that it is disconnected and that the other peripheral is still connected.
        """

        advertisement_left = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement_left = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )
        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
        dut_ref_left, ref_left_dut = await self.dut_connect_to_ref(
            advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type
        )
@@ -446,9 +537,9 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        assert dut_ref_left, ref_left_dut

        advertisement_right = await self.ref_advertise_asha(
            ref_device=self.ref_right, ref_address_type=ref_address_type
            ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
        )
        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT)
        dut_ref_right, ref_right_dut = await self.dut_connect_to_ref(
            advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type
        )
@@ -482,8 +573,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
           3. Verify that DUT auto-connects to the peripheral.
        """

        advertisement_left = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement_left = await self.ref_advertise_asha(
            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
        )
        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
        (dut_ref_left_res, ref_left_dut_res) = await asyncio.gather(
            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_left.address_asdict()),
            anext(aiter(advertisement_left)),  # pytype: disable=name-error
@@ -494,9 +587,9 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        advertisement_left.cancel()

        advertisement_right = await self.ref_advertise_asha(
            ref_device=self.ref_right, ref_address_type=ref_address_type
            ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
        )
        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT)
        (dut_ref_right_res, ref_right_dut_res) = await asyncio.gather(
            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_right.address_asdict()),
            anext(aiter(advertisement_right)),  # pytype: disable=name-error
@@ -529,7 +622,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            assert not await self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0)

            advertisement_left = await self.ref_advertise_asha(
                ref_device=self.ref_left, ref_address_type=ref_address_type
                ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
            )
            ref_left_dut = (await anext(aiter(advertisement_left))).connection
            advertisement_left.cancel()
@@ -542,7 +635,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            assert not await self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0)

            advertisement_right = await self.ref_advertise_asha(
                ref_device=self.ref_right, ref_address_type=ref_address_type
                ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
            )
            ref_right_dut = (await anext(aiter(advertisement_right))).connection
            advertisement_right.cancel()