Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 36722675 authored by Abel Lucas's avatar Abel Lucas Committed by Automerger Merge Worker
Browse files

Merge "avatar: rework classic SSP tests" am: 9ae6b903 am: 2ac9625b

parents fc3cd67f 2ac9625b
Loading
Loading
Loading
Loading
+257 −123
Original line number Diff line number Diff line
@@ -13,182 +13,316 @@
# limitations under the License.

import asyncio
import avatar
import itertools
import logging

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous, parameterized
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.hci import HCI_CENTRAL_ROLE, Address as BumbleAddress
from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE
from bumble.pairing import PairingDelegate
from concurrent import futures
from contextlib import suppress
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import fail  # type: ignore
from pandora.host_pb2 import RANDOM, DataTypes, OwnAddressType
from pandora.security_pb2 import LEVEL2, PairingEventAnswer
from typing import NoReturn, Optional
from pandora.host_pb2 import Connection
from pandora.security_pb2 import LEVEL2, PairingEventAnswer, SecureResponse, SecurityLevel, WaitSecurityResponse
from typing import Callable, Coroutine, Optional, Tuple

ALL_ROLES = (HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE)
ALL_IO_CAPABILITIES = (
    None,
    PairingDelegate.DISPLAY_OUTPUT_ONLY,
    PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
    PairingDelegate.KEYBOARD_INPUT_ONLY,
    PairingDelegate.NO_OUTPUT_NO_INPUT,
    PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
)


class ClassicSspTest(base_test.BaseTestClass):  # type: ignore[misc]
    '''
    This class aim to test SSP (Secure Simple Pairing) on Classic
    Bluetooth devices.
    '''

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: PandoraDevice

    def setup_class(self) -> None:
    @avatar.asynchronous
    async def setup_class(self) -> None:
        self.devices = PandoraDevices(self)
        self.dut, self.ref, *_ = self.devices

        # Enable BR/EDR mode for Bumble devices.
        # Enable BR/EDR mode and SSP for Bumble devices.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)
                device.config.setdefault('classic_ssp_enabled', True)
                device.config.setdefault(
                    'server',
                    {
                        'io_capability': 'display_output_and_yes_no_input',
                    },
                )

        await asyncio.gather(self.dut.reset(), self.ref.reset())

    def teardown_class(self) -> None:
        if self.devices:
            self.devices.stop_all()

    @asynchronous
    async def setup_test(self) -> None:
    @avatar.asynchronous
    async def setup_test(self) -> None:  # pytype: disable=wrong-arg-types
        await asyncio.gather(self.dut.reset(), self.ref.reset())

    async def connect_le(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None:
        advertisement = self.dut.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=dut_address_type,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )
    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_initiate_connection_initiate_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

        scan = self.ref.aio.host.Scan(own_address_type=ref_address_type)
        dut = await anext(
            (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)
        )  # pytype: disable=name-error
        scan.cancel()
        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2)

        (ref_dut_res, dut_ref_res) = await asyncio.gather(
            self.ref.aio.host.ConnectLE(own_address_type=ref_address_type, **dut.address_asdict()),
            anext(aiter(advertisement)),  # pytype: disable=name-error
        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )

        advertisement.cancel()
        ref_dut, _ = ref_dut_res.connection, dut_ref_res.connection
        assert_is_not_none(ref_dut)
        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')

    async def handle_pairing_events(self) -> NoReturn:
        ref_pairing_stream = self.ref.aio.security.OnPairing()
        dut_pairing_stream = self.dut.aio.security.OnPairing()
    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_initiate_connection_accept_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        if not isinstance(self.dut, BumblePandoraDevice):
            raise signals.TestSkip('TODO: Fix rootcanal when both AOSP and Bumble trigger the auth.')

        try:
            while True:
                ref_pairing_event, dut_pairing_event = await asyncio.gather(
                    anext(ref_pairing_stream),
                    anext(dut_pairing_stream),
                )
        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

                if dut_pairing_event.method_variant() in (
                    'numeric_comparison',
                    'just_works',
                ):
                    assert_in(ref_pairing_event.method_variant(), ('numeric_comparison', 'just_works'))
                    dut_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=dut_pairing_event,
                            confirm=True,
                        )
                    )
                    ref_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=ref_pairing_event,
                            confirm=True,
                        )
                    )
                elif dut_pairing_event.method_variant() == 'passkey_entry_notification':
                    assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request')
                    ref_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=ref_pairing_event,
                            passkey=dut_pairing_event.passkey_entry_notification,
                        )
        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2)

        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )
                elif dut_pairing_event.method_variant() == 'passkey_entry_request':
                    assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_notification')
                    dut_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=dut_pairing_event,
                            passkey=ref_pairing_event.passkey_entry_notification,

        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')

    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_accept_connection_initiate_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2)

        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )

        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')

    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_accept_connection_accept_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2)

        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )

        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')


def set_io_capability(device: PandoraDevice, io_capability: Optional[PairingDelegate.IoCapability]) -> None:
    if io_capability is None:
        return
    if isinstance(device, BumblePandoraDevice):
        # Override Bumble reference device default IO capability.
        device.server_config.io_capability = io_capability
    else:
                    fail("")
        raise signals.TestSkip('Unable to override IO capability on non Bumble device.')

        finally:
            ref_pairing_stream.cancel()
            dut_pairing_stream.cancel()

    @parameterized(
        *itertools.product(
            (PairingDelegate.NO_OUTPUT_NO_INPUT,),
            (HCI_CENTRAL_ROLE,),
            (RANDOM,),

# Connection task.
async def make_classic_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]:
    '''Connect two device and returns both connection tokens.'''

    (connect, wait_connection) = await asyncio.gather(
        initiator.aio.host.Connect(address=acceptor.address),
        acceptor.aio.host.WaitConnection(address=initiator.address),
    )
    )  # type: ignore[misc]
    @asynchronous
    async def test_classic_pairing_incoming(
        self, ref_io_capability: PairingDelegate.IoCapability, ref_role: int, ref_le_addr_type: OwnAddressType
    ) -> None:
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # override reference device IO capability
        self.ref.server_config.io_capability = ref_io_capability
    # Assert connection are successful.
    assert_equal(connect.result_variant(), 'connection')
    assert_equal(wait_connection.result_variant(), 'connection')
    assert_is_not_none(connect.connection)
    assert_is_not_none(wait_connection.connection)
    assert connect.connection and wait_connection.connection

        pairing = asyncio.create_task(self.handle_pairing_events())
    # Returns connections.
    return connect.connection, wait_connection.connection

        await self.connect_le(RANDOM, ref_le_addr_type)

        (dut_ref_res, ref_dut_res) = await asyncio.gather(
            self.dut.aio.host.WaitConnection(address=self.ref.address),
            self.ref.aio.host.Connect(address=self.dut.address),
# Pairing task.
async def authenticate(
    initiator: PandoraDevice,
    initiator_connection: Connection,
    acceptor: PandoraDevice,
    acceptor_connection: Connection,
    security_level: SecurityLevel,
) -> Tuple[SecureResponse, WaitSecurityResponse]:
    '''Pair two device and returns both pairing responses.'''

    return await asyncio.gather(
        initiator.aio.security.Secure(connection=initiator_connection, classic=security_level),
        acceptor.aio.security.WaitSecurity(connection=acceptor_connection, classic=security_level),
    )

        assert_equal(ref_dut_res.result_variant(), 'connection')
        assert_equal(dut_ref_res.result_variant(), 'connection')
        ref_dut = ref_dut_res.connection
        dut_ref = dut_ref_res.connection
        assert_is_not_none(ref_dut)
        assert_is_not_none(dut_ref)
        assert ref_dut and dut_ref

        ref_dut_raw = self.ref.device.find_connection_by_bd_addr(
            BumbleAddress(bytes(reversed(self.dut.address)), BumbleAddress.PUBLIC_DEVICE_ADDRESS), BT_BR_EDR_TRANSPORT
        )
        assert_is_not_none(ref_dut_raw)
        assert ref_dut_raw
# Role switch task.
async def role_switch(
    device: PandoraDevice,
    connection: Connection,
    role: int,
) -> None:
    '''Switch role if supported.'''

        if ref_dut_raw.role != ref_role:
            await ref_dut_raw.switch_role(ref_role)
    if not isinstance(device, BumblePandoraDevice):
        return

        (secure, wait_security) = await asyncio.gather(
            self.ref.aio.security.Secure(connection=ref_dut, classic=LEVEL2),
            self.dut.aio.security.WaitSecurity(connection=dut_ref, classic=LEVEL2),
        )
    connection_handle = int.from_bytes(connection.cookie.value, 'big')
    bumble_connection = device.device.lookup_connection(connection_handle)
    assert_is_not_none(bumble_connection)
    assert bumble_connection

        pairing.cancel()
        with suppress(asyncio.CancelledError, futures.CancelledError):
            await pairing
    if bumble_connection.role != role:
        device.log.info(f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}")
        await bumble_connection.switch_role(role)

        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        await asyncio.gather(
            self.dut.aio.host.WaitDisconnection(connection=dut_ref),
            self.ref.aio.host.Disconnect(connection=ref_dut),
        )
# Handle pairing events task.
async def handle_pairing(
    dut: PandoraDevice,
    ref: PandoraDevice,
    connect_and_pair: Callable[[], Coroutine[None, None, Tuple[SecureResponse, WaitSecurityResponse]]],
    confirm: Callable[[bool], bool] = lambda x: x,
    passkey: Callable[[int], int] = lambda x: x,
) -> Tuple[SecureResponse, WaitSecurityResponse]:

    # Listen for pairing event on bot DUT and REF.
    dut_pairing, ref_pairing = dut.aio.security.OnPairing(), ref.aio.security.OnPairing()

    # Start connection/pairing.
    connect_and_pair_task = asyncio.create_task(connect_and_pair())

    try:
        dut_ev = await asyncio.wait_for(anext(dut_pairing), timeout=25.0)
        dut.log.info(f'DUT pairing event: {dut_ev.method_variant()}')

        ref_ev = await asyncio.wait_for(anext(ref_pairing), timeout=3.0)
        ref.log.info(f'REF pairing event: {ref_ev.method_variant()}')

        if dut_ev.method_variant() in ('numeric_comparison', 'just_works'):
            assert_in(ref_ev.method_variant(), ('numeric_comparison', 'just_works'))
            confirm_res = True
            if dut_ev.method_variant() == 'numeric_comparison' and ref_ev.method_variant() == 'numeric_comparison':
                confirm_res = ref_ev.numeric_comparison == dut_ev.numeric_comparison
            confirm_res = confirm(confirm_res)
            dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, confirm=confirm_res))
            ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, confirm=confirm_res))

        elif dut_ev.method_variant() == 'passkey_entry_notification':
            assert_equal(ref_ev.method_variant(), 'passkey_entry_request')
            assert_is_not_none(dut_ev.passkey_entry_notification)
            assert dut_ev.passkey_entry_notification is not None
            passkey_res = passkey(dut_ev.passkey_entry_notification)
            ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, passkey=passkey_res))

        elif dut_ev.method_variant() == 'passkey_entry_request':
            assert_equal(ref_ev.method_variant(), 'passkey_entry_notification')
            assert_is_not_none(ref_ev.passkey_entry_notification)
            assert ref_ev.passkey_entry_notification is not None
            passkey_res = passkey(ref_ev.passkey_entry_notification)
            dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, passkey=passkey_res))

        else:
            fail("")

    except (asyncio.CancelledError, asyncio.TimeoutError):
        logging.exception('Pairing timed-out.')

    finally:

        try:
            (secure, wait_security) = await asyncio.wait_for(connect_and_pair_task, 15.0)
            logging.info(f'Pairing result: {secure.result_variant()}/{wait_security.result_variant()}')
            return secure, wait_security

        finally:
            dut_pairing.cancel()
            ref_pairing.cancel()


if __name__ == '__main__':