Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 45595625 authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge "Avatar: Rewrite the ASHA tests in asyncronous" am: ac40b1b4

parents c83c8d80 ac40b1b4
Loading
Loading
Loading
Loading
+137 −109
Original line number Diff line number Diff line
@@ -14,18 +14,17 @@

import asyncio
import avatar
import enum
import grpc
import logging
import time

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous
from bumble.gatt import GATT_ASHA_SERVICE
from bumble.smp import PairingDelegate
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import skip  # type: ignore
from pandora._utils import Stream
from pandora._utils import AioStream
from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse
from pandora.security_pb2 import LE_LEVEL3, LESecurityLevel
from typing import List, Optional, Tuple
@@ -36,12 +35,15 @@ CAPABILITY: int = 0x0
COMPLETE_LOCAL_NAME: str = "Bumble"


class Device:
class Ear(enum.IntEnum):
    """Reference devices type"""

    LEFT = 0
    RIGHT = 1

    def __repr__(self) -> str:
        return str(self.value)


class ASHATest(base_test.BaseTestClass):  # type: ignore[misc]
    devices: Optional[PandoraDevices] = None
@@ -74,61 +76,64 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        setattr(self.ref_left.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT)
        setattr(self.ref_right.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT)

    def ref_advertise_asha(
    async def ref_advertise_asha(
        self, ref_device: PandoraDevice, ref_address_type: OwnAddressType
    ) -> Stream[AdvertiseResponse]:
    ) -> AioStream[AdvertiseResponse]:
        """
        Ref device starts to advertise
        :return: Ref device's advertise response
        Ref device starts to advertise with service data in advertisement data.
        :return: Ref device's advertise stream
        """
        # Ref starts advertising with ASHA service data
        ref_device.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)
        return ref_device.host.Advertise(
        await ref_device.aio.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)
        return ref_device.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=ref_address_type,
            data=DataTypes(
                complete_local_name=COMPLETE_LOCAL_NAME,
                incomplete_service_class_uuids16=[ASHA_UUID],
            ),
            own_address_type=ref_address_type,
        )

    def dut_scan_for_asha(self, dut_address_type: OwnAddressType) -> ScanningResponse:
    async def dut_scan_for_asha(self, dut_address_type: OwnAddressType) -> ScanningResponse:
        """
        DUT starts to scan for the Ref device.
        :return: ScanningResponse for ASHA
        """
        scan_result = self.dut.host.Scan(own_address_type=dut_address_type)
        ref = next((x for x in scan_result if ASHA_UUID in x.data.incomplete_service_class_uuids16))
        scan_result.cancel()

        dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type)
        ref = await anext((x async for x in dut_scan if ASHA_UUID in x.data.incomplete_service_class_uuids16))
        dut_scan.cancel()
        assert ref
        return ref

    def dut_connect_to_ref(
        self, advertisement: Stream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType
    async def dut_connect_to_ref(
        self, advertisement: AioStream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType
    ) -> Tuple[Connection, Connection]:
        """
        Helper method for Dut connects to Ref
        :return: a Tuple (DUT to REF connection, REF to DUT connection)
        """
        # DUT connects to Ref
        dut_ref = self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()).connection
        ref_dut = (next(advertisement)).connection
        assert dut_ref, ref_dut

        (dut_ref_res, ref_dut_res) = await asyncio.gather(
            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()),
            anext(aiter(advertisement)),  # pytype: disable=name-error
        )
        assert_equal(dut_ref_res.result_variant(), 'connection')
        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
        assert dut_ref and ref_dut
        advertisement.cancel()

        return dut_ref, ref_dut

    def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool:
    async def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool:
        try:
            device.host.WaitDisconnection(connection=connection, timeout=timeout)
            await device.aio.host.WaitDisconnection(connection=connection, timeout=timeout)
            return False
        except grpc.RpcError as e:
            assert e.code() == grpc.StatusCode.DEADLINE_EXCEEDED  # type: ignore
            return True

    def test_advertising_advertisement_data(self) -> None:
    @asynchronous
    async def test_advertising_advertisement_data(self) -> None:
        """
        Ref starts ASHA advertisements with service data in advertisement data.
        DUT starts a service discovery.
@@ -137,10 +142,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        protocol_version = 0x01
        truncated_hisyncid = HISYCNID[:4]

        advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM)
        advertisement = await self.ref_advertise_asha(self.ref_left, RANDOM)

        # DUT starts a service discovery
        scan_result = self.dut_scan_for_asha(dut_address_type=RANDOM)
        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM)
        advertisement.cancel()

        # Verify Ref is correctly discovered by DUT as a hearing aid device
@@ -156,7 +161,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
        )

    def test_advertising_scan_response(self) -> None:
    @asynchronous
    async def test_advertising_scan_response(self) -> None:
        """
        Ref starts ASHA advertisements with service data in scan response data.
        DUT starts a service discovery.
@@ -165,10 +171,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        protocol_version = 0x01
        truncated_hisyncid = HISYCNID[:4]

        self.ref_left.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)
        await self.ref_left.aio.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)

        # advertise with ASHA service data in scan response
        advertisement = self.ref_left.host.Advertise(
        advertisement = self.ref_left.aio.host.Advertise(
            legacy=True,
            scan_response_data=DataTypes(
                complete_local_name=COMPLETE_LOCAL_NAME,
@@ -176,7 +182,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            ),
        )

        scan_result = self.dut_scan_for_asha(dut_address_type=RANDOM)
        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM)
        advertisement.cancel()

        # Verify Ref is correctly discovered by DUT as a hearing aid device.
@@ -195,7 +201,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        (RANDOM, PUBLIC),
        (RANDOM, RANDOM),
    )  # type: ignore[misc]
    def test_pairing(
    @asynchronous
    async def test_pairing(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
@@ -205,24 +212,26 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT initiates connection to Ref.
        Verify that DUT and Ref are bonded and connected.
        """
        advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)

        ref = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)

        # DUT initiates connection to Ref.
        dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref
        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut

        # DUT starts pairing with the Ref.
        secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3)
        # FIXME: assert the security Level on ref side
        secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)

        assert_equal(secure.WhichOneof("result"), "success")
        assert_equal(secure.result_variant(), 'success')

    @avatar.parameterized(
        (RANDOM, PUBLIC),
        (RANDOM, RANDOM),
    )  # type: ignore[misc]
    def test_unbonding(
    @asynchronous
    async def test_unbonding(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
@@ -233,32 +242,34 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        """
        raise signals.TestSkip("TODO: update rootcanal to retry")

        advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)

        dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)

        secure = self.dut.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3)

        assert_equal(secure.WhichOneof("result"), "success")
        self.dut.host.Disconnect(dut_ref)
        self.ref_left.host.WaitDisconnection(ref_dut)
        await self.dut.aio.host.Disconnect(dut_ref)
        await self.ref_left.aio.host.WaitDisconnection(ref_dut)

        # delete the bond
        if dut_address_type == OwnAddressType.PUBLIC:
            self.dut.security_storage.DeleteBond(public=self.ref_left.address)
            await self.dut.aio.security_storage.DeleteBond(public=self.ref_left.address)
        else:
            self.dut.security_storage.DeleteBond(random=self.ref_left.random_address)
            await self.dut.aio.security_storage.DeleteBond(random=self.ref_left.random_address)

        # DUT connect to REF again
        dut_ref = (self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict())).connection
        dut_ref = (
            await self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict())
        ).connection
        # TODO very likely there is a bug in android here
        logging.debug("result should come out")

        advertisement.cancel()
        assert dut_ref

        secure = self.dut.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3)
        secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3)

        assert_equal(secure.WhichOneof("result"), "success")

@@ -266,23 +277,25 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        (RANDOM, RANDOM),
        (RANDOM, PUBLIC),
    )  # type: ignore[misc]
    def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None:
    @asynchronous
    async def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None:
        """
        DUT discovers Ref.
        DUT initiates connection to Ref.
        Verify that DUT and Ref are connected.
        """
        advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref
        assert ref_dut
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)

        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut

    @avatar.parameterized(
        (RANDOM, RANDOM),
        (RANDOM, PUBLIC),
    )  # type: ignore[misc]
    def test_disconnect_initiator(
    @asynchronous
    async def test_disconnect_initiator(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
@@ -291,17 +304,22 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT initiates disconnection to Ref.
        Verify that DUT and Ref are disconnected.
        """
        advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)

        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut

        self.dut.host.Disconnect(connection=dut_ref)
        await asyncio.gather(
            self.dut.aio.host.Disconnect(connection=dut_ref), self.is_device_connected(self.ref_left, ref_dut, 5)
        )

    @avatar.parameterized(
        (RANDOM, RANDOM),
        (RANDOM, PUBLIC),
    )  # type: ignore[misc]
    def test_disconnect_acceptor(
    @asynchronous
    async def test_disconnect_acceptor(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
@@ -310,12 +328,15 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        Ref initiates disconnection to DUT (typically when put back in its box).
        Verify that Ref is disconnected.
        """
        advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref
        assert ref_dut
        self.ref_left.host.Disconnect(connection=ref_dut)
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)

        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
        assert dut_ref, ref_dut

        await asyncio.gather(
            self.ref_left.aio.host.Disconnect(connection=ref_dut), self.is_device_connected(self.dut, dut_ref, 5)
        )

    @avatar.parameterized(
        (RANDOM, RANDOM, 0),
@@ -323,7 +344,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        (RANDOM, RANDOM, 1),
        (RANDOM, RANDOM, 5),
    )  # type: ignore[misc]
    def test_reconnection(
    @asynchronous
    async def test_reconnection(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
@@ -336,22 +358,23 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        Verify that DUT and Ref are connected.
        """

        def connect_and_disconnect() -> None:
            advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
            ref = self.dut_scan_for_asha(dut_address_type=dut_address_type)
            dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type)
            self.dut.host.Disconnect(connection=dut_ref)
        async def connect_and_disconnect() -> None:
            advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
            dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
            await self.dut.aio.host.Disconnect(connection=dut_ref)

        connect_and_disconnect()
        await connect_and_disconnect()
        # simulating reconnect interval
        time.sleep(reconnection_gap)
        connect_and_disconnect()
        await asyncio.sleep(reconnection_gap)
        await connect_and_disconnect()

    @avatar.parameterized(
        (RANDOM, RANDOM),
        (RANDOM, PUBLIC),
    )  # type: ignore[misc]
    def test_auto_connection(
    @asynchronous
    async def test_auto_connection(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
@@ -361,34 +384,39 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        Ref starts sending ASHA advertisements.
        Verify that DUT auto-connects to Ref.
        """
        advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type)

        # manually connect and not cancel advertisement
        dut_ref = self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()).connection
        ref_dut = next(advertisement).connection
        assert dut_ref
        assert ref_dut
        dut_ref_res, ref_dut_res = await asyncio.gather(
            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()),
            anext(aiter(advertisement)),  # pytype: disable=name-error
        )
        assert_equal(dut_ref_res.result_variant(), 'connection')
        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
        assert dut_ref, ref_dut

        # pairing
        secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3)
        # Pairing
        # FIXME: assert that the security Level is reached on ref side
        secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
        assert_equal(secure.WhichOneof("result"), "success")

        self.ref_left.host.Disconnect(connection=ref_dut)
        await self.ref_left.aio.host.Disconnect(connection=ref_dut)

        ref_dut = next(advertisement).connection
        ref_dut = (await anext(aiter(advertisement))).connection
        advertisement.cancel()
        assert ref_dut

    @avatar.parameterized(
        (RANDOM, RANDOM, Device.LEFT),
        (RANDOM, PUBLIC, Device.RIGHT),
        (RANDOM, RANDOM, Ear.LEFT),
        (RANDOM, PUBLIC, Ear.RIGHT),
    )  # type: ignore[misc]
    def test_disconnect_acceptor_dual_device(
    @asynchronous
    async def test_disconnect_acceptor_dual_device(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
        disconnect_device: Device,
        disconnect_device: Ear,
    ) -> None:
        """
        Prerequisites: DUT and Ref are connected and bonded.
@@ -397,32 +425,32 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
           2. Verify that it is disconnected and that the other peripheral is still connected.
        """

        advertisement_left = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref_left = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        dut_ref_left, ref_left_dut = self.dut_connect_to_ref(
        advertisement_left = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type)
        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        dut_ref_left, ref_left_dut = await self.dut_connect_to_ref(
            advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type
        )
        advertisement_left.cancel()
        assert dut_ref_left
        assert ref_left_dut
        assert dut_ref_left, ref_left_dut

        advertisement_right = self.ref_advertise_asha(ref_device=self.ref_right, ref_address_type=ref_address_type)
        ref_right = self.dut_scan_for_asha(dut_address_type=dut_address_type)
        dut_ref_right, ref_right_dut = self.dut_connect_to_ref(
        advertisement_right = await self.ref_advertise_asha(
            ref_device=self.ref_right, ref_address_type=ref_address_type
        )
        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type)
        dut_ref_right, ref_right_dut = await self.dut_connect_to_ref(
            advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type
        )
        advertisement_right.cancel()
        assert dut_ref_right
        assert ref_right_dut
        assert dut_ref_right, ref_right_dut

        if disconnect_device == Device.LEFT:
            self.ref_left.host.Disconnect(connection=ref_left_dut)
            assert self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) == True
            assert self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) == False
        if disconnect_device == Ear.LEFT:
            await self.ref_left.aio.host.Disconnect(connection=ref_left_dut)
            assert await self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0)
            assert not await self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0)
        else:
            self.ref_right.host.Disconnect(connection=ref_right_dut)
            assert self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) == False
            assert self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) == True
            await self.ref_right.aio.host.Disconnect(connection=ref_right_dut)
            assert not await self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0)
            assert await self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0)


if __name__ == "__main__":