Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit eaef62d1 authored by Duo Ho's avatar Duo Ho Committed by Gerrit Code Review
Browse files

Merge "Avatar: Add ASHA test - test_music_audio_playback"

parents 52cf6c59 c63c27f9
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@
    <target_preparer class="com.android.tradefed.targetprep.PythonVirtualenvPreparer">
        <option name="dep-module" value="grpcio==1.51.1" />
        <option name="dep-module" value="cryptography==35" />
        <option name="dep-module" value="numpy" />
    </target_preparer>
    <test class="com.android.tradefed.testtype.mobly.MoblyBinaryHostTest">
        <option name="mobly-par-file-name" value="avatar" />
+77 −2
Original line number Diff line number Diff line
@@ -16,7 +16,11 @@ import asyncio
import avatar
import enum
import grpc
import inspect
import itertools
import logging
import math
import numpy as np

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous
from bumble import pandora as bumble_server
@@ -28,18 +32,22 @@ from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_false  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import assert_not_equal  # type: ignore
from mobly.asserts import assert_true  # type: ignore
from pandora._utils import AioStream
from pandora._utils import AioStream, Stream
from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse
from pandora.security_pb2 import LE_LEVEL3
from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server
from typing import ByteString, List, Optional, Tuple
from pandora_experimental.asha_pb2 import PlaybackAudioRequest
from typing import AsyncIterator, ByteString, List, Optional, Tuple

ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str('-')
HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
COMPLETE_LOCAL_NAME: str = "Bumble"
AUDIO_SIGNAL_AMPLITUDE = 0.8
AUDIO_SIGNAL_SAMPLING_RATE = 44100
SINE_FREQUENCY = 440
SINE_DURATION = 0.1


class Ear(enum.IntEnum):
@@ -217,6 +225,24 @@ class AshaTest(base_test.BaseTestClass): # type: ignore[misc]

        return audio_data

    async def generate_sine(self, connection: Connection) -> AsyncIterator[PlaybackAudioRequest]:
        # generate sine wave audio
        sine = AUDIO_SIGNAL_AMPLITUDE * np.sin(
            2
            * np.pi
            * np.arange(AUDIO_SIGNAL_SAMPLING_RATE * SINE_DURATION)
            * (SINE_FREQUENCY / AUDIO_SIGNAL_SAMPLING_RATE)
        )
        s16le = (sine * 32767).astype('<i2')

        # Interleaved audio.
        stereo = np.zeros(s16le.size * 2, dtype=sine.dtype)
        stereo[0::2] = s16le

        # Send 4 second of audio.
        for _ in range(0, int(4 / SINE_DURATION)):
            yield PlaybackAudioRequest(connection=connection, data=stereo.tobytes())

    @avatar.parameterized(
        (RANDOM, Ear.LEFT),
        (RANDOM, Ear.RIGHT),
@@ -1026,6 +1052,55 @@ class AshaTest(base_test.BaseTestClass): # type: ignore[misc]
        # ref_left already connected, otherstate = 1
        assert_equal(start_result_right['otherstate'], 1)

    @asynchronous
    async def test_music_audio_playback(self) -> None:
        """
        DUT discovers Ref.
        DUT initiates connection to Ref.
        Verify that DUT and Ref are bonded and connected.
        DUT is streaming media to Ref using playback API.
        Verify that Ref has received audio data.
        """

        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
            # DUT initiates connection to ref_device.
            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
            advertisement.cancel()

            return dut_ref, ref_dut

        dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT)

        # DUT starts pairing with the ref_left
        (secure_left, wait_security_left) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
        )

        assert_equal(secure_left.result_variant(), 'success')
        assert_equal(wait_security_left.result_variant(), 'success')

        dut_asha = AioAsha(self.dut.aio.channel)
        ref_asha = AioAsha(self.ref_left.aio.channel)

        await dut_asha.WaitPeripheral(connection=dut_ref_left)
        await dut_asha.Start(connection=dut_ref_left)

        # Clear audio data before start audio playback testing
        await self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10)

        generated_audio = self.generate_sine(connection=dut_ref_left)

        _, audio_data = await asyncio.gather(
            dut_asha.PlaybackAudio(generated_audio),
            self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10),
        )

        assert_not_equal(len(audio_data), 0)
        # TODO(duoho): decode audio_data and verify the content


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)