Loading android/pandora/test/classic_ssp_test.py +257 −123 Original line number Diff line number Diff line Loading @@ -13,182 +13,316 @@ # limitations under the License. import asyncio import avatar import itertools import logging from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous, parameterized from bumble.core import BT_BR_EDR_TRANSPORT from bumble.hci import HCI_CENTRAL_ROLE, Address as BumbleAddress from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE from bumble.pairing import PairingDelegate from concurrent import futures from contextlib import suppress from mobly import base_test, signals, test_runner from mobly.asserts import assert_equal # type: ignore from mobly.asserts import assert_in # type: ignore from mobly.asserts import assert_is_not_none # type: ignore from mobly.asserts import fail # type: ignore from pandora.host_pb2 import RANDOM, DataTypes, OwnAddressType from pandora.security_pb2 import LEVEL2, PairingEventAnswer from typing import NoReturn, Optional from pandora.host_pb2 import Connection from pandora.security_pb2 import LEVEL2, PairingEventAnswer, SecureResponse, SecurityLevel, WaitSecurityResponse from typing import Callable, Coroutine, Optional, Tuple ALL_ROLES = (HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE) ALL_IO_CAPABILITIES = ( None, PairingDelegate.DISPLAY_OUTPUT_ONLY, PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, PairingDelegate.KEYBOARD_INPUT_ONLY, PairingDelegate.NO_OUTPUT_NO_INPUT, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, ) class ClassicSspTest(base_test.BaseTestClass): # type: ignore[misc] ''' This class aim to test SSP (Secure Simple Pairing) on Classic Bluetooth devices. ''' devices: Optional[PandoraDevices] = None # pandora devices. dut: PandoraDevice ref: PandoraDevice def setup_class(self) -> None: @avatar.asynchronous async def setup_class(self) -> None: self.devices = PandoraDevices(self) self.dut, self.ref, *_ = self.devices # Enable BR/EDR mode for Bumble devices. # Enable BR/EDR mode and SSP for Bumble devices. for device in self.devices: if isinstance(device, BumblePandoraDevice): device.config.setdefault('classic_enabled', True) device.config.setdefault('classic_ssp_enabled', True) device.config.setdefault( 'server', { 'io_capability': 'display_output_and_yes_no_input', }, ) await asyncio.gather(self.dut.reset(), self.ref.reset()) def teardown_class(self) -> None: if self.devices: self.devices.stop_all() @asynchronous async def setup_test(self) -> None: @avatar.asynchronous async def setup_test(self) -> None: # pytype: disable=wrong-arg-types await asyncio.gather(self.dut.reset(), self.ref.reset()) async def connect_le(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: advertisement = self.dut.aio.host.Advertise( legacy=True, connectable=True, own_address_type=dut_address_type, data=DataTypes(manufacturer_specific_data=b'pause cafe'), ) @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_initiate_connection_initiate_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) scan = self.ref.aio.host.Scan(own_address_type=ref_address_type) dut = await anext( (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data) ) # pytype: disable=name-error scan.cancel() # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2) (ref_dut_res, dut_ref_res) = await asyncio.gather( self.ref.aio.host.ConnectLE(own_address_type=ref_address_type, **dut.address_asdict()), anext(aiter(advertisement)), # pytype: disable=name-error # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) advertisement.cancel() ref_dut, _ = ref_dut_res.connection, dut_ref_res.connection assert_is_not_none(ref_dut) # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') async def handle_pairing_events(self) -> NoReturn: ref_pairing_stream = self.ref.aio.security.OnPairing() dut_pairing_stream = self.dut.aio.security.OnPairing() @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_initiate_connection_accept_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: if not isinstance(self.dut, BumblePandoraDevice): raise signals.TestSkip('TODO: Fix rootcanal when both AOSP and Bumble trigger the auth.') try: while True: ref_pairing_event, dut_pairing_event = await asyncio.gather( anext(ref_pairing_stream), anext(dut_pairing_stream), ) # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) if dut_pairing_event.method_variant() in ( 'numeric_comparison', 'just_works', ): assert_in(ref_pairing_event.method_variant(), ('numeric_comparison', 'just_works')) dut_pairing_stream.send_nowait( PairingEventAnswer( event=dut_pairing_event, confirm=True, ) ) ref_pairing_stream.send_nowait( PairingEventAnswer( event=ref_pairing_event, confirm=True, ) ) elif dut_pairing_event.method_variant() == 'passkey_entry_notification': assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request') ref_pairing_stream.send_nowait( PairingEventAnswer( event=ref_pairing_event, passkey=dut_pairing_event.passkey_entry_notification, ) # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2) # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) elif dut_pairing_event.method_variant() == 'passkey_entry_request': assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_notification') dut_pairing_stream.send_nowait( PairingEventAnswer( event=dut_pairing_event, passkey=ref_pairing_event.passkey_entry_notification, # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_accept_connection_initiate_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2) # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_accept_connection_accept_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2) # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') def set_io_capability(device: PandoraDevice, io_capability: Optional[PairingDelegate.IoCapability]) -> None: if io_capability is None: return if isinstance(device, BumblePandoraDevice): # Override Bumble reference device default IO capability. device.server_config.io_capability = io_capability else: fail("") raise signals.TestSkip('Unable to override IO capability on non Bumble device.') finally: ref_pairing_stream.cancel() dut_pairing_stream.cancel() @parameterized( *itertools.product( (PairingDelegate.NO_OUTPUT_NO_INPUT,), (HCI_CENTRAL_ROLE,), (RANDOM,), # Connection task. async def make_classic_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]: '''Connect two device and returns both connection tokens.''' (connect, wait_connection) = await asyncio.gather( initiator.aio.host.Connect(address=acceptor.address), acceptor.aio.host.WaitConnection(address=initiator.address), ) ) # type: ignore[misc] @asynchronous async def test_classic_pairing_incoming( self, ref_io_capability: PairingDelegate.IoCapability, ref_role: int, ref_le_addr_type: OwnAddressType ) -> None: if not isinstance(self.ref, BumblePandoraDevice): raise signals.TestSkip('Test require Bumble as reference device(s)') # override reference device IO capability self.ref.server_config.io_capability = ref_io_capability # Assert connection are successful. assert_equal(connect.result_variant(), 'connection') assert_equal(wait_connection.result_variant(), 'connection') assert_is_not_none(connect.connection) assert_is_not_none(wait_connection.connection) assert connect.connection and wait_connection.connection pairing = asyncio.create_task(self.handle_pairing_events()) # Returns connections. return connect.connection, wait_connection.connection await self.connect_le(RANDOM, ref_le_addr_type) (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.WaitConnection(address=self.ref.address), self.ref.aio.host.Connect(address=self.dut.address), # Pairing task. async def authenticate( initiator: PandoraDevice, initiator_connection: Connection, acceptor: PandoraDevice, acceptor_connection: Connection, security_level: SecurityLevel, ) -> Tuple[SecureResponse, WaitSecurityResponse]: '''Pair two device and returns both pairing responses.''' return await asyncio.gather( initiator.aio.security.Secure(connection=initiator_connection, classic=security_level), acceptor.aio.security.WaitSecurity(connection=acceptor_connection, classic=security_level), ) assert_equal(ref_dut_res.result_variant(), 'connection') assert_equal(dut_ref_res.result_variant(), 'connection') ref_dut = ref_dut_res.connection dut_ref = dut_ref_res.connection assert_is_not_none(ref_dut) assert_is_not_none(dut_ref) assert ref_dut and dut_ref ref_dut_raw = self.ref.device.find_connection_by_bd_addr( BumbleAddress(bytes(reversed(self.dut.address)), BumbleAddress.PUBLIC_DEVICE_ADDRESS), BT_BR_EDR_TRANSPORT ) assert_is_not_none(ref_dut_raw) assert ref_dut_raw # Role switch task. async def role_switch( device: PandoraDevice, connection: Connection, role: int, ) -> None: '''Switch role if supported.''' if ref_dut_raw.role != ref_role: await ref_dut_raw.switch_role(ref_role) if not isinstance(device, BumblePandoraDevice): return (secure, wait_security) = await asyncio.gather( self.ref.aio.security.Secure(connection=ref_dut, classic=LEVEL2), self.dut.aio.security.WaitSecurity(connection=dut_ref, classic=LEVEL2), ) connection_handle = int.from_bytes(connection.cookie.value, 'big') bumble_connection = device.device.lookup_connection(connection_handle) assert_is_not_none(bumble_connection) assert bumble_connection pairing.cancel() with suppress(asyncio.CancelledError, futures.CancelledError): await pairing if bumble_connection.role != role: device.log.info(f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}") await bumble_connection.switch_role(role) assert_equal(secure.result_variant(), 'success') assert_equal(wait_security.result_variant(), 'success') await asyncio.gather( self.dut.aio.host.WaitDisconnection(connection=dut_ref), self.ref.aio.host.Disconnect(connection=ref_dut), ) # Handle pairing events task. async def handle_pairing( dut: PandoraDevice, ref: PandoraDevice, connect_and_pair: Callable[[], Coroutine[None, None, Tuple[SecureResponse, WaitSecurityResponse]]], confirm: Callable[[bool], bool] = lambda x: x, passkey: Callable[[int], int] = lambda x: x, ) -> Tuple[SecureResponse, WaitSecurityResponse]: # Listen for pairing event on bot DUT and REF. dut_pairing, ref_pairing = dut.aio.security.OnPairing(), ref.aio.security.OnPairing() # Start connection/pairing. connect_and_pair_task = asyncio.create_task(connect_and_pair()) try: dut_ev = await asyncio.wait_for(anext(dut_pairing), timeout=25.0) dut.log.info(f'DUT pairing event: {dut_ev.method_variant()}') ref_ev = await asyncio.wait_for(anext(ref_pairing), timeout=3.0) ref.log.info(f'REF pairing event: {ref_ev.method_variant()}') if dut_ev.method_variant() in ('numeric_comparison', 'just_works'): assert_in(ref_ev.method_variant(), ('numeric_comparison', 'just_works')) confirm_res = True if dut_ev.method_variant() == 'numeric_comparison' and ref_ev.method_variant() == 'numeric_comparison': confirm_res = ref_ev.numeric_comparison == dut_ev.numeric_comparison confirm_res = confirm(confirm_res) dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, confirm=confirm_res)) ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, confirm=confirm_res)) elif dut_ev.method_variant() == 'passkey_entry_notification': assert_equal(ref_ev.method_variant(), 'passkey_entry_request') assert_is_not_none(dut_ev.passkey_entry_notification) assert dut_ev.passkey_entry_notification is not None passkey_res = passkey(dut_ev.passkey_entry_notification) ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, passkey=passkey_res)) elif dut_ev.method_variant() == 'passkey_entry_request': assert_equal(ref_ev.method_variant(), 'passkey_entry_notification') assert_is_not_none(ref_ev.passkey_entry_notification) assert ref_ev.passkey_entry_notification is not None passkey_res = passkey(ref_ev.passkey_entry_notification) dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, passkey=passkey_res)) else: fail("") except (asyncio.CancelledError, asyncio.TimeoutError): logging.exception('Pairing timed-out.') finally: try: (secure, wait_security) = await asyncio.wait_for(connect_and_pair_task, 15.0) logging.info(f'Pairing result: {secure.result_variant()}/{wait_security.result_variant()}') return secure, wait_security finally: dut_pairing.cancel() ref_pairing.cancel() if __name__ == '__main__': Loading Loading
android/pandora/test/classic_ssp_test.py +257 −123 Original line number Diff line number Diff line Loading @@ -13,182 +13,316 @@ # limitations under the License. import asyncio import avatar import itertools import logging from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous, parameterized from bumble.core import BT_BR_EDR_TRANSPORT from bumble.hci import HCI_CENTRAL_ROLE, Address as BumbleAddress from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE from bumble.pairing import PairingDelegate from concurrent import futures from contextlib import suppress from mobly import base_test, signals, test_runner from mobly.asserts import assert_equal # type: ignore from mobly.asserts import assert_in # type: ignore from mobly.asserts import assert_is_not_none # type: ignore from mobly.asserts import fail # type: ignore from pandora.host_pb2 import RANDOM, DataTypes, OwnAddressType from pandora.security_pb2 import LEVEL2, PairingEventAnswer from typing import NoReturn, Optional from pandora.host_pb2 import Connection from pandora.security_pb2 import LEVEL2, PairingEventAnswer, SecureResponse, SecurityLevel, WaitSecurityResponse from typing import Callable, Coroutine, Optional, Tuple ALL_ROLES = (HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE) ALL_IO_CAPABILITIES = ( None, PairingDelegate.DISPLAY_OUTPUT_ONLY, PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, PairingDelegate.KEYBOARD_INPUT_ONLY, PairingDelegate.NO_OUTPUT_NO_INPUT, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, ) class ClassicSspTest(base_test.BaseTestClass): # type: ignore[misc] ''' This class aim to test SSP (Secure Simple Pairing) on Classic Bluetooth devices. ''' devices: Optional[PandoraDevices] = None # pandora devices. dut: PandoraDevice ref: PandoraDevice def setup_class(self) -> None: @avatar.asynchronous async def setup_class(self) -> None: self.devices = PandoraDevices(self) self.dut, self.ref, *_ = self.devices # Enable BR/EDR mode for Bumble devices. # Enable BR/EDR mode and SSP for Bumble devices. for device in self.devices: if isinstance(device, BumblePandoraDevice): device.config.setdefault('classic_enabled', True) device.config.setdefault('classic_ssp_enabled', True) device.config.setdefault( 'server', { 'io_capability': 'display_output_and_yes_no_input', }, ) await asyncio.gather(self.dut.reset(), self.ref.reset()) def teardown_class(self) -> None: if self.devices: self.devices.stop_all() @asynchronous async def setup_test(self) -> None: @avatar.asynchronous async def setup_test(self) -> None: # pytype: disable=wrong-arg-types await asyncio.gather(self.dut.reset(), self.ref.reset()) async def connect_le(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: advertisement = self.dut.aio.host.Advertise( legacy=True, connectable=True, own_address_type=dut_address_type, data=DataTypes(manufacturer_specific_data=b'pause cafe'), ) @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_initiate_connection_initiate_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) scan = self.ref.aio.host.Scan(own_address_type=ref_address_type) dut = await anext( (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data) ) # pytype: disable=name-error scan.cancel() # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2) (ref_dut_res, dut_ref_res) = await asyncio.gather( self.ref.aio.host.ConnectLE(own_address_type=ref_address_type, **dut.address_asdict()), anext(aiter(advertisement)), # pytype: disable=name-error # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) advertisement.cancel() ref_dut, _ = ref_dut_res.connection, dut_ref_res.connection assert_is_not_none(ref_dut) # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') async def handle_pairing_events(self) -> NoReturn: ref_pairing_stream = self.ref.aio.security.OnPairing() dut_pairing_stream = self.dut.aio.security.OnPairing() @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_initiate_connection_accept_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: if not isinstance(self.dut, BumblePandoraDevice): raise signals.TestSkip('TODO: Fix rootcanal when both AOSP and Bumble trigger the auth.') try: while True: ref_pairing_event, dut_pairing_event = await asyncio.gather( anext(ref_pairing_stream), anext(dut_pairing_stream), ) # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) if dut_pairing_event.method_variant() in ( 'numeric_comparison', 'just_works', ): assert_in(ref_pairing_event.method_variant(), ('numeric_comparison', 'just_works')) dut_pairing_stream.send_nowait( PairingEventAnswer( event=dut_pairing_event, confirm=True, ) ) ref_pairing_stream.send_nowait( PairingEventAnswer( event=ref_pairing_event, confirm=True, ) ) elif dut_pairing_event.method_variant() == 'passkey_entry_notification': assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request') ref_pairing_stream.send_nowait( PairingEventAnswer( event=ref_pairing_event, passkey=dut_pairing_event.passkey_entry_notification, ) # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2) # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) elif dut_pairing_event.method_variant() == 'passkey_entry_request': assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_notification') dut_pairing_stream.send_nowait( PairingEventAnswer( event=dut_pairing_event, passkey=ref_pairing_event.passkey_entry_notification, # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_accept_connection_initiate_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2) # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES)) # type: ignore[misc] @avatar.asynchronous async def test_success_accept_connection_accept_pairing( self, ref_io_capability: Optional[PairingDelegate.IoCapability], ref_role: Optional[int], ) -> None: # Override REF IO capability if supported. set_io_capability(self.ref, ref_io_capability) # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut) if ref_role is not None: await role_switch(self.ref, ref_dut, ref_role) return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2) # Handle pairing. initiator_pairing, acceptor_pairing = await handle_pairing( self.dut, self.ref, connect_and_pair, ) # Assert success. assert_equal(initiator_pairing.result_variant(), 'success') assert_equal(acceptor_pairing.result_variant(), 'success') def set_io_capability(device: PandoraDevice, io_capability: Optional[PairingDelegate.IoCapability]) -> None: if io_capability is None: return if isinstance(device, BumblePandoraDevice): # Override Bumble reference device default IO capability. device.server_config.io_capability = io_capability else: fail("") raise signals.TestSkip('Unable to override IO capability on non Bumble device.') finally: ref_pairing_stream.cancel() dut_pairing_stream.cancel() @parameterized( *itertools.product( (PairingDelegate.NO_OUTPUT_NO_INPUT,), (HCI_CENTRAL_ROLE,), (RANDOM,), # Connection task. async def make_classic_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]: '''Connect two device and returns both connection tokens.''' (connect, wait_connection) = await asyncio.gather( initiator.aio.host.Connect(address=acceptor.address), acceptor.aio.host.WaitConnection(address=initiator.address), ) ) # type: ignore[misc] @asynchronous async def test_classic_pairing_incoming( self, ref_io_capability: PairingDelegate.IoCapability, ref_role: int, ref_le_addr_type: OwnAddressType ) -> None: if not isinstance(self.ref, BumblePandoraDevice): raise signals.TestSkip('Test require Bumble as reference device(s)') # override reference device IO capability self.ref.server_config.io_capability = ref_io_capability # Assert connection are successful. assert_equal(connect.result_variant(), 'connection') assert_equal(wait_connection.result_variant(), 'connection') assert_is_not_none(connect.connection) assert_is_not_none(wait_connection.connection) assert connect.connection and wait_connection.connection pairing = asyncio.create_task(self.handle_pairing_events()) # Returns connections. return connect.connection, wait_connection.connection await self.connect_le(RANDOM, ref_le_addr_type) (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.WaitConnection(address=self.ref.address), self.ref.aio.host.Connect(address=self.dut.address), # Pairing task. async def authenticate( initiator: PandoraDevice, initiator_connection: Connection, acceptor: PandoraDevice, acceptor_connection: Connection, security_level: SecurityLevel, ) -> Tuple[SecureResponse, WaitSecurityResponse]: '''Pair two device and returns both pairing responses.''' return await asyncio.gather( initiator.aio.security.Secure(connection=initiator_connection, classic=security_level), acceptor.aio.security.WaitSecurity(connection=acceptor_connection, classic=security_level), ) assert_equal(ref_dut_res.result_variant(), 'connection') assert_equal(dut_ref_res.result_variant(), 'connection') ref_dut = ref_dut_res.connection dut_ref = dut_ref_res.connection assert_is_not_none(ref_dut) assert_is_not_none(dut_ref) assert ref_dut and dut_ref ref_dut_raw = self.ref.device.find_connection_by_bd_addr( BumbleAddress(bytes(reversed(self.dut.address)), BumbleAddress.PUBLIC_DEVICE_ADDRESS), BT_BR_EDR_TRANSPORT ) assert_is_not_none(ref_dut_raw) assert ref_dut_raw # Role switch task. async def role_switch( device: PandoraDevice, connection: Connection, role: int, ) -> None: '''Switch role if supported.''' if ref_dut_raw.role != ref_role: await ref_dut_raw.switch_role(ref_role) if not isinstance(device, BumblePandoraDevice): return (secure, wait_security) = await asyncio.gather( self.ref.aio.security.Secure(connection=ref_dut, classic=LEVEL2), self.dut.aio.security.WaitSecurity(connection=dut_ref, classic=LEVEL2), ) connection_handle = int.from_bytes(connection.cookie.value, 'big') bumble_connection = device.device.lookup_connection(connection_handle) assert_is_not_none(bumble_connection) assert bumble_connection pairing.cancel() with suppress(asyncio.CancelledError, futures.CancelledError): await pairing if bumble_connection.role != role: device.log.info(f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}") await bumble_connection.switch_role(role) assert_equal(secure.result_variant(), 'success') assert_equal(wait_security.result_variant(), 'success') await asyncio.gather( self.dut.aio.host.WaitDisconnection(connection=dut_ref), self.ref.aio.host.Disconnect(connection=ref_dut), ) # Handle pairing events task. async def handle_pairing( dut: PandoraDevice, ref: PandoraDevice, connect_and_pair: Callable[[], Coroutine[None, None, Tuple[SecureResponse, WaitSecurityResponse]]], confirm: Callable[[bool], bool] = lambda x: x, passkey: Callable[[int], int] = lambda x: x, ) -> Tuple[SecureResponse, WaitSecurityResponse]: # Listen for pairing event on bot DUT and REF. dut_pairing, ref_pairing = dut.aio.security.OnPairing(), ref.aio.security.OnPairing() # Start connection/pairing. connect_and_pair_task = asyncio.create_task(connect_and_pair()) try: dut_ev = await asyncio.wait_for(anext(dut_pairing), timeout=25.0) dut.log.info(f'DUT pairing event: {dut_ev.method_variant()}') ref_ev = await asyncio.wait_for(anext(ref_pairing), timeout=3.0) ref.log.info(f'REF pairing event: {ref_ev.method_variant()}') if dut_ev.method_variant() in ('numeric_comparison', 'just_works'): assert_in(ref_ev.method_variant(), ('numeric_comparison', 'just_works')) confirm_res = True if dut_ev.method_variant() == 'numeric_comparison' and ref_ev.method_variant() == 'numeric_comparison': confirm_res = ref_ev.numeric_comparison == dut_ev.numeric_comparison confirm_res = confirm(confirm_res) dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, confirm=confirm_res)) ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, confirm=confirm_res)) elif dut_ev.method_variant() == 'passkey_entry_notification': assert_equal(ref_ev.method_variant(), 'passkey_entry_request') assert_is_not_none(dut_ev.passkey_entry_notification) assert dut_ev.passkey_entry_notification is not None passkey_res = passkey(dut_ev.passkey_entry_notification) ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, passkey=passkey_res)) elif dut_ev.method_variant() == 'passkey_entry_request': assert_equal(ref_ev.method_variant(), 'passkey_entry_notification') assert_is_not_none(ref_ev.passkey_entry_notification) assert ref_ev.passkey_entry_notification is not None passkey_res = passkey(ref_ev.passkey_entry_notification) dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, passkey=passkey_res)) else: fail("") except (asyncio.CancelledError, asyncio.TimeoutError): logging.exception('Pairing timed-out.') finally: try: (secure, wait_security) = await asyncio.wait_for(connect_and_pair_task, 15.0) logging.info(f'Pairing result: {secure.result_variant()}/{wait_security.result_variant()}') return secure, wait_security finally: dut_pairing.cancel() ref_pairing.cancel() if __name__ == '__main__': Loading