Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9c62e210 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Gatt: Add tests to discover included services" into main

parents c79ea218 9dbd7e52
Loading
Loading
Loading
Loading
+46 −22
Original line number Diff line number Diff line
@@ -19,7 +19,8 @@ import logging

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble import pandora as bumble_server
from bumble.gatt import Characteristic, Service
from bumble.gatt import (Characteristic, Service, GATT_VOLUME_CONTROL_SERVICE, GATT_AUDIO_INPUT_CONTROL_SERVICE,
                         GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, UUID)
from bumble.l2cap import L2CAP_Control_Frame
from bumble.pairing import PairingConfig
from bumble_experimental.gatt import GATTService
@@ -47,8 +48,7 @@ class GattTest(base_test.BaseTestClass): # type: ignore[misc]
    def setup_class(self) -> None:
        # Register experimental bumble servicers hook.
        bumble_server.register_servicer_hook(
            lambda bumble, _, server: add_GATTServicer_to_server(GATTService(bumble.device), server)
        )
            lambda bumble, _, server: add_GATTServicer_to_server(GATTService(bumble.device), server))

        self.devices = PandoraDevices(self)
        self.dut, self.ref, *_ = self.devices
@@ -99,9 +99,8 @@ class GattTest(base_test.BaseTestClass): # type: ignore[misc]
            connectable=True,
        )

        dut_connection_to_ref = (
            await self.dut.aio.host.ConnectLE(public=self.ref.address, own_address_type=RANDOM)
        ).connection
        dut_connection_to_ref = (await self.dut.aio.host.ConnectLE(public=self.ref.address,
                                                                   own_address_type=RANDOM)).connection
        assert_is_not_none(dut_connection_to_ref)
        assert dut_connection_to_ref

@@ -135,8 +134,7 @@ class GattTest(base_test.BaseTestClass): # type: ignore[misc]
        self.ref.device.add_service(service)  # type:ignore
        # disable MITM requirement on REF side (since it only does just works)
        self.ref.device.pairing_config_factory = lambda _: PairingConfig(  # type:ignore
            sc=True, mitm=False, bonding=True
        )
            sc=True, mitm=False, bonding=True)
        # manually handle pairing on the DUT side
        dut_pairing_events = self.dut.aio.security.OnPairing()
        # set up connection
@@ -264,8 +262,7 @@ class GattTest(base_test.BaseTestClass): # type: ignore[misc]
        connection = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big'))
        assert connection

        connection_request = L2CAP_Control_Frame.from_bytes(
            (
        connection_request = L2CAP_Control_Frame.from_bytes((
            b"\x17"  # code of L2CAP_CREDIT_BASED_CONNECTION_REQ
            b"\x01"  # identifier
            b"\x0a\x00"  # data length
@@ -274,19 +271,46 @@ class GattTest(base_test.BaseTestClass): # type: ignore[misc]
            b"\x64\x00"  # MPS
            b"\x64\x00"  # initial credit
            b"\x40\x00"  # source cid[0]
            )
        )
        ))

        fut = asyncio.get_running_loop().create_future()
        setattr(self.ref.device.l2cap_channel_manager, "on_[0x18]", lambda _, _1, frame: fut.set_result(frame))
        self.ref.device.l2cap_channel_manager.send_control_frame(  # type:ignore
            connection, 0x05, connection_request
        )
            connection, 0x05, connection_request)
        control_frame = await fut

        assert_equal(bytes(control_frame)[10], 0x05)  # All connections refused – insufficient authentication
        assert_true(await is_connected(self.ref, ref_dut), "Device is no longer connected")

    @avatar.parameterized(
        ('primary_service',),
        ('secondary_service',),
    )
    def test_discover_included_service(self, attribute_type: str) -> None:
        PRIMARY_SERVICE_UUID = GATT_VOLUME_CONTROL_SERVICE
        INCLUDED_SERVICE_UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE

        is_primary_service = True if attribute_type == 'primary_service' else False
        included_service = Service(INCLUDED_SERVICE_UUID, [], primary=is_primary_service)
        primary_service = Service(PRIMARY_SERVICE_UUID, [], included_services=[included_service])
        self.ref.device.add_service(included_service)  # type: ignore
        self.ref.device.add_service(primary_service)  # type: ignore

        advertise = self.ref.host.Advertise(legacy=True, connectable=True)
        dut_ref_connection = self.dut.host.ConnectLE(public=self.ref.address, own_address_type=RANDOM).connection
        assert dut_ref_connection
        advertise.cancel()  # type: ignore

        dut_gatt = GATT(self.dut.channel)  # type: ignore
        services = dut_gatt.DiscoverServices(dut_ref_connection).services

        filtered_services = [service for service in services if service.uuid == PRIMARY_SERVICE_UUID]
        assert len(filtered_services) == 1
        primary_service = filtered_services[0]

        included_services_uuids = [included_service.uuid for included_service in primary_service.included_services]
        assert_in(INCLUDED_SERVICE_UUID, included_services_uuids)


async def is_connected(device: PandoraDevice, connection: Connection) -> bool:
    try: