Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0dcdecf2 authored by Henri Chataing's avatar Henri Chataing Committed by Gerrit Code Review
Browse files

Merge "a2dp: Implement first a2dp avatar test" into main

parents 25a910f4 e9d9e863
Loading
Loading
Loading
Loading
+7 −0
Original line number Original line Diff line number Diff line
@@ -161,6 +161,11 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {
                throw RuntimeException("Device is not connected, cannot start")
                throw RuntimeException("Device is not connected, cannot start")
            }
            }


            // Configure the selected device as active device if it is not
            // already.
            bluetoothA2dp.setActiveDevice(device)

            // Play an audio track.
            audioTrack!!.play()
            audioTrack!!.play()


            // If A2dp is not already playing, wait for it
            // If A2dp is not already playing, wait for it
@@ -281,6 +286,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {
                    )
                    )
                }
                }
            }
            }

            override fun onError(t: Throwable) {
            override fun onError(t: Throwable) {
                t.printStackTrace()
                t.printStackTrace()
                val sw = StringWriter()
                val sw = StringWriter()
@@ -289,6 +295,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {
                    Status.UNKNOWN.withCause(t).withDescription(sw.toString()).asException()
                    Status.UNKNOWN.withCause(t).withDescription(sw.toString()).asException()
                )
                )
            }
            }

            override fun onCompleted() {
            override fun onCompleted() {
                responseObserver.onNext(PlaybackAudioResponse.getDefaultInstance())
                responseObserver.onNext(PlaybackAudioResponse.getDefaultInstance())
                responseObserver.onCompleted()
                responseObserver.onCompleted()
+257 −0
Original line number Original line Diff line number Diff line
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import avatar
import itertools
import logging
import math
import numpy as np
import os

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble import avdtp
from bumble.a2dp import (
    A2DP_SBC_CODEC_TYPE,
    SBC_DUAL_CHANNEL_MODE,
    SBC_JOINT_STEREO_CHANNEL_MODE,
    SBC_LOUDNESS_ALLOCATION_METHOD,
    SBC_MONO_CHANNEL_MODE,
    SBC_SNR_ALLOCATION_METHOD,
    SBC_STEREO_CHANNEL_MODE,
    SbcMediaCodecInformation,
    make_audio_sink_service_sdp_records,
)
from bumble.avdtp import (
    AVDTP_AUDIO_MEDIA_TYPE,
    AVDTP_OPEN_STATE,
    AVDTP_STREAMING_STATE,
    Listener,
    MediaCodecCapabilities,
)
from bumble.pairing import PairingDelegate
from mobly import base_test, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_none  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import fail  # type: ignore
from pandora.a2dp_grpc_aio import A2DP
from pandora.a2dp_pb2 import PlaybackAudioRequest, Sink, Source
from pandora.host_pb2 import Connection
from pandora.security_pb2 import LEVEL2
from threading import Thread
from typing import Optional


async def initiate_pairing(device, address) -> Connection:
    """Connect and pair a remote device."""

    result = await device.aio.host.Connect(address=address)
    connection = result.connection
    assert connection

    bond = await device.aio.security.Secure(connection=connection, classic=LEVEL2)
    assert bond.success

    return connection


async def accept_pairing(device, address) -> Connection:
    """Accept connection and pairing from a remote device."""

    result = await device.aio.host.WaitConnection(address=address)
    connection = result.connection
    assert connection

    bond = await device.aio.security.WaitSecurity(connection=connection, classic=LEVEL2)
    assert bond.success

    return connection


async def open_source(device, connection) -> Source:
    """Initiate AVDTP connection from Android device."""

    result = await device.a2dp.OpenSource(connection=connection)
    source = result.source
    assert source

    return source


def codec_capabilities():
    """Codec capabilities for the Bumble sink devices."""

    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=SbcMediaCodecInformation.from_lists(
            sampling_frequencies=[48000, 44100, 32000, 16000],
            channel_modes=[
                SBC_MONO_CHANNEL_MODE,
                SBC_DUAL_CHANNEL_MODE,
                SBC_STEREO_CHANNEL_MODE,
                SBC_JOINT_STEREO_CHANNEL_MODE,
            ],
            block_lengths=[4, 8, 12, 16],
            subbands=[4, 8],
            allocation_methods=[
                SBC_LOUDNESS_ALLOCATION_METHOD,
                SBC_SNR_ALLOCATION_METHOD,
            ],
            minimum_bitpool_value=2,
            maximum_bitpool_value=53,
        ),
    )


class AudioSignal:
    """Audio signal generator and verifier."""

    SINE_FREQUENCY = 440
    SINE_DURATION = 0.1

    def __init__(self, a2dp: A2DP, source: Source, amplitude, fs):
        """Init AudioSignal class.

        Args:
            a2dp: A2DP profile interface.
            source: Source connection object to send the data to.
            amplitude: amplitude of the signal to generate.
            fs: sampling rate of the signal to generate.
        """
        self.a2dp = a2dp
        self.source = source
        self.amplitude = amplitude
        self.fs = fs
        self.task = None

    def start(self):
        """Generates the audio signal and send it to the transport."""
        self.task = asyncio.create_task(self._run())

    async def _run(self):
        sine = self._generate_sine(self.SINE_FREQUENCY, self.SINE_DURATION)

        # Interleaved audio.
        stereo = np.zeros(sine.size * 2, dtype=sine.dtype)
        stereo[0::2] = sine

        # Send 4 second of audio.
        audio = itertools.repeat(stereo.tobytes(), int(4 / self.SINE_DURATION))

        for frame in audio:
            await self.a2dp.PlaybackAudio(PlaybackAudioRequest(data=frame, source=self.source))

    def _generate_sine(self, f, duration):
        sine = self.amplitude * np.sin(2 * np.pi * np.arange(self.fs * duration) * (f / self.fs))
        s16le = (sine * 32767).astype('<i2')
        return s16le


class A2dpTest(base_test.BaseTestClass):  # type: ignore[misc]
    """A2DP test suite."""

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref1: PandoraDevice
    ref2: PandoraDevice

    @avatar.asynchronous
    async def setup_class(self) -> None:
        self.devices = PandoraDevices(self)
        self.dut, self.ref1, self.ref2, *_ = self.devices

        if not isinstance(self.ref1, BumblePandoraDevice):
            raise signals.TestAbortClass('Test require Bumble as reference device(s)')
        if not isinstance(self.ref2, BumblePandoraDevice):
            raise signals.TestAbortClass('Test require Bumble as reference device(s)')

        # Enable BR/EDR mode and SSP for Bumble devices.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)
                device.config.setdefault('classic_ssp_enabled', True)
                device.config.setdefault('classic_smp_enabled', False)
                device.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT

        await asyncio.gather(self.dut.reset(), self.ref1.reset(), self.ref2.reset())

        self.dut.a2dp = A2DP(channel=self.dut.aio.channel)

        handle = 0x00010001
        self.ref1.device.sdp_service_records = {handle: make_audio_sink_service_sdp_records(handle)}
        self.ref2.device.sdp_service_records = {handle: make_audio_sink_service_sdp_records(handle)}

        self.ref1.a2dp = Listener.for_device(self.ref1.device)
        self.ref2.a2dp = Listener.for_device(self.ref2.device)
        self.ref1.a2dp_sink = None
        self.ref2.a2dp_sink = None

        def on_ref1_avdtp_connection(server):
            self.ref1.a2dp_sink = server.add_sink(codec_capabilities())

        def on_ref2_avdtp_connection(server):
            self.ref2.a2dp_sink = server.add_sink(codec_capabilities())

        self.ref1.a2dp.on('connection', on_ref1_avdtp_connection)
        self.ref2.a2dp.on('connection', on_ref2_avdtp_connection)

    def teardown_class(self) -> None:
        if self.devices:
            self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:
        pass

    @avatar.asynchronous
    async def test_connect_and_stream(self) -> None:
        """Basic A2DP connection and streaming test.
        This test wants to be a template to be reused for other tests.

        1. Pair and Connect RD1
        2. Start streaming
        3. Check AVDTP status on RD1
        4. Stop streaming
        5. Check AVDTP status on RD1
        """
        # Connect and pair RD1.
        dut_ref1, ref1_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref1.address),
            accept_pairing(self.ref1, self.dut.address),
        )

        # Connect AVDTP to RD1.
        dut_ref1_source = await open_source(self.dut, dut_ref1)
        assert_is_not_none(self.ref1.a2dp_sink)
        assert_is_not_none(self.ref1.a2dp_sink.stream)
        assert_in(self.ref1.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Start streaming to RD1.
        await self.dut.a2dp.Start(source=dut_ref1_source)
        audio = AudioSignal(self.dut.a2dp, dut_ref1_source, 0.8, 44100)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_STREAMING_STATE)

        # Stop streaming to RD1.
        await self.dut.a2dp.Suspend(source=dut_ref1_source)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_OPEN_STATE)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
+5 −2
Original line number Original line Diff line number Diff line
@@ -13,6 +13,8 @@ from typing import List, Tuple


_BUMBLE_BTSNOOP_FMT = 'bumble_btsnoop_{pid}_{instance}.log'
_BUMBLE_BTSNOOP_FMT = 'bumble_btsnoop_{pid}_{instance}.log'


import a2dp_test

# Import test cases modules.
# Import test cases modules.
import asha_test
import asha_test
import avatar.cases.host_test
import avatar.cases.host_test
@@ -21,14 +23,15 @@ import avatar.cases.le_security_test
import avatar.cases.security_test
import avatar.cases.security_test
import gatt_test
import gatt_test
import hfpclient_test
import hfpclient_test
import sdp_test
import pairing.smp_test as smp_test
import pairing.smp_test as smp_test
import sdp_test


_TEST_CLASSES_LIST = [
_TEST_CLASSES_LIST = [
    avatar.cases.host_test.HostTest,
    avatar.cases.host_test.HostTest,
    avatar.cases.le_host_test.LeHostTest,
    avatar.cases.le_host_test.LeHostTest,
    avatar.cases.security_test.SecurityTest,
    avatar.cases.security_test.SecurityTest,
    avatar.cases.le_security_test.LeSecurityTest,
    avatar.cases.le_security_test.LeSecurityTest,
    a2dp_test.A2dpTest,
    sdp_test.SdpTest,
    sdp_test.SdpTest,
    smp_test.SmpTest,
    smp_test.SmpTest,
    gatt_test.GattTest,
    gatt_test.GattTest,