Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c30a99c7 authored by Abel Lucas's avatar Abel Lucas Committed by Automerger Merge Worker
Browse files

Merge "Add Avatar test for b/271052144" am: e6719ebe am: abfbc9db am: dabe2e56

parents 5a2b7c39 dabe2e56
Loading
Loading
Loading
Loading
+91 −5
Original line number Diff line number Diff line
@@ -15,11 +15,16 @@
import asyncio
import logging

from avatar import PandoraDevice, PandoraDevices, asynchronous
from avatar import BumbleDevice, PandoraDevice, PandoraDevices, asynchronous
from bumble.gatt import Characteristic, Service
from bumble.smp import PairingConfig
from mobly import base_test, test_runner
from pandora.host_pb2 import RANDOM, DataTypes
from pandora.host_pb2 import RANDOM, Connection, DataTypes
from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer, SecureResponse
from pandora_experimental.gatt_grpc import GATT
from typing import Optional
from pandora_experimental.gatt_grpc_aio import GATT as AioGATT
from pandora_experimental.gatt_pb2 import SUCCESS, ReadCharacteristicsFromUuidResponse
from typing import Optional, Tuple


class GattTest(base_test.BaseTestClass):  # type: ignore[misc]
@@ -27,11 +32,13 @@ class GattTest(base_test.BaseTestClass): # type: ignore[misc]

    # pandora devices.
    dut: PandoraDevice
    ref: PandoraDevice
    ref: BumbleDevice

    def setup_class(self) -> None:
        self.devices = PandoraDevices(self)
        self.dut, self.ref, *_ = self.devices
        dut, ref = self.devices
        assert isinstance(ref, BumbleDevice)
        self.dut, self.ref = dut, ref

    def teardown_class(self) -> None:
        if self.devices:
@@ -71,6 +78,85 @@ class GattTest(base_test.BaseTestClass): # type: ignore[misc]
        services = gatt.DiscoverServices(ref_dut)
        self.ref.log.info(f'REF services: {services}')

    @asynchronous
    async def test_read_characteristic_while_pairing(self) -> None:
        async def connect_dut_to_ref() -> Tuple[Connection, Connection]:
            ref_advertisement = self.ref.aio.host.Advertise(
                legacy=True,
                connectable=True,
            )

            dut_connection_to_ref = (
                await self.dut.aio.host.ConnectLE(public=self.ref.address, own_address_type=RANDOM)
            ).connection
            assert dut_connection_to_ref

            ref_connection_to_dut = (await anext(aiter(ref_advertisement))).connection
            ref_advertisement.cancel()

            return dut_connection_to_ref, ref_connection_to_dut

        # arrange: set up GATT service on REF side with a characteristic
        # that can only be read after pairing
        SERVICE_UUID = "00005a00-0000-1000-8000-00805f9b34fb"
        CHARACTERISTIC_UUID = "00006a00-0000-1000-8000-00805f9b34fb"
        service = Service(
            SERVICE_UUID,
            [
                Characteristic(
                    CHARACTERISTIC_UUID,
                    Characteristic.READ,
                    Characteristic.READ_REQUIRES_ENCRYPTION,
                    b"Hello, world!",
                ),
            ],
        )
        self.ref.device.add_service(service)  # type:ignore
        # disable MITM requirement on REF side (since it only does just works)
        self.ref.device.pairing_config_factory = lambda _: PairingConfig(
            sc=True, mitm=False, bonding=True
        )  # type: ignore
        # manually handle pairing on the DUT side
        dut_pairing_events = self.dut.aio.security.OnPairing()
        # set up connection
        dut_connection_to_ref, ref_connection_to_dut = await connect_dut_to_ref()

        # act: initiate pairing from REF side (send a security request)
        async def ref_secure() -> SecureResponse:
            return await self.ref.aio.security.Secure(connection=ref_connection_to_dut, le=LE_LEVEL3)

        ref_secure_task = asyncio.create_task(ref_secure())

        # wait for pairing to start
        event = await anext(dut_pairing_events)

        # before acknowledging pairing, start a GATT read
        dut_gatt = AioGATT(self.dut.aio.channel)

        async def dut_read() -> ReadCharacteristicsFromUuidResponse:
            return await dut_gatt.ReadCharacteristicsFromUuid(dut_connection_to_ref, CHARACTERISTIC_UUID, 1, 0xFFFF)

        dut_read_task = asyncio.create_task(dut_read())

        await asyncio.sleep(3)

        # now continue with pairing
        dut_pairing_events.send_nowait(PairingEventAnswer(event=event, confirm=True))

        # android pops up a second pairing notification for some reason, accept it
        event = await anext(dut_pairing_events)
        dut_pairing_events.send_nowait(PairingEventAnswer(event=event, confirm=True))

        # assert: that the read succeeded (so Android re-tried the read after pairing)
        read_response = await dut_read_task
        self.ref.log.info(read_response)
        assert read_response.characteristics_read[0].status == SUCCESS
        assert read_response.characteristics_read[0].value.value == b"Hello, world!"

        # make sure pairing was successful
        ref_secure_res = await ref_secure_task
        assert ref_secure_res.result_variant() == 'success'


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)