Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4e1612bd authored by Charlie Boutier's avatar Charlie Boutier
Browse files

Pandora: Adapting avatar example test to aosp

Bug: 260774624
Test: avatar_runner example.py config.yml

Change-Id: Ia0420918d366fd463eeeda6f444691b3d1e5126a
parent 2703518e
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -207,7 +207,7 @@ class GAPProxy(ProfileProxy):

        self.host.StartAdvertising(
            own_address_type=OwnAddressType.PUBLIC,
            data=DataTypes(complete_service_class_uuids128=["955798ce-3022-455c-b759-ee8edcd73d1a"],))
            data=DataTypes(incomplete_service_class_uuids128=["955798ce-3022-455c-b759-ee8edcd73d1a"],))
        return "OK"

    @assert_description
+7 −7
Original line number Diff line number Diff line
@@ -492,21 +492,21 @@ class Host(
          val dataTypesRequest = request.data

          if (
            !dataTypesRequest.getIncompleteServiceClassUuids16List().isEmpty() or
              !dataTypesRequest.getIncompleteServiceClassUuids32List().isEmpty() or
              !dataTypesRequest.getIncompleteServiceClassUuids128List().isEmpty()
            !dataTypesRequest.getCompleteServiceClassUuids16List().isEmpty() or
              !dataTypesRequest.getCompleteServiceClassUuids32List().isEmpty() or
              !dataTypesRequest.getCompleteServiceClassUuids128List().isEmpty()
          ) {
            Log.e(TAG, "Incomplete Service Class Uuids not supported")
            Log.e(TAG, "Complete Service Class Uuids not supported")
            throw Status.UNKNOWN.asException()
          }

          for (service_uuid in dataTypesRequest.getCompleteServiceClassUuids16List()) {
          for (service_uuid in dataTypesRequest.getIncompleteServiceClassUuids16List()) {
            advertisingDataBuilder.addServiceUuid(ParcelUuid.fromString(service_uuid))
          }
          for (service_uuid in dataTypesRequest.getCompleteServiceClassUuids32List()) {
          for (service_uuid in dataTypesRequest.getIncompleteServiceClassUuids32List()) {
            advertisingDataBuilder.addServiceUuid(ParcelUuid.fromString(service_uuid))
          }
          for (service_uuid in dataTypesRequest.getCompleteServiceClassUuids128List()) {
          for (service_uuid in dataTypesRequest.getIncompleteServiceClassUuids128List()) {
            advertisingDataBuilder.addServiceUuid(ParcelUuid.fromString(service_uuid))
          }

+126 −33
Original line number Diff line number Diff line
@@ -14,13 +14,15 @@

import avatar
import asyncio
import logging
import grpc
import logging
import mobly

from concurrent import futures
from contextlib import suppress

from mobly import test_runner, base_test
from mobly import base_test, test_runner
from mobly.asserts import *

from bumble.smp import PairingDelegate

@@ -74,8 +76,6 @@ class ExampleTest(base_test.BaseTestClass):
    # the connection still complete.
    @avatar.parameterized([
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC),
        (OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM),
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC),
    ])
    def test_le_connect(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType):
@@ -93,9 +93,10 @@ class ExampleTest(base_test.BaseTestClass):
        self.dut.host.SetDiscoverabilityMode(mode=DiscoverabilityMode.NOT_DISCOVERABLE)
        peers = self.ref.host.Inquiry(timeout=3.0)
        try:
            assert not next((x for x in peers if x.address == self.dut.address), None)
            assert_is_none(next((x for x in peers if x.address == self.dut.address), None))
        except grpc.RpcError as e:
            assert e.code() == grpc.StatusCode.DEADLINE_EXCEEDED
            # No peers found; StartInquiry times out
            assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED)

    @avatar.parameterized([
        (DiscoverabilityMode.DISCOVERABLE_LIMITED, ),
@@ -104,40 +105,36 @@ class ExampleTest(base_test.BaseTestClass):
    def test_discoverable(self, mode):
        self.dut.host.SetDiscoverabilityMode(mode=mode)
        peers = self.ref.host.Inquiry(timeout=15.0)
        assert next((x for x in peers if x.address == self.dut.address), None)
        assert_is_not_none(next((x for x in peers if x.address == self.dut.address), None))

    @avatar.asynchronous
    async def test_wait_connection(self):
        dut_ref = self.dut.host.WaitConnection(address=self.ref.address)
        ref_dut = await self.ref.host.Connect(address=self.dut.address)
        dut_ref = await dut_ref
        assert ref_dut.connection and dut_ref.connection
        assert_is_not_none(ref_dut.connection)
        assert_is_not_none(dut_ref.connection)

    @avatar.asynchronous
    async def test_wait_any_connection(self):
        dut_ref = self.dut.host.WaitConnection()
        ref_dut = await self.ref.host.Connect(address=self.dut.address)
        dut_ref = await dut_ref
        assert ref_dut.connection and dut_ref.connection
        assert_is_not_none(ref_dut.connection)
        assert_is_not_none(dut_ref.connection)

    def test_scan_response_data(self):
        self.dut.host.StartAdvertising(
            legacy=True,
            data=DataTypes(
                include_shortened_local_name=True,
                tx_power_level=42,
                incomplete_service_class_uuids16=['FDF0']
            ),
            data=DataTypes(include_shortened_local_name=True),
            scan_response_data=DataTypes(include_complete_local_name=True, include_class_of_device=True)
        )

        peers = self.ref.host.Scan()
        scan_response = next((x for x in peers if x.public == self.dut.address))
        assert type(scan_response.data.complete_local_name) == str
        assert type(scan_response.data.shortened_local_name) == str
        assert type(scan_response.data.class_of_device) == int
        assert type(scan_response.data.incomplete_service_class_uuids16[0]) == str
        assert scan_response.data.tx_power_level == 42
        assert_equal(type(scan_response.data.complete_local_name), str)
        assert_equal(type(scan_response.data.shortened_local_name), str)
        assert_equal(type(scan_response.data.class_of_device), int)

    @avatar.parameterized([
        (PairingDelegate.NO_OUTPUT_NO_INPUT, ),
@@ -163,7 +160,7 @@ class ExampleTest(base_test.BaseTestClass):
                    ref_pairing_event = await anext(aiter(on_ref_pairing))

                    if dut_pairing_event.WhichOneof('method') in ('numeric_comparison', 'just_works'):
                        assert ref_pairing_event.WhichOneof('method') in ('numeric_comparison', 'just_works')
                        assert_in(ref_pairing_event.WhichOneof('method'), ('numeric_comparison', 'just_works'))
                        dut_answer_queue.put_nowait(PairingEventAnswer(
                            event=dut_pairing_event,
                            confirm=True,
@@ -173,19 +170,19 @@ class ExampleTest(base_test.BaseTestClass):
                            confirm=True,
                        ))
                    elif dut_pairing_event.WhichOneof('method') == 'passkey_entry_notification':
                        assert ref_pairing_event.WhichOneof('method') == 'passkey_entry_request'
                        assert_equal(ref_pairing_event.WhichOneof('method'), 'passkey_entry_request')
                        ref_answer_queue.put_nowait(PairingEventAnswer(
                            event=ref_pairing_event,
                            passkey=dut_pairing_event.passkey_entry_notification,
                        ))
                    elif dut_pairing_event.WhichOneof('method') == 'passkey_entry_request':
                        assert ref_pairing_event.WhichOneof('method') == 'passkey_entry_notification'
                        assert_equal(ref_pairing_event.WhichOneof('method'), 'passkey_entry_notification')
                        dut_answer_queue.put_nowait(PairingEventAnswer(
                            event=dut_pairing_event,
                            passkey=ref_pairing_event.passkey_entry_notification,
                        ))
                    else:
                        assert False
                        fail()

            finally:
                on_ref_pairing.cancel()
@@ -215,8 +212,6 @@ class ExampleTest(base_test.BaseTestClass):
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_ONLY),
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT),
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
        (OwnAddressType.PUBLIC, OwnAddressType.RANDOM, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
    ])
    @avatar.asynchronous
@@ -234,9 +229,14 @@ class ExampleTest(base_test.BaseTestClass):
            ref_address = {'random': Address(self.ref.device.random_address)}

        await self.dut.security_storage.DeleteBond(**ref_address)
        await self.dut.host.StartAdvertising(legacy=True, connectable=True, own_address_type=dut_address_type)

        dut = await anext(aiter(self.ref.host.Scan(own_address_type=ref_address_type)))
        await self.dut.host.StartAdvertising(legacy=True, connectable=True, own_address_type=dut_address_type, data=DataTypes(manufacturer_specific_data=b'pause cafe'))
        
        dut = None
        async for peer in aiter(self.ref.host.Scan(own_address_type=ref_address_type)):
            if b'pause cafe' in peer.data.manufacturer_specific_data:
                dut = peer
                break
        assert_is_not_none(dut)
        if dut_address_type in (OwnAddressType.PUBLIC, OwnAddressType.RESOLVABLE_OR_PUBLIC):
            dut_address = {'public': Address(dut.public)}
        else:
@@ -252,7 +252,7 @@ class ExampleTest(base_test.BaseTestClass):
                    ref_pairing_event = await anext(aiter(on_ref_pairing))

                    if dut_pairing_event.WhichOneof('method') in ('numeric_comparison', 'just_works'):
                        assert ref_pairing_event.WhichOneof('method') in ('numeric_comparison', 'just_works')
                        assert_in(ref_pairing_event.WhichOneof('method'), ('numeric_comparison', 'just_works'))
                        dut_answer_queue.put_nowait(PairingEventAnswer(
                            event=dut_pairing_event,
                            confirm=True,
@@ -262,19 +262,19 @@ class ExampleTest(base_test.BaseTestClass):
                            confirm=True,
                        ))
                    elif dut_pairing_event.WhichOneof('method') == 'passkey_entry_notification':
                        assert ref_pairing_event.WhichOneof('method') == 'passkey_entry_request'
                        assert_equal(ref_pairing_event.WhichOneof('method'), 'passkey_entry_request')
                        ref_answer_queue.put_nowait(PairingEventAnswer(
                            event=ref_pairing_event,
                            passkey=dut_pairing_event.passkey_entry_notification,
                        ))
                    elif dut_pairing_event.WhichOneof('method') == 'passkey_entry_request':
                        assert ref_pairing_event.WhichOneof('method') == 'passkey_entry_notification'
                        assert_equal(ref_pairing_event.WhichOneof('method'), 'passkey_entry_notification')
                        dut_answer_queue.put_nowait(PairingEventAnswer(
                            event=dut_pairing_event,
                            passkey=ref_pairing_event.passkey_entry_notification,
                        ))
                    else:
                        assert False
                        fail()

            finally:
                on_ref_pairing.cancel()
@@ -298,6 +298,99 @@ class ExampleTest(base_test.BaseTestClass):
            self.ref.host.WaitDisconnection(connection=ref_dut)
        )

    @avatar.parameterized([
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.NO_OUTPUT_NO_INPUT),
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.KEYBOARD_INPUT_ONLY),
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_ONLY),
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT),
        (OwnAddressType.PUBLIC, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
    ])
    @avatar.asynchronous
    async def test_le_pairing(self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
        ref_io_capability
    ):
        raise mobly.signals.TestSkip("FIXME: Fix OnPairing communication between AOSP & Bumble")
        # override reference device IO capability
        self.ref.device.io_capability = ref_io_capability

        if ref_address_type in (OwnAddressType.PUBLIC, OwnAddressType.RESOLVABLE_OR_PUBLIC):
            ref_address = {'public': self.ref.address}
        else:
            ref_address = {'random': Address(self.ref.device.random_address)}

        await self.dut.security_storage.DeleteBond(**ref_address)
        await self.dut.host.StartAdvertising(legacy=True, connectable=True, own_address_type=dut_address_type, data=DataTypes(manufacturer_specific_data=b'pause cafe'))
        
        dut = None
        async for peer in aiter(self.ref.host.Scan(own_address_type=ref_address_type)):
            if b'pause cafe' in peer.data.manufacturer_specific_data:
                dut = peer
                break
        assert_is_not_none(dut)
        if dut_address_type in (OwnAddressType.PUBLIC, OwnAddressType.RESOLVABLE_OR_PUBLIC):
            dut_address = {'public': Address(dut.public)}
        else:
            dut_address = {'random': Address(dut.random)}

        async def handle_pairing_events():
            on_ref_pairing = self.ref.security.OnPairing((ref_answer_queue := AsyncQueue()))
            on_dut_pairing = self.dut.security.OnPairing((dut_answer_queue := AsyncQueue()))

            try:
                while True:
                    dut_pairing_event = await anext(aiter(on_dut_pairing))
                    ref_pairing_event = await anext(aiter(on_ref_pairing))

                    if dut_pairing_event.WhichOneof('method') in ('numeric_comparison', 'just_works'):
                        assert_in(ref_pairing_event.WhichOneof('method'), ('numeric_comparison', 'just_works'))
                        logging.error('JUST WORK')
                        dut_answer_queue.put_nowait(PairingEventAnswer(
                            event=dut_pairing_event,
                            confirm=True,
                        ))
                        ref_answer_queue.put_nowait(PairingEventAnswer(
                            event=ref_pairing_event,
                            confirm=True,
                        ))
                    elif dut_pairing_event.WhichOneof('method') == 'passkey_entry_notification':
                        assert_equal(ref_pairing_event.WhichOneof('method'), 'passkey_entry_request')
                        ref_answer_queue.put_nowait(PairingEventAnswer(
                            event=ref_pairing_event,
                            passkey=dut_pairing_event.passkey_entry_notification,
                        ))
                    elif dut_pairing_event.WhichOneof('method') == 'passkey_entry_request':
                        assert_equal(ref_pairing_event.WhichOneof('method'), 'passkey_entry_notification')
                        dut_answer_queue.put_nowait(PairingEventAnswer(
                            event=dut_pairing_event,
                            passkey=ref_pairing_event.passkey_entry_notification,
                        ))
                    else:
                        fail()

            finally:
                on_ref_pairing.cancel()
                on_dut_pairing.cancel()

        pairing = asyncio.create_task(handle_pairing_events())
        ref_dut = (await self.ref.host.ConnectLE(own_address_type=ref_address_type, **dut_address)).connection
        dut_ref = (await self.dut.host.WaitLEConnection(**ref_address)).connection

        await asyncio.gather(
            self.ref.security.Secure(connection=ref_dut, le=LESecurityLevel.LE_LEVEL4),
            self.dut.security.WaitSecurity(connection=dut_ref, le=LESecurityLevel.LE_LEVEL3)
        )

        pairing.cancel()
        with suppress(asyncio.CancelledError, futures.CancelledError):
            await pairing

        await asyncio.gather(
            self.dut.host.Disconnect(connection=dut_ref),
            self.ref.host.WaitDisconnection(connection=ref_dut)
        )

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)