Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit cf0dd9c9 authored by Himanshu Rawat's avatar Himanshu Rawat
Browse files

SMP MITM Security Request on encrypted link

Test steps
1. Connect over LE
2. Perform authenticated LE pairing
3. Disconnect
4. Reconnect as central
5. Wait for the link to be encrypted
6. Send SMP Security Request with MITM protection requirement

Expectation: The link is encrypted again. No repairing.

Bug: 295061381
Test: avatar run --include-filter="SmpTest"
Test: atest avatar -v
Change-Id: Ia1d7030e6cf604f57ea14663d99d678074c93894
parent 5400d131
Loading
Loading
Loading
Loading
+113 −4
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from avatar.aio import asynchronous
from bumble import smp
from bumble.hci import Address
from bumble.pairing import PairingDelegate
from concurrent import futures
from contextlib import suppress
from mobly import base_test, signals, test_runner
@@ -56,15 +57,35 @@ class SmpTest(base_test.BaseTestClass): # type: ignore[misc]

    async def handle_pairing_events(self) -> NoReturn:
        dut_pairing_stream = self.dut.aio.security.OnPairing()
        ref_pairing_stream = self.ref.aio.security.OnPairing()
        try:
            while True:
                dut_pairing_event = await (anext(dut_pairing_stream))
                dut_pairing_stream.send_nowait(
                    PairingEventAnswer(

                if dut_pairing_event.method_variant() == 'passkey_entry_notification':
                    ref_pairing_event = await (anext(ref_pairing_stream))

                    assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request')
                    assert_is_not_none(dut_pairing_event.passkey_entry_notification)
                    assert dut_pairing_event.passkey_entry_notification is not None

                    ref_ev_answer = PairingEventAnswer(
                        event=ref_pairing_event,
                        passkey=dut_pairing_event.passkey_entry_notification,
                    )
                    ref_pairing_stream.send_nowait(ref_ev_answer)
                else:
                    dut_pairing_stream.send_nowait(PairingEventAnswer(
                        event=dut_pairing_event,
                        confirm=True,
                    )
                )
                    ))
                    ref_pairing_event = await (anext(ref_pairing_stream))

                    ref_pairing_stream.send_nowait(PairingEventAnswer(
                        event=ref_pairing_event,
                        confirm=True,
                    ))

        finally:
            dut_pairing_stream.cancel()

@@ -148,6 +169,94 @@ class SmpTest(base_test.BaseTestClass): # type: ignore[misc]
        is_bonded = await self.dut.aio.security_storage.IsBonded(random=ref1.random)
        assert_false(is_bonded.value, "")

    @asynchronous
    async def test_mitm_sec_req_on_enc(self) -> None:
        if isinstance(self.ref, BumblePandoraDevice):
            io_capability = PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
            self.ref.server_config.io_capability = io_capability

        advertisement = self.ref.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=RANDOM,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )

        scan = self.dut.aio.host.Scan(own_address_type=RANDOM)
        ref = await anext((x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data))
        scan.cancel()

        asyncio.create_task(self.handle_pairing_events())
        (dut_ref_res, ref_dut_res) = await asyncio.gather(
            self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()),
            anext(aiter(advertisement)),
        )

        advertisement.cancel()
        ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection
        assert_is_not_none(dut_ref)
        assert dut_ref

        # Pair with MITM requirements
        (secure, wait_security) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
            self.ref.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
        )

        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        # Disconnect
        await asyncio.gather(
            self.ref.aio.host.Disconnect(connection=ref_dut),
            self.dut.aio.host.WaitDisconnection(connection=dut_ref),
        )

        advertisement = self.ref.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=RANDOM,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )

        scan = self.dut.aio.host.Scan(own_address_type=RANDOM)
        ref = await anext((x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data))
        scan.cancel()

        (dut_ref_res, ref_dut_res) = await asyncio.gather(
            self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()),
            anext(aiter(advertisement)),
        )
        ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection

        # Wait for the link to get encrypted
        connection = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big'))

        def on_connection_encryption_change():
            self.ref.device.smp_manager.request_pairing(connection)

        connection.on('connection_encryption_change', on_connection_encryption_change)

        # Fail if repairing is initiated
        fut = asyncio.get_running_loop().create_future()

        class Session(smp.Session):

            def on_smp_pairing_request_command(self, command: smp.SMP_Pairing_Request_Command) -> None:
                nonlocal fut
                fut.set_result(False)

        self.ref.device.smp_session_proxy = Session

        # Pass if the link is encrypted again
        def on_connection_encryption_key_refresh():
            nonlocal fut
            fut.set_result(True)

        connection.on('connection_encryption_key_refresh', on_connection_encryption_key_refresh)

        assert_true(await fut, "Repairing initiated")


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)