Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 27948a51 authored by Jakub Rotkiewicz's avatar Jakub Rotkiewicz
Browse files

avatar: Add AVDTP collision simulation test

Bug: 330246568
Flag: EXEMPT - test only
Test: atest avatar:A2dpTest#test_avdt_signaling_channel_connection_collision

Change-Id: I9878a7b11bf079ddfbe883d505acb3aac20b6c85
parent caf04801
Loading
Loading
Loading
Loading
+127 −14
Original line number Diff line number Diff line
@@ -14,15 +14,14 @@

import asyncio
import avatar
import dataclasses
import itertools
import logging
import math
import numpy as np
import os

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, pandora
from avatar.pandora_server import AndroidPandoraServer
from bumble import l2cap
import bumble
from bumble.avctp import AVCTP_PSM
from bumble.a2dp import (
    A2DP_SBC_CODEC_TYPE,
@@ -35,29 +34,27 @@ from bumble.a2dp import (
    SbcMediaCodecInformation,
    make_audio_sink_service_sdp_records,
)
from bumble.avdtp import (
    AVDTP_AUDIO_MEDIA_TYPE,
    AVDTP_OPEN_STATE,
    AVDTP_STREAMING_STATE,
    Listener,
    MediaCodecCapabilities,
)
from bumble.avdtp import (AVDTP_AUDIO_MEDIA_TYPE, AVDTP_OPEN_STATE, AVDTP_PSM, AVDTP_STREAMING_STATE, Listener,
                          MediaCodecCapabilities, Protocol)
from bumble.l2cap import (ChannelManager, ClassicChannel, ClassicChannelSpec, L2CAP_Configure_Request,
                          L2CAP_Connection_Response, L2CAP_SIGNALING_CID)
from bumble.pairing import PairingDelegate
from mobly import base_test, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_none  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import fail  # type: ignore
from pandora.a2dp_grpc_aio import A2DP
from pandora.a2dp_pb2 import PlaybackAudioRequest, Sink, Source
from pandora.a2dp_pb2 import PlaybackAudioRequest, Source
from pandora.host_pb2 import Connection
from pandora.security_pb2 import LEVEL2
from threading import Thread
from typing import Optional

logger = logging.getLogger(__name__)

AVRCP_CONNECT_A2DP_WITH_DELAY = 'persist.device_config.aconfig_flags.bluetooth.com.android.bluetooth.flags.avrcp_connect_a2dp_with_delay'


async def initiate_pairing(device, address) -> Connection:
    """Connect and pair a remote device."""

@@ -288,12 +285,128 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc]
        connection = pandora.get_raw_connection(device=self.ref1, connection=ref1_dut)

        # Open AVCTP L2CAP channel
        avctp = await connection.create_l2cap_channel(spec=l2cap.ClassicChannelSpec(AVCTP_PSM))
        avctp = await connection.create_l2cap_channel(spec=ClassicChannelSpec(AVCTP_PSM))
        self.ref1.log.info(f'AVCTP: {avctp}')

        # Wait for AVDTP L2CAP channel
        await asyncio.wait_for(avdtp_future, timeout=10.0)

    @avatar.asynchronous
    async def test_avdt_signaling_channel_connection_collision(self) -> None:
        """Test AVDTP signaling channel connection collision.

        Test steps after DUT and RD1 connected and paired:
        1. RD1 connects DUT over AVDTP - first AVDTP signaling channel
        2. AVDTP signaling channel configuration postponed until DUT tries to initiate AVDTP signaling channel connection
        3. DUT tries connecting RD1 - collision simulated
        4. RD1 rejects AVDTP signaling channel connection request from DUT
        5. RD1 proceeds with first AVDTP signaling channel configuration
        6. Channel established - collision avoided
        """

        @dataclasses.dataclass
        class L2capConfigurationRequest:
            connection: Optional[Connection] = None
            cid: Optional[int] = None
            request: Optional[L2CAP_Configure_Request] = None

        global pending_configuration_request
        pending_configuration_request = L2capConfigurationRequest()

        class TestChannelManager(ChannelManager):

            def __init__(
                self,
                device: BumblePandoraDevice,
            ) -> None:
                super().__init__(
                    device.l2cap_channel_manager.extended_features,
                    device.l2cap_channel_manager.connectionless_mtu,
                )
                self.register_fixed_channel(bumble.smp.SMP_CID, device.on_smp_pdu)
                device.sdp_server.register(self)
                self.register_fixed_channel(bumble.att.ATT_CID, device.on_gatt_pdu)
                self.host = device.host

            def on_l2cap_connection_request(self, connection: Connection, cid: int, request) -> None:
                global pending_configuration_request
                if (request.psm == AVDTP_PSM and pending_configuration_request is not None):
                    logger.info("<< 4. RD1 rejects AVDTP connection request from DUT >>")
                    self.send_control_frame(
                        connection, cid,
                        L2CAP_Connection_Response(
                            identifier=request.identifier,
                            destination_cid=0,
                            source_cid=request.source_cid,
                            result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
                            status=0x0000,
                        ))
                    logger.info("<< 5. RD1 proceeds with first AVDTP channel configuration >>")
                    chan_connection = pending_configuration_request.connection
                    chan_cid = pending_configuration_request.cid
                    chan_request = pending_configuration_request.request
                    pending_configuration_request = None
                    super().on_control_frame(connection=chan_connection, cid=chan_cid, control_frame=chan_request)
                    return
                super().on_l2cap_connection_request(connection, cid, request)

        class TestClassicChannel(ClassicChannel):

            def on_connection_response(self, response):
                assert self.state == self.State.WAIT_CONNECT_RSP
                assert response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL, f"Connection response: {response}"
                self.destination_cid = response.destination_cid
                self._change_state(self.State.WAIT_CONFIG)
                logger.info("<< 2. RD1 connected DUT, configuration postponed >>")

            def on_configure_request(self, request) -> None:
                global pending_configuration_request
                if (pending_configuration_request is not None):
                    logger.info("<< 3. Block RD1 until DUT tries AVDTP channel connection >>")
                    pending_configuration_request.connection = self.connection
                    pending_configuration_request.cid = self.source_cid
                    pending_configuration_request.request = request
                else:
                    super().on_configure_request(request)

        # Override L2CAP Channel Manager to control signaling
        self.ref1.device.l2cap_channel_manager = TestChannelManager(self.ref1.device)

        # Connect and pair DUT -> RD1.
        dut_ref1, ref1_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref1.address),
            accept_pairing(self.ref1, self.dut.address),
        )

        # Retrieve Bumble connection object from Pandora connection token
        connection = pandora.get_raw_connection(device=self.ref1, connection=ref1_dut)
        # Find a free CID for a new channel
        connection_channels = self.ref1.device.l2cap_channel_manager.channels.setdefault(connection.handle, {})
        source_cid = self.ref1.device.l2cap_channel_manager.find_free_br_edr_cid(connection_channels)
        assert source_cid is not None, "source_cid is None"

        spec = ClassicChannelSpec(AVDTP_PSM)
        channel = TestClassicChannel(
            self.ref1.device.l2cap_channel_manager,
            connection,
            L2CAP_SIGNALING_CID,
            AVDTP_PSM,
            source_cid,
            spec.mtu,
        )
        connection_channels[source_cid] = channel

        logger.info("<< 1. RD1 connects DUT over AVDTP - first channel >>")
        await channel.connect()
        logger.info(f"<< 6. Channel established: {channel} >>")
        assert channel.state == ClassicChannel.State.OPEN

        # Initiate AVDTP with connected L2CAP signaling channel
        protocol = Protocol(channel)
        protocol.add_sink(codec_capabilities())
        logger.info("<< Test finished! >>")


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore