Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 57c50b56 authored by Duo Ho's avatar Duo Ho Committed by Automerger Merge Worker
Browse files

Merge "Avatar: Add ASHA test - test_music_audio_playback" am: eaef62d1

parents a27b7792 eaef62d1
Loading
Loading
Loading
Loading
+1 −0
Original line number Original line Diff line number Diff line
@@ -35,6 +35,7 @@
    <target_preparer class="com.android.tradefed.targetprep.PythonVirtualenvPreparer">
    <target_preparer class="com.android.tradefed.targetprep.PythonVirtualenvPreparer">
        <option name="dep-module" value="grpcio==1.51.1" />
        <option name="dep-module" value="grpcio==1.51.1" />
        <option name="dep-module" value="cryptography==35" />
        <option name="dep-module" value="cryptography==35" />
        <option name="dep-module" value="numpy" />
    </target_preparer>
    </target_preparer>
    <test class="com.android.tradefed.testtype.mobly.MoblyBinaryHostTest">
    <test class="com.android.tradefed.testtype.mobly.MoblyBinaryHostTest">
        <option name="mobly-par-file-name" value="avatar" />
        <option name="mobly-par-file-name" value="avatar" />
+77 −2
Original line number Original line Diff line number Diff line
@@ -16,7 +16,11 @@ import asyncio
import avatar
import avatar
import enum
import enum
import grpc
import grpc
import inspect
import itertools
import logging
import logging
import math
import numpy as np


from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous
from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous
from bumble import pandora as bumble_server
from bumble import pandora as bumble_server
@@ -28,18 +32,22 @@ from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_false  # type: ignore
from mobly.asserts import assert_false  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import assert_not_equal  # type: ignore
from mobly.asserts import assert_true  # type: ignore
from mobly.asserts import assert_true  # type: ignore
from pandora._utils import AioStream
from pandora._utils import AioStream, Stream
from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse
from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse
from pandora.security_pb2 import LE_LEVEL3
from pandora.security_pb2 import LE_LEVEL3
from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server
from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server
from typing import ByteString, List, Optional, Tuple
from pandora_experimental.asha_pb2 import PlaybackAudioRequest
from typing import AsyncIterator, ByteString, List, Optional, Tuple


ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str('-')
ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str('-')
HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
COMPLETE_LOCAL_NAME: str = "Bumble"
COMPLETE_LOCAL_NAME: str = "Bumble"
AUDIO_SIGNAL_AMPLITUDE = 0.8
AUDIO_SIGNAL_AMPLITUDE = 0.8
AUDIO_SIGNAL_SAMPLING_RATE = 44100
AUDIO_SIGNAL_SAMPLING_RATE = 44100
SINE_FREQUENCY = 440
SINE_DURATION = 0.1




class Ear(enum.IntEnum):
class Ear(enum.IntEnum):
@@ -217,6 +225,24 @@ class AshaTest(base_test.BaseTestClass): # type: ignore[misc]


        return audio_data
        return audio_data


    async def generate_sine(self, connection: Connection) -> AsyncIterator[PlaybackAudioRequest]:
        # generate sine wave audio
        sine = AUDIO_SIGNAL_AMPLITUDE * np.sin(
            2
            * np.pi
            * np.arange(AUDIO_SIGNAL_SAMPLING_RATE * SINE_DURATION)
            * (SINE_FREQUENCY / AUDIO_SIGNAL_SAMPLING_RATE)
        )
        s16le = (sine * 32767).astype('<i2')

        # Interleaved audio.
        stereo = np.zeros(s16le.size * 2, dtype=sine.dtype)
        stereo[0::2] = s16le

        # Send 4 second of audio.
        for _ in range(0, int(4 / SINE_DURATION)):
            yield PlaybackAudioRequest(connection=connection, data=stereo.tobytes())

    @avatar.parameterized(
    @avatar.parameterized(
        (RANDOM, Ear.LEFT),
        (RANDOM, Ear.LEFT),
        (RANDOM, Ear.RIGHT),
        (RANDOM, Ear.RIGHT),
@@ -1026,6 +1052,55 @@ class AshaTest(base_test.BaseTestClass): # type: ignore[misc]
        # ref_left already connected, otherstate = 1
        # ref_left already connected, otherstate = 1
        assert_equal(start_result_right['otherstate'], 1)
        assert_equal(start_result_right['otherstate'], 1)


    @asynchronous
    async def test_music_audio_playback(self) -> None:
        """
        DUT discovers Ref.
        DUT initiates connection to Ref.
        Verify that DUT and Ref are bonded and connected.
        DUT is streaming media to Ref using playback API.
        Verify that Ref has received audio data.
        """

        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
            # DUT initiates connection to ref_device.
            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
            advertisement.cancel()

            return dut_ref, ref_dut

        dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT)

        # DUT starts pairing with the ref_left
        (secure_left, wait_security_left) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
        )

        assert_equal(secure_left.result_variant(), 'success')
        assert_equal(wait_security_left.result_variant(), 'success')

        dut_asha = AioAsha(self.dut.aio.channel)
        ref_asha = AioAsha(self.ref_left.aio.channel)

        await dut_asha.WaitPeripheral(connection=dut_ref_left)
        await dut_asha.Start(connection=dut_ref_left)

        # Clear audio data before start audio playback testing
        await self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10)

        generated_audio = self.generate_sine(connection=dut_ref_left)

        _, audio_data = await asyncio.gather(
            dut_asha.PlaybackAudio(generated_audio),
            self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10),
        )

        assert_not_equal(len(audio_data), 0)
        # TODO(duoho): decode audio_data and verify the content



if __name__ == "__main__":
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logging.basicConfig(level=logging.DEBUG)