Loading android/pandora/test/smp_test.py +113 −4 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices from avatar.aio import asynchronous from bumble import smp from bumble.hci import Address from bumble.pairing import PairingDelegate from concurrent import futures from contextlib import suppress from mobly import base_test, signals, test_runner Loading Loading @@ -56,15 +57,35 @@ class SmpTest(base_test.BaseTestClass): # type: ignore[misc] async def handle_pairing_events(self) -> NoReturn: dut_pairing_stream = self.dut.aio.security.OnPairing() ref_pairing_stream = self.ref.aio.security.OnPairing() try: while True: dut_pairing_event = await (anext(dut_pairing_stream)) dut_pairing_stream.send_nowait( PairingEventAnswer( if dut_pairing_event.method_variant() == 'passkey_entry_notification': ref_pairing_event = await (anext(ref_pairing_stream)) assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request') assert_is_not_none(dut_pairing_event.passkey_entry_notification) assert dut_pairing_event.passkey_entry_notification is not None ref_ev_answer = PairingEventAnswer( event=ref_pairing_event, passkey=dut_pairing_event.passkey_entry_notification, ) ref_pairing_stream.send_nowait(ref_ev_answer) else: dut_pairing_stream.send_nowait(PairingEventAnswer( event=dut_pairing_event, confirm=True, ) ) )) ref_pairing_event = await (anext(ref_pairing_stream)) ref_pairing_stream.send_nowait(PairingEventAnswer( event=ref_pairing_event, confirm=True, )) finally: dut_pairing_stream.cancel() Loading Loading @@ -148,6 +169,94 @@ class SmpTest(base_test.BaseTestClass): # type: ignore[misc] is_bonded = await self.dut.aio.security_storage.IsBonded(random=ref1.random) assert_false(is_bonded.value, "") @asynchronous async def test_mitm_sec_req_on_enc(self) -> None: if isinstance(self.ref, BumblePandoraDevice): io_capability = PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT self.ref.server_config.io_capability = io_capability advertisement = self.ref.aio.host.Advertise( legacy=True, connectable=True, own_address_type=RANDOM, data=DataTypes(manufacturer_specific_data=b'pause cafe'), ) scan = self.dut.aio.host.Scan(own_address_type=RANDOM) ref = await anext((x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)) scan.cancel() asyncio.create_task(self.handle_pairing_events()) (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()), anext(aiter(advertisement)), ) advertisement.cancel() ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection assert_is_not_none(dut_ref) assert dut_ref # Pair with MITM requirements (secure, wait_security) = await asyncio.gather( self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), self.ref.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), ) assert_equal(secure.result_variant(), 'success') assert_equal(wait_security.result_variant(), 'success') # Disconnect await asyncio.gather( self.ref.aio.host.Disconnect(connection=ref_dut), self.dut.aio.host.WaitDisconnection(connection=dut_ref), ) advertisement = self.ref.aio.host.Advertise( legacy=True, connectable=True, own_address_type=RANDOM, data=DataTypes(manufacturer_specific_data=b'pause cafe'), ) scan = self.dut.aio.host.Scan(own_address_type=RANDOM) ref = await anext((x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)) scan.cancel() (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()), anext(aiter(advertisement)), ) ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection # Wait for the link to get encrypted connection = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big')) def on_connection_encryption_change(): self.ref.device.smp_manager.request_pairing(connection) connection.on('connection_encryption_change', on_connection_encryption_change) # Fail if repairing is initiated fut = asyncio.get_running_loop().create_future() class Session(smp.Session): def on_smp_pairing_request_command(self, command: smp.SMP_Pairing_Request_Command) -> None: nonlocal fut fut.set_result(False) self.ref.device.smp_session_proxy = Session # Pass if the link is encrypted again def on_connection_encryption_key_refresh(): nonlocal fut fut.set_result(True) connection.on('connection_encryption_key_refresh', on_connection_encryption_key_refresh) assert_true(await fut, "Repairing initiated") if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) Loading Loading
android/pandora/test/smp_test.py +113 −4 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices from avatar.aio import asynchronous from bumble import smp from bumble.hci import Address from bumble.pairing import PairingDelegate from concurrent import futures from contextlib import suppress from mobly import base_test, signals, test_runner Loading Loading @@ -56,15 +57,35 @@ class SmpTest(base_test.BaseTestClass): # type: ignore[misc] async def handle_pairing_events(self) -> NoReturn: dut_pairing_stream = self.dut.aio.security.OnPairing() ref_pairing_stream = self.ref.aio.security.OnPairing() try: while True: dut_pairing_event = await (anext(dut_pairing_stream)) dut_pairing_stream.send_nowait( PairingEventAnswer( if dut_pairing_event.method_variant() == 'passkey_entry_notification': ref_pairing_event = await (anext(ref_pairing_stream)) assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request') assert_is_not_none(dut_pairing_event.passkey_entry_notification) assert dut_pairing_event.passkey_entry_notification is not None ref_ev_answer = PairingEventAnswer( event=ref_pairing_event, passkey=dut_pairing_event.passkey_entry_notification, ) ref_pairing_stream.send_nowait(ref_ev_answer) else: dut_pairing_stream.send_nowait(PairingEventAnswer( event=dut_pairing_event, confirm=True, ) ) )) ref_pairing_event = await (anext(ref_pairing_stream)) ref_pairing_stream.send_nowait(PairingEventAnswer( event=ref_pairing_event, confirm=True, )) finally: dut_pairing_stream.cancel() Loading Loading @@ -148,6 +169,94 @@ class SmpTest(base_test.BaseTestClass): # type: ignore[misc] is_bonded = await self.dut.aio.security_storage.IsBonded(random=ref1.random) assert_false(is_bonded.value, "") @asynchronous async def test_mitm_sec_req_on_enc(self) -> None: if isinstance(self.ref, BumblePandoraDevice): io_capability = PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT self.ref.server_config.io_capability = io_capability advertisement = self.ref.aio.host.Advertise( legacy=True, connectable=True, own_address_type=RANDOM, data=DataTypes(manufacturer_specific_data=b'pause cafe'), ) scan = self.dut.aio.host.Scan(own_address_type=RANDOM) ref = await anext((x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)) scan.cancel() asyncio.create_task(self.handle_pairing_events()) (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()), anext(aiter(advertisement)), ) advertisement.cancel() ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection assert_is_not_none(dut_ref) assert dut_ref # Pair with MITM requirements (secure, wait_security) = await asyncio.gather( self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), self.ref.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), ) assert_equal(secure.result_variant(), 'success') assert_equal(wait_security.result_variant(), 'success') # Disconnect await asyncio.gather( self.ref.aio.host.Disconnect(connection=ref_dut), self.dut.aio.host.WaitDisconnection(connection=dut_ref), ) advertisement = self.ref.aio.host.Advertise( legacy=True, connectable=True, own_address_type=RANDOM, data=DataTypes(manufacturer_specific_data=b'pause cafe'), ) scan = self.dut.aio.host.Scan(own_address_type=RANDOM) ref = await anext((x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)) scan.cancel() (dut_ref_res, ref_dut_res) = await asyncio.gather( self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()), anext(aiter(advertisement)), ) ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection # Wait for the link to get encrypted connection = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big')) def on_connection_encryption_change(): self.ref.device.smp_manager.request_pairing(connection) connection.on('connection_encryption_change', on_connection_encryption_change) # Fail if repairing is initiated fut = asyncio.get_running_loop().create_future() class Session(smp.Session): def on_smp_pairing_request_command(self, command: smp.SMP_Pairing_Request_Command) -> None: nonlocal fut fut.set_result(False) self.ref.device.smp_session_proxy = Session # Pass if the link is encrypted again def on_connection_encryption_key_refresh(): nonlocal fut fut.set_result(True) connection.on('connection_encryption_key_refresh', on_connection_encryption_key_refresh) assert_true(await fut, "Repairing initiated") if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) Loading