Loading android/pandora/test/asha_test.py +137 −109 Original line number Diff line number Diff line Loading @@ -14,18 +14,17 @@ import asyncio import avatar import enum import grpc import logging import time from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous from bumble.gatt import GATT_ASHA_SERVICE from bumble.smp import PairingDelegate from mobly import base_test, signals, test_runner from mobly.asserts import assert_equal # type: ignore from mobly.asserts import assert_in # type: ignore from mobly.asserts import skip # type: ignore from pandora._utils import Stream from pandora._utils import AioStream from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse from pandora.security_pb2 import LE_LEVEL3, LESecurityLevel from typing import List, Optional, Tuple Loading @@ -36,12 +35,15 @@ CAPABILITY: int = 0x0 COMPLETE_LOCAL_NAME: str = "Bumble" class Device: class Ear(enum.IntEnum): """Reference devices type""" LEFT = 0 RIGHT = 1 def __repr__(self) -> str: return str(self.value) class ASHATest(base_test.BaseTestClass): # type: ignore[misc] devices: Optional[PandoraDevices] = None Loading Loading @@ -74,61 +76,64 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] setattr(self.ref_left.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT) setattr(self.ref_right.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT) def ref_advertise_asha( async def ref_advertise_asha( self, ref_device: PandoraDevice, ref_address_type: OwnAddressType ) -> Stream[AdvertiseResponse]: ) -> AioStream[AdvertiseResponse]: """ Ref device starts to advertise :return: Ref device's advertise response Ref device starts to advertise with service data in advertisement data. :return: Ref device's advertise stream """ # Ref starts advertising with ASHA service data ref_device.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) return ref_device.host.Advertise( await ref_device.aio.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) return ref_device.aio.host.Advertise( legacy=True, connectable=True, own_address_type=ref_address_type, data=DataTypes( complete_local_name=COMPLETE_LOCAL_NAME, incomplete_service_class_uuids16=[ASHA_UUID], ), own_address_type=ref_address_type, ) def dut_scan_for_asha(self, dut_address_type: OwnAddressType) -> ScanningResponse: async def dut_scan_for_asha(self, dut_address_type: OwnAddressType) -> ScanningResponse: """ DUT starts to scan for the Ref device. :return: ScanningResponse for ASHA """ scan_result = self.dut.host.Scan(own_address_type=dut_address_type) ref = next((x for x in scan_result if ASHA_UUID in x.data.incomplete_service_class_uuids16)) scan_result.cancel() dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type) ref = await anext((x async for x in dut_scan if ASHA_UUID in x.data.incomplete_service_class_uuids16)) dut_scan.cancel() assert ref return ref def dut_connect_to_ref( self, advertisement: Stream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType async def dut_connect_to_ref( self, advertisement: AioStream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType ) -> Tuple[Connection, Connection]: """ Helper method for Dut connects to Ref :return: a Tuple (DUT to REF connection, REF to DUT connection) """ # DUT connects to Ref dut_ref = self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()).connection ref_dut = (next(advertisement)).connection assert dut_ref, ref_dut (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), anext(aiter(advertisement)), # pytype: disable=name-error ) assert_equal(dut_ref_res.result_variant(), 'connection') dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection assert dut_ref and ref_dut advertisement.cancel() return dut_ref, ref_dut def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool: async def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool: try: device.host.WaitDisconnection(connection=connection, timeout=timeout) await device.aio.host.WaitDisconnection(connection=connection, timeout=timeout) return False except grpc.RpcError as e: assert e.code() == grpc.StatusCode.DEADLINE_EXCEEDED # type: ignore return True def test_advertising_advertisement_data(self) -> None: @asynchronous async def test_advertising_advertisement_data(self) -> None: """ Ref starts ASHA advertisements with service data in advertisement data. DUT starts a service discovery. Loading @@ -137,10 +142,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] protocol_version = 0x01 truncated_hisyncid = HISYCNID[:4] advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM) advertisement = await self.ref_advertise_asha(self.ref_left, RANDOM) # DUT starts a service discovery scan_result = self.dut_scan_for_asha(dut_address_type=RANDOM) scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM) advertisement.cancel() # Verify Ref is correctly discovered by DUT as a hearing aid device Loading @@ -156,7 +161,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(), ) def test_advertising_scan_response(self) -> None: @asynchronous async def test_advertising_scan_response(self) -> None: """ Ref starts ASHA advertisements with service data in scan response data. DUT starts a service discovery. Loading @@ -165,10 +171,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] protocol_version = 0x01 truncated_hisyncid = HISYCNID[:4] self.ref_left.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) await self.ref_left.aio.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) # advertise with ASHA service data in scan response advertisement = self.ref_left.host.Advertise( advertisement = self.ref_left.aio.host.Advertise( legacy=True, scan_response_data=DataTypes( complete_local_name=COMPLETE_LOCAL_NAME, Loading @@ -176,7 +182,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] ), ) scan_result = self.dut_scan_for_asha(dut_address_type=RANDOM) scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM) advertisement.cancel() # Verify Ref is correctly discovered by DUT as a hearing aid device. Loading @@ -195,7 +201,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (RANDOM, PUBLIC), (RANDOM, RANDOM), ) # type: ignore[misc] def test_pairing( @asynchronous async def test_pairing( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -205,24 +212,26 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] DUT initiates connection to Ref. Verify that DUT and Ref are bonded and connected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) # DUT initiates connection to Ref. dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut # DUT starts pairing with the Ref. secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3) # FIXME: assert the security Level on ref side secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") assert_equal(secure.result_variant(), 'success') @avatar.parameterized( (RANDOM, PUBLIC), (RANDOM, RANDOM), ) # type: ignore[misc] def test_unbonding( @asynchronous async def test_unbonding( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -233,32 +242,34 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] """ raise signals.TestSkip("TODO: update rootcanal to retry") advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) secure = self.dut.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") self.dut.host.Disconnect(dut_ref) self.ref_left.host.WaitDisconnection(ref_dut) await self.dut.aio.host.Disconnect(dut_ref) await self.ref_left.aio.host.WaitDisconnection(ref_dut) # delete the bond if dut_address_type == OwnAddressType.PUBLIC: self.dut.security_storage.DeleteBond(public=self.ref_left.address) await self.dut.aio.security_storage.DeleteBond(public=self.ref_left.address) else: self.dut.security_storage.DeleteBond(random=self.ref_left.random_address) await self.dut.aio.security_storage.DeleteBond(random=self.ref_left.random_address) # DUT connect to REF again dut_ref = (self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict())).connection dut_ref = ( await self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()) ).connection # TODO very likely there is a bug in android here logging.debug("result should come out") advertisement.cancel() assert dut_ref secure = self.dut.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3) secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") Loading @@ -266,23 +277,25 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: @asynchronous async def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: """ DUT discovers Ref. DUT initiates connection to Ref. Verify that DUT and Ref are connected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref assert ref_dut advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut @avatar.parameterized( (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_disconnect_initiator( @asynchronous async def test_disconnect_initiator( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -291,17 +304,22 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] DUT initiates disconnection to Ref. Verify that DUT and Ref are disconnected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut self.dut.host.Disconnect(connection=dut_ref) await asyncio.gather( self.dut.aio.host.Disconnect(connection=dut_ref), self.is_device_connected(self.ref_left, ref_dut, 5) ) @avatar.parameterized( (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_disconnect_acceptor( @asynchronous async def test_disconnect_acceptor( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -310,12 +328,15 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] Ref initiates disconnection to DUT (typically when put back in its box). Verify that Ref is disconnected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref assert ref_dut self.ref_left.host.Disconnect(connection=ref_dut) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut await asyncio.gather( self.ref_left.aio.host.Disconnect(connection=ref_dut), self.is_device_connected(self.dut, dut_ref, 5) ) @avatar.parameterized( (RANDOM, RANDOM, 0), Loading @@ -323,7 +344,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (RANDOM, RANDOM, 1), (RANDOM, RANDOM, 5), ) # type: ignore[misc] def test_reconnection( @asynchronous async def test_reconnection( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -336,22 +358,23 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] Verify that DUT and Ref are connected. """ def connect_and_disconnect() -> None: advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type) self.dut.host.Disconnect(connection=dut_ref) async def connect_and_disconnect() -> None: advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) await self.dut.aio.host.Disconnect(connection=dut_ref) connect_and_disconnect() await connect_and_disconnect() # simulating reconnect interval time.sleep(reconnection_gap) connect_and_disconnect() await asyncio.sleep(reconnection_gap) await connect_and_disconnect() @avatar.parameterized( (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_auto_connection( @asynchronous async def test_auto_connection( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -361,34 +384,39 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] Ref starts sending ASHA advertisements. Verify that DUT auto-connects to Ref. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) # manually connect and not cancel advertisement dut_ref = self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()).connection ref_dut = next(advertisement).connection assert dut_ref assert ref_dut dut_ref_res, ref_dut_res = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), anext(aiter(advertisement)), # pytype: disable=name-error ) assert_equal(dut_ref_res.result_variant(), 'connection') dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection assert dut_ref, ref_dut # pairing secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3) # Pairing # FIXME: assert that the security Level is reached on ref side secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") self.ref_left.host.Disconnect(connection=ref_dut) await self.ref_left.aio.host.Disconnect(connection=ref_dut) ref_dut = next(advertisement).connection ref_dut = (await anext(aiter(advertisement))).connection advertisement.cancel() assert ref_dut @avatar.parameterized( (RANDOM, RANDOM, Device.LEFT), (RANDOM, PUBLIC, Device.RIGHT), (RANDOM, RANDOM, Ear.LEFT), (RANDOM, PUBLIC, Ear.RIGHT), ) # type: ignore[misc] def test_disconnect_acceptor_dual_device( @asynchronous async def test_disconnect_acceptor_dual_device( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, disconnect_device: Device, disconnect_device: Ear, ) -> None: """ Prerequisites: DUT and Ref are connected and bonded. Loading @@ -397,32 +425,32 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] 2. Verify that it is disconnected and that the other peripheral is still connected. """ advertisement_left = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref_left = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_left, ref_left_dut = self.dut_connect_to_ref( advertisement_left = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_left, ref_left_dut = await self.dut_connect_to_ref( advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type ) advertisement_left.cancel() assert dut_ref_left assert ref_left_dut assert dut_ref_left, ref_left_dut advertisement_right = self.ref_advertise_asha(ref_device=self.ref_right, ref_address_type=ref_address_type) ref_right = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_right, ref_right_dut = self.dut_connect_to_ref( advertisement_right = await self.ref_advertise_asha( ref_device=self.ref_right, ref_address_type=ref_address_type ) ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_right, ref_right_dut = await self.dut_connect_to_ref( advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type ) advertisement_right.cancel() assert dut_ref_right assert ref_right_dut assert dut_ref_right, ref_right_dut if disconnect_device == Device.LEFT: self.ref_left.host.Disconnect(connection=ref_left_dut) assert self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) == True assert self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) == False if disconnect_device == Ear.LEFT: await self.ref_left.aio.host.Disconnect(connection=ref_left_dut) assert await self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) assert not await self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) else: self.ref_right.host.Disconnect(connection=ref_right_dut) assert self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) == False assert self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) == True await self.ref_right.aio.host.Disconnect(connection=ref_right_dut) assert not await self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) assert await self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) if __name__ == "__main__": Loading Loading
android/pandora/test/asha_test.py +137 −109 Original line number Diff line number Diff line Loading @@ -14,18 +14,17 @@ import asyncio import avatar import enum import grpc import logging import time from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous from bumble.gatt import GATT_ASHA_SERVICE from bumble.smp import PairingDelegate from mobly import base_test, signals, test_runner from mobly.asserts import assert_equal # type: ignore from mobly.asserts import assert_in # type: ignore from mobly.asserts import skip # type: ignore from pandora._utils import Stream from pandora._utils import AioStream from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse from pandora.security_pb2 import LE_LEVEL3, LESecurityLevel from typing import List, Optional, Tuple Loading @@ -36,12 +35,15 @@ CAPABILITY: int = 0x0 COMPLETE_LOCAL_NAME: str = "Bumble" class Device: class Ear(enum.IntEnum): """Reference devices type""" LEFT = 0 RIGHT = 1 def __repr__(self) -> str: return str(self.value) class ASHATest(base_test.BaseTestClass): # type: ignore[misc] devices: Optional[PandoraDevices] = None Loading Loading @@ -74,61 +76,64 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] setattr(self.ref_left.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT) setattr(self.ref_right.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT) def ref_advertise_asha( async def ref_advertise_asha( self, ref_device: PandoraDevice, ref_address_type: OwnAddressType ) -> Stream[AdvertiseResponse]: ) -> AioStream[AdvertiseResponse]: """ Ref device starts to advertise :return: Ref device's advertise response Ref device starts to advertise with service data in advertisement data. :return: Ref device's advertise stream """ # Ref starts advertising with ASHA service data ref_device.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) return ref_device.host.Advertise( await ref_device.aio.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) return ref_device.aio.host.Advertise( legacy=True, connectable=True, own_address_type=ref_address_type, data=DataTypes( complete_local_name=COMPLETE_LOCAL_NAME, incomplete_service_class_uuids16=[ASHA_UUID], ), own_address_type=ref_address_type, ) def dut_scan_for_asha(self, dut_address_type: OwnAddressType) -> ScanningResponse: async def dut_scan_for_asha(self, dut_address_type: OwnAddressType) -> ScanningResponse: """ DUT starts to scan for the Ref device. :return: ScanningResponse for ASHA """ scan_result = self.dut.host.Scan(own_address_type=dut_address_type) ref = next((x for x in scan_result if ASHA_UUID in x.data.incomplete_service_class_uuids16)) scan_result.cancel() dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type) ref = await anext((x async for x in dut_scan if ASHA_UUID in x.data.incomplete_service_class_uuids16)) dut_scan.cancel() assert ref return ref def dut_connect_to_ref( self, advertisement: Stream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType async def dut_connect_to_ref( self, advertisement: AioStream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType ) -> Tuple[Connection, Connection]: """ Helper method for Dut connects to Ref :return: a Tuple (DUT to REF connection, REF to DUT connection) """ # DUT connects to Ref dut_ref = self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()).connection ref_dut = (next(advertisement)).connection assert dut_ref, ref_dut (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), anext(aiter(advertisement)), # pytype: disable=name-error ) assert_equal(dut_ref_res.result_variant(), 'connection') dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection assert dut_ref and ref_dut advertisement.cancel() return dut_ref, ref_dut def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool: async def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool: try: device.host.WaitDisconnection(connection=connection, timeout=timeout) await device.aio.host.WaitDisconnection(connection=connection, timeout=timeout) return False except grpc.RpcError as e: assert e.code() == grpc.StatusCode.DEADLINE_EXCEEDED # type: ignore return True def test_advertising_advertisement_data(self) -> None: @asynchronous async def test_advertising_advertisement_data(self) -> None: """ Ref starts ASHA advertisements with service data in advertisement data. DUT starts a service discovery. Loading @@ -137,10 +142,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] protocol_version = 0x01 truncated_hisyncid = HISYCNID[:4] advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM) advertisement = await self.ref_advertise_asha(self.ref_left, RANDOM) # DUT starts a service discovery scan_result = self.dut_scan_for_asha(dut_address_type=RANDOM) scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM) advertisement.cancel() # Verify Ref is correctly discovered by DUT as a hearing aid device Loading @@ -156,7 +161,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(), ) def test_advertising_scan_response(self) -> None: @asynchronous async def test_advertising_scan_response(self) -> None: """ Ref starts ASHA advertisements with service data in scan response data. DUT starts a service discovery. Loading @@ -165,10 +171,10 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] protocol_version = 0x01 truncated_hisyncid = HISYCNID[:4] self.ref_left.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) await self.ref_left.aio.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID) # advertise with ASHA service data in scan response advertisement = self.ref_left.host.Advertise( advertisement = self.ref_left.aio.host.Advertise( legacy=True, scan_response_data=DataTypes( complete_local_name=COMPLETE_LOCAL_NAME, Loading @@ -176,7 +182,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] ), ) scan_result = self.dut_scan_for_asha(dut_address_type=RANDOM) scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM) advertisement.cancel() # Verify Ref is correctly discovered by DUT as a hearing aid device. Loading @@ -195,7 +201,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (RANDOM, PUBLIC), (RANDOM, RANDOM), ) # type: ignore[misc] def test_pairing( @asynchronous async def test_pairing( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -205,24 +212,26 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] DUT initiates connection to Ref. Verify that DUT and Ref are bonded and connected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) # DUT initiates connection to Ref. dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut # DUT starts pairing with the Ref. secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3) # FIXME: assert the security Level on ref side secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") assert_equal(secure.result_variant(), 'success') @avatar.parameterized( (RANDOM, PUBLIC), (RANDOM, RANDOM), ) # type: ignore[misc] def test_unbonding( @asynchronous async def test_unbonding( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -233,32 +242,34 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] """ raise signals.TestSkip("TODO: update rootcanal to retry") advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) secure = self.dut.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") self.dut.host.Disconnect(dut_ref) self.ref_left.host.WaitDisconnection(ref_dut) await self.dut.aio.host.Disconnect(dut_ref) await self.ref_left.aio.host.WaitDisconnection(ref_dut) # delete the bond if dut_address_type == OwnAddressType.PUBLIC: self.dut.security_storage.DeleteBond(public=self.ref_left.address) await self.dut.aio.security_storage.DeleteBond(public=self.ref_left.address) else: self.dut.security_storage.DeleteBond(random=self.ref_left.random_address) await self.dut.aio.security_storage.DeleteBond(random=self.ref_left.random_address) # DUT connect to REF again dut_ref = (self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict())).connection dut_ref = ( await self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()) ).connection # TODO very likely there is a bug in android here logging.debug("result should come out") advertisement.cancel() assert dut_ref secure = self.dut.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3) secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") Loading @@ -266,23 +277,25 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: @asynchronous async def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: """ DUT discovers Ref. DUT initiates connection to Ref. Verify that DUT and Ref are connected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref assert ref_dut advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut @avatar.parameterized( (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_disconnect_initiator( @asynchronous async def test_disconnect_initiator( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -291,17 +304,22 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] DUT initiates disconnection to Ref. Verify that DUT and Ref are disconnected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut self.dut.host.Disconnect(connection=dut_ref) await asyncio.gather( self.dut.aio.host.Disconnect(connection=dut_ref), self.is_device_connected(self.ref_left, ref_dut, 5) ) @avatar.parameterized( (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_disconnect_acceptor( @asynchronous async def test_disconnect_acceptor( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -310,12 +328,15 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] Ref initiates disconnection to DUT (typically when put back in its box). Verify that Ref is disconnected. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref assert ref_dut self.ref_left.host.Disconnect(connection=ref_dut) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) assert dut_ref, ref_dut await asyncio.gather( self.ref_left.aio.host.Disconnect(connection=ref_dut), self.is_device_connected(self.dut, dut_ref, 5) ) @avatar.parameterized( (RANDOM, RANDOM, 0), Loading @@ -323,7 +344,8 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] (RANDOM, RANDOM, 1), (RANDOM, RANDOM, 5), ) # type: ignore[misc] def test_reconnection( @asynchronous async def test_reconnection( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -336,22 +358,23 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] Verify that DUT and Ref are connected. """ def connect_and_disconnect() -> None: advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, _ = self.dut_connect_to_ref(advertisement, ref, dut_address_type) self.dut.host.Disconnect(connection=dut_ref) async def connect_and_disconnect() -> None: advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) await self.dut.aio.host.Disconnect(connection=dut_ref) connect_and_disconnect() await connect_and_disconnect() # simulating reconnect interval time.sleep(reconnection_gap) connect_and_disconnect() await asyncio.sleep(reconnection_gap) await connect_and_disconnect() @avatar.parameterized( (RANDOM, RANDOM), (RANDOM, PUBLIC), ) # type: ignore[misc] def test_auto_connection( @asynchronous async def test_auto_connection( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, Loading @@ -361,34 +384,39 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] Ref starts sending ASHA advertisements. Verify that DUT auto-connects to Ref. """ advertisement = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = self.dut_scan_for_asha(dut_address_type=dut_address_type) advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type) # manually connect and not cancel advertisement dut_ref = self.dut.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()).connection ref_dut = next(advertisement).connection assert dut_ref assert ref_dut dut_ref_res, ref_dut_res = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), anext(aiter(advertisement)), # pytype: disable=name-error ) assert_equal(dut_ref_res.result_variant(), 'connection') dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection assert dut_ref, ref_dut # pairing secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3) # Pairing # FIXME: assert that the security Level is reached on ref side secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3) assert_equal(secure.WhichOneof("result"), "success") self.ref_left.host.Disconnect(connection=ref_dut) await self.ref_left.aio.host.Disconnect(connection=ref_dut) ref_dut = next(advertisement).connection ref_dut = (await anext(aiter(advertisement))).connection advertisement.cancel() assert ref_dut @avatar.parameterized( (RANDOM, RANDOM, Device.LEFT), (RANDOM, PUBLIC, Device.RIGHT), (RANDOM, RANDOM, Ear.LEFT), (RANDOM, PUBLIC, Ear.RIGHT), ) # type: ignore[misc] def test_disconnect_acceptor_dual_device( @asynchronous async def test_disconnect_acceptor_dual_device( self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, disconnect_device: Device, disconnect_device: Ear, ) -> None: """ Prerequisites: DUT and Ref are connected and bonded. Loading @@ -397,32 +425,32 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc] 2. Verify that it is disconnected and that the other peripheral is still connected. """ advertisement_left = self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref_left = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_left, ref_left_dut = self.dut_connect_to_ref( advertisement_left = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=ref_address_type) ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_left, ref_left_dut = await self.dut_connect_to_ref( advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type ) advertisement_left.cancel() assert dut_ref_left assert ref_left_dut assert dut_ref_left, ref_left_dut advertisement_right = self.ref_advertise_asha(ref_device=self.ref_right, ref_address_type=ref_address_type) ref_right = self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_right, ref_right_dut = self.dut_connect_to_ref( advertisement_right = await self.ref_advertise_asha( ref_device=self.ref_right, ref_address_type=ref_address_type ) ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type) dut_ref_right, ref_right_dut = await self.dut_connect_to_ref( advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type ) advertisement_right.cancel() assert dut_ref_right assert ref_right_dut assert dut_ref_right, ref_right_dut if disconnect_device == Device.LEFT: self.ref_left.host.Disconnect(connection=ref_left_dut) assert self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) == True assert self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) == False if disconnect_device == Ear.LEFT: await self.ref_left.aio.host.Disconnect(connection=ref_left_dut) assert await self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) assert not await self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) else: self.ref_right.host.Disconnect(connection=ref_right_dut) assert self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) == False assert self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) == True await self.ref_right.aio.host.Disconnect(connection=ref_right_dut) assert not await self.is_device_connected(device=self.ref_right, connection=ref_right_dut, timeout=5.0) assert await self.is_device_connected(device=self.ref_left, connection=ref_left_dut, timeout=5.0) if __name__ == "__main__": Loading