Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0861bbd1 authored by Jakub Rotkiewicz's avatar Jakub Rotkiewicz
Browse files

avatar: Add codec reconfiguration test

Bug: 372300895
Flag: EXEMPT - test only
Test: atest avatar:'A2dpTest#test_codec_reconfiguration' -v
Change-Id: Ide1968078dfb93bd253762260dfdd92ad32a3188
parent 27948a51
Loading
Loading
Loading
Loading
+306 −6
Original line number Diff line number Diff line
@@ -18,6 +18,9 @@ package com.android.pandora

import android.bluetooth.BluetoothA2dp
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothCodecConfig
import android.bluetooth.BluetoothCodecStatus
import android.bluetooth.BluetoothCodecType
import android.bluetooth.BluetoothManager
import android.bluetooth.BluetoothProfile
import android.content.Context
@@ -27,6 +30,7 @@ import android.media.*
import android.util.Log
import com.google.protobuf.BoolValue
import com.google.protobuf.ByteString
import com.google.protobuf.Empty
import io.grpc.Status
import io.grpc.stub.StreamObserver
import java.io.Closeable
@@ -42,6 +46,7 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.withTimeoutOrNull
import pandora.A2DPGrpc.A2DPImplBase
import pandora.A2DPProto.*

@@ -65,6 +70,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {
        val intentFilter = IntentFilter()
        intentFilter.addAction(BluetoothA2dp.ACTION_PLAYING_STATE_CHANGED)
        intentFilter.addAction(BluetoothA2dp.ACTION_CONNECTION_STATE_CHANGED)
        intentFilter.addAction(BluetoothA2dp.ACTION_CODEC_CONFIG_CHANGED)

        flow = intentFlow(context, intentFilter, scope).shareIn(scope, SharingStarted.Eagerly)
    }
@@ -76,7 +82,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {

    override fun openSource(
        request: OpenSourceRequest,
        responseObserver: StreamObserver<OpenSourceResponse>
        responseObserver: StreamObserver<OpenSourceResponse>,
    ) {
        grpcUnary<OpenSourceResponse>(scope, responseObserver) {
            val device = request.connection.toBluetoothDevice(bluetoothAdapter)
@@ -114,7 +120,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {

    override fun waitSource(
        request: WaitSourceRequest,
        responseObserver: StreamObserver<WaitSourceResponse>
        responseObserver: StreamObserver<WaitSourceResponse>,
    ) {
        grpcUnary<WaitSourceResponse>(scope, responseObserver) {
            val device = request.connection.toBluetoothDevice(bluetoothAdapter)
@@ -183,7 +189,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {

    override fun suspend(
        request: SuspendRequest,
        responseObserver: StreamObserver<SuspendResponse>
        responseObserver: StreamObserver<SuspendResponse>,
    ) {
        grpcUnary<SuspendResponse>(scope, responseObserver) {
            val device = bluetoothAdapter.getRemoteDevice(request.source.cookie.toString("UTF-8"))
@@ -211,7 +217,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {

    override fun isSuspended(
        request: IsSuspendedRequest,
        responseObserver: StreamObserver<BoolValue>
        responseObserver: StreamObserver<BoolValue>,
    ) {
        grpcUnary(scope, responseObserver) {
            val device = bluetoothAdapter.getRemoteDevice(request.source.cookie.toString("UTF-8"))
@@ -271,7 +277,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {
                audioManager.setStreamVolume(
                    AudioManager.STREAM_MUSIC,
                    maxVolume,
                    AudioManager.FLAG_SHOW_UI
                    AudioManager.FLAG_SHOW_UI,
                )
            }
        }
@@ -305,7 +311,7 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {

    override fun getAudioEncoding(
        request: GetAudioEncodingRequest,
        responseObserver: StreamObserver<GetAudioEncodingResponse>
        responseObserver: StreamObserver<GetAudioEncodingResponse>,
    ) {
        grpcUnary<GetAudioEncodingResponse>(scope, responseObserver) {
            val device = bluetoothAdapter.getRemoteDevice(request.source.cookie.toString("UTF-8"))
@@ -321,4 +327,298 @@ class A2dp(val context: Context) : A2DPImplBase(), Closeable {
                .build()
        }
    }

    override fun getConfiguration(
        request: GetConfigurationRequest,
        responseObserver: StreamObserver<GetConfigurationResponse>,
    ) {
        grpcUnary<GetConfigurationResponse>(scope, responseObserver) {
            val device = request.connection.toBluetoothDevice(bluetoothAdapter)
            Log.i(TAG, "getConfiguration: device=$device")

            if (bluetoothA2dp.getConnectionState(device) != BluetoothA2dp.STATE_CONNECTED) {
                throw RuntimeException("Device is not connected, cannot getConfiguration")
            }

            val codecStatus = bluetoothA2dp.getCodecStatus(device)
            if (codecStatus == null) {
                throw RuntimeException("Codec status is null")
            }

            val currentCodecConfig = codecStatus.getCodecConfig()
            if (currentCodecConfig == null) {
                throw RuntimeException("Codec configuration is null")
            }

            val supportedCodecTypes = bluetoothA2dp.getSupportedCodecTypes()
            val configuration =
                Configuration.newBuilder()
                    .setId(getProtoCodecId(currentCodecConfig, supportedCodecTypes))
                    .setParameters(getProtoCodecParameters(currentCodecConfig))
                    .build()
            GetConfigurationResponse.newBuilder().setConfiguration(configuration).build()
        }
    }

    override fun setConfiguration(
        request: SetConfigurationRequest,
        responseObserver: StreamObserver<SetConfigurationResponse>,
    ) {
        grpcUnary<SetConfigurationResponse>(scope, responseObserver) {
            val timeoutMillis: Long = 5_000L // milliseconds
            val device = request.connection.toBluetoothDevice(bluetoothAdapter)
            Log.i(TAG, "setConfiguration: device=$device")

            if (bluetoothA2dp.getConnectionState(device) != BluetoothA2dp.STATE_CONNECTED) {
                throw RuntimeException("Device is not connected, cannot getCodecStatus")
            }

            val newCodecConfig = getCodecConfigFromProtoConfiguration(request.configuration)
            if (newCodecConfig == null) {
                throw RuntimeException("New codec configuration is null")
            }

            val codecId = packCodecId(request.configuration.id)

            val a2dpCodecConfigChangedFlow =
                flow
                    .filter { it.getAction() == BluetoothA2dp.ACTION_CODEC_CONFIG_CHANGED }
                    .filter { it.getBluetoothDeviceExtra() == device }
                    .map {
                        it.getParcelableExtra(
                                BluetoothCodecStatus.EXTRA_CODEC_STATUS,
                                BluetoothCodecStatus::class.java,
                            )
                            ?.getCodecConfig()
                    }

            bluetoothA2dp.setCodecConfigPreference(device, newCodecConfig)

            val result =
                withTimeoutOrNull(timeoutMillis) {
                    a2dpCodecConfigChangedFlow
                        .filter { it?.getExtendedCodecType()?.getCodecId() == codecId }
                        .first()
                }
            Log.i(TAG, "Result=$result")
            SetConfigurationResponse.newBuilder().setSuccess(result != null).build()
        }
    }

    private fun unpackCodecId(codecId: Long): CodecId {
        val codecType = (codecId and 0xFF).toInt()
        val vendorId = ((codecId shr 8) and 0xFFFF).toInt()
        val vendorCodecId = ((codecId shr 24) and 0xFFFF).toInt()
        val codecIdBuilder = CodecId.newBuilder()
        when (codecType) {
            0x00 -> {
                codecIdBuilder.setSbc(Empty.getDefaultInstance())
            }
            0x02 -> {
                codecIdBuilder.setMpegAac(Empty.getDefaultInstance())
            }
            0xFF -> {
                val vendor = Vendor.newBuilder().setId(vendorId).setCodecId(vendorCodecId).build()
                codecIdBuilder.setVendor(vendor)
            }
            else -> {
                throw RuntimeException("Unknown codec type")
            }
        }
        return codecIdBuilder.build()
    }

    private fun packCodecId(codecId: CodecId): Long {
        var codecType: Int
        var vendorId: Int = 0
        var vendorCodecId: Int = 0
        when {
            codecId.hasSbc() -> {
                codecType = 0x00
            }
            codecId.hasMpegAac() -> {
                codecType = 0x02
            }
            codecId.hasVendor() -> {
                codecType = 0xFF
                vendorId = codecId.vendor.id
                vendorCodecId = codecId.vendor.codecId
            }
            else -> {
                throw RuntimeException("Unknown codec type")
            }
        }
        return (codecType.toLong() and 0xFF) or
            ((vendorId.toLong() and 0xFFFF) shl 8) or
            ((vendorCodecId.toLong() and 0xFFFF) shl 24)
    }

    private fun getProtoCodecId(
        codecConfig: BluetoothCodecConfig,
        supportedCodecTypes: Collection<BluetoothCodecType>,
    ): CodecId {
        var selectedCodecType: BluetoothCodecType? = null
        for (codecType: BluetoothCodecType in supportedCodecTypes) {
            if (codecType.getCodecId() == codecConfig.getExtendedCodecType()?.getCodecId()) {
                selectedCodecType = codecType
            }
        }
        if (selectedCodecType == null) {
            Log.e(TAG, "getProtoCodecId: selectedCodecType is null")
            return CodecId.newBuilder().build()
        }
        return unpackCodecId(selectedCodecType.getCodecId())
    }

    private fun getProtoCodecParameters(codecConfig: BluetoothCodecConfig): CodecParameters {
        var channelMode: ChannelMode
        var samplingFrequencyHz: Int
        var bitDepth: Int
        when (codecConfig.getSampleRate()) {
            BluetoothCodecConfig.SAMPLE_RATE_NONE -> {
                samplingFrequencyHz = 0
            }
            BluetoothCodecConfig.SAMPLE_RATE_44100 -> {
                samplingFrequencyHz = 44100
            }
            BluetoothCodecConfig.SAMPLE_RATE_48000 -> {
                samplingFrequencyHz = 48000
            }
            BluetoothCodecConfig.SAMPLE_RATE_88200 -> {
                samplingFrequencyHz = 88200
            }
            BluetoothCodecConfig.SAMPLE_RATE_96000 -> {
                samplingFrequencyHz = 96000
            }
            BluetoothCodecConfig.SAMPLE_RATE_176400 -> {
                samplingFrequencyHz = 176400
            }
            BluetoothCodecConfig.SAMPLE_RATE_192000 -> {
                samplingFrequencyHz = 192000
            }
            else -> {
                throw RuntimeException("Unknown sample rate")
            }
        }
        when (codecConfig.getBitsPerSample()) {
            BluetoothCodecConfig.BITS_PER_SAMPLE_NONE -> {
                bitDepth = 0
            }
            BluetoothCodecConfig.BITS_PER_SAMPLE_16 -> {
                bitDepth = 16
            }
            BluetoothCodecConfig.BITS_PER_SAMPLE_24 -> {
                bitDepth = 24
            }
            BluetoothCodecConfig.BITS_PER_SAMPLE_32 -> {
                bitDepth = 32
            }
            else -> {
                throw RuntimeException("Unknown bit depth")
            }
        }
        when (codecConfig.getChannelMode()) {
            BluetoothCodecConfig.CHANNEL_MODE_NONE -> {
                channelMode = ChannelMode.UNKNOWN
            }
            BluetoothCodecConfig.CHANNEL_MODE_MONO -> {
                channelMode = ChannelMode.MONO
            }
            BluetoothCodecConfig.CHANNEL_MODE_STEREO -> {
                channelMode = ChannelMode.STEREO
            }
            else -> {
                throw RuntimeException("Unknown channel mode")
            }
        }
        return CodecParameters.newBuilder()
            .setSamplingFrequencyHz(samplingFrequencyHz)
            .setBitDepth(bitDepth)
            .setChannelMode(channelMode)
            .build()
    }

    private fun getCodecConfigFromProtoConfiguration(
        configuration: Configuration
    ): BluetoothCodecConfig? {
        var selectedCodecType: BluetoothCodecType? = null
        val codecTypes = bluetoothA2dp.getSupportedCodecTypes()
        val codecId = packCodecId(configuration.id)
        var sampleRate: Int
        var bitsPerSample: Int
        var channelMode: Int
        for (codecType: BluetoothCodecType in codecTypes) {
            if (codecType.getCodecId() == codecId) {
                selectedCodecType = codecType
            }
        }
        if (selectedCodecType == null) {
            Log.e(TAG, "getCodecConfigFromProtoConfiguration: selectedCodecType is null")
            return null
        }
        when (configuration.parameters.getSamplingFrequencyHz()) {
            0 -> {
                sampleRate = BluetoothCodecConfig.SAMPLE_RATE_NONE
            }
            44100 -> {
                sampleRate = BluetoothCodecConfig.SAMPLE_RATE_44100
            }
            48000 -> {
                sampleRate = BluetoothCodecConfig.SAMPLE_RATE_48000
            }
            88200 -> {
                sampleRate = BluetoothCodecConfig.SAMPLE_RATE_88200
            }
            96000 -> {
                sampleRate = BluetoothCodecConfig.SAMPLE_RATE_96000
            }
            176400 -> {
                sampleRate = BluetoothCodecConfig.SAMPLE_RATE_176400
            }
            192000 -> {
                sampleRate = BluetoothCodecConfig.SAMPLE_RATE_192000
            }
            else -> {
                throw RuntimeException("Unknown sample rate")
            }
        }
        when (configuration.parameters.getBitDepth()) {
            0 -> {
                bitsPerSample = BluetoothCodecConfig.BITS_PER_SAMPLE_NONE
            }
            16 -> {
                bitsPerSample = BluetoothCodecConfig.BITS_PER_SAMPLE_16
            }
            24 -> {
                bitsPerSample = BluetoothCodecConfig.BITS_PER_SAMPLE_24
            }
            32 -> {
                bitsPerSample = BluetoothCodecConfig.BITS_PER_SAMPLE_32
            }
            else -> {
                throw RuntimeException("Unknown bit depth")
            }
        }
        when (configuration.parameters.getChannelMode()) {
            ChannelMode.UNKNOWN -> {
                channelMode = BluetoothCodecConfig.CHANNEL_MODE_NONE
            }
            ChannelMode.MONO -> {
                channelMode = BluetoothCodecConfig.CHANNEL_MODE_MONO
            }
            ChannelMode.STEREO -> {
                channelMode = BluetoothCodecConfig.CHANNEL_MODE_STEREO
            }
            else -> {
                throw RuntimeException("Unknown channel mode")
            }
        }
        return BluetoothCodecConfig.Builder()
            .setExtendedCodecType(selectedCodecType)
            .setCodecPriority(BluetoothCodecConfig.CODEC_PRIORITY_HIGHEST)
            .setSampleRate(sampleRate)
            .setBitsPerSample(bitsPerSample)
            .setChannelMode(channelMode)
            .build()
    }
}
+145 −8
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ from avatar.pandora_server import AndroidPandoraServer
import bumble
from bumble.avctp import AVCTP_PSM
from bumble.a2dp import (
    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
    MPEG_2_AAC_LC_OBJECT_TYPE,
    A2DP_SBC_CODEC_TYPE,
    SBC_DUAL_CHANNEL_MODE,
    SBC_JOINT_STEREO_CHANNEL_MODE,
@@ -31,11 +33,12 @@ from bumble.a2dp import (
    SBC_MONO_CHANNEL_MODE,
    SBC_SNR_ALLOCATION_METHOD,
    SBC_STEREO_CHANNEL_MODE,
    AacMediaCodecInformation,
    SbcMediaCodecInformation,
    make_audio_sink_service_sdp_records,
)
from bumble.avdtp import (AVDTP_AUDIO_MEDIA_TYPE, AVDTP_OPEN_STATE, AVDTP_PSM, AVDTP_STREAMING_STATE, Listener,
                          MediaCodecCapabilities, Protocol)
                          MediaCodecCapabilities, Protocol, AVDTP_BAD_STATE_ERROR, Suspend_Reject)
from bumble.l2cap import (ChannelManager, ClassicChannel, ClassicChannelSpec, L2CAP_Configure_Request,
                          L2CAP_Connection_Response, L2CAP_SIGNALING_CID)
from bumble.pairing import PairingDelegate
@@ -45,10 +48,10 @@ from mobly.asserts import assert_in # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import fail  # type: ignore
from pandora.a2dp_grpc_aio import A2DP
from pandora.a2dp_pb2 import PlaybackAudioRequest, Source
from pandora.a2dp_pb2 import PlaybackAudioRequest, Source, Configuration, STEREO
from pandora.host_pb2 import Connection
from pandora.security_pb2 import LEVEL2
from typing import Optional
from typing import Optional, Tuple

logger = logging.getLogger(__name__)

@@ -91,7 +94,7 @@ async def open_source(device, connection) -> Source:
    return source


def codec_capabilities():
def sbc_codec_capabilities() -> MediaCodecCapabilities:
    """Codec capabilities for the Bumble sink devices."""

    return MediaCodecCapabilities(
@@ -117,6 +120,22 @@ def codec_capabilities():
    )


def aac_codec_capabilities() -> MediaCodecCapabilities:
    """Codec capabilities for the Bumble sink devices."""

    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE,
        media_codec_information=AacMediaCodecInformation.from_lists(
            object_types=[MPEG_2_AAC_LC_OBJECT_TYPE],
            sampling_frequencies=[48000, 44100],
            channels=[1, 2],
            vbr=1,
            bitrate=256000,
        ),
    )


class AudioSignal:
    """Audio signal generator and verifier."""

@@ -209,10 +228,11 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc]
        self.ref2.a2dp_sink = None

        def on_ref1_avdtp_connection(server):
            self.ref1.a2dp_sink = server.add_sink(codec_capabilities())
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())

        def on_ref2_avdtp_connection(server):
            self.ref2.a2dp_sink = server.add_sink(codec_capabilities())
            self.ref2.a2dp_sink = server.add_sink(sbc_codec_capabilities())
            self.ref2.a2dp_sink = server.add_sink(aac_codec_capabilities())

        self.ref1.a2dp.on('connection', on_ref1_avdtp_connection)
        self.ref2.a2dp.on('connection', on_ref2_avdtp_connection)
@@ -275,7 +295,7 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc]

        def on_avdtp_connection(server):
            nonlocal avdtp_future
            self.ref1.a2dp_sink = server.add_sink(codec_capabilities())
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())
            self.ref1.log.info(f'Sink: {self.ref1.a2dp_sink}')
            avdtp_future.set_result(None)

@@ -403,9 +423,126 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc]

        # Initiate AVDTP with connected L2CAP signaling channel
        protocol = Protocol(channel)
        protocol.add_sink(codec_capabilities())
        protocol.add_sink(sbc_codec_capabilities())
        logger.info("<< Test finished! >>")

    @avatar.asynchronous
    async def test_reconfigure_codec_success(self) -> None:
        """Basic A2DP connection and codec reconfiguration.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Set SBC codec configuration
        """
        # Connect and pair RD2.
        dut_ref2, ref2_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref2.address),
            accept_pairing(self.ref2, self.dut.address),
        )

        # Connect AVDTP to RD2.
        dut_ref2_source = await open_source(self.dut, dut_ref2)
        assert_is_not_none(self.ref2.a2dp_sink)
        assert_is_not_none(self.ref2.a2dp_sink.stream)
        assert_in(self.ref2.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

        new_configuration = Configuration()
        new_configuration.id.sbc.SetInParent()
        new_configuration.parameters.sampling_frequency_hz = 44100
        new_configuration.parameters.bit_depth = 16
        new_configuration.parameters.channel_mode = STEREO

        # Set new codec
        logger.info(f"Switching to codec: {new_configuration}")
        result = await self.dut.a2dp.SetConfiguration(connection=dut_ref2, configuration=new_configuration)
        assert result.success

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('sbc')

    @avatar.asynchronous
    async def test_reconfigure_codec_error_unsupported(self) -> None:
        """Basic A2DP connection and codec reconfiguration failure.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Set SBC codec configuration with unsupported parameters
        """
        # Connect and pair RD2.
        dut_ref2, ref2_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref2.address),
            accept_pairing(self.ref2, self.dut.address),
        )

        # Connect AVDTP to RD2.
        dut_ref2_source = await open_source(self.dut, dut_ref2)
        assert_is_not_none(self.ref2.a2dp_sink)
        assert_is_not_none(self.ref2.a2dp_sink.stream)
        assert_in(self.ref2.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

        new_configuration = Configuration()
        new_configuration.id.sbc.SetInParent()
        new_configuration.parameters.sampling_frequency_hz = 176400
        new_configuration.parameters.bit_depth = 24
        new_configuration.parameters.channel_mode = STEREO

        # Set new codec
        logger.info(f"Switching to codec: {new_configuration}")
        result = await self.dut.a2dp.SetConfiguration(connection=dut_ref2, configuration=new_configuration)
        assert result.success == False

        # Get current codec status, assure it did not change
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

    @avatar.asynchronous
    async def test_reconfigure_codec_aac_error(self) -> None:
        # Connect and pair RD2.
        dut_ref2, ref2_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref2.address),
            accept_pairing(self.ref2, self.dut.address),
        )

        # Connect AVDTP to RD2.
        dut_ref2_source = await open_source(self.dut, dut_ref2)
        assert_is_not_none(self.ref2.a2dp_sink)
        assert_is_not_none(self.ref2.a2dp_sink.stream)
        assert_in(self.ref2.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

        new_configuration = Configuration()
        new_configuration.id.sbc.SetInParent()
        new_configuration.parameters.sampling_frequency_hz = 176400
        new_configuration.parameters.bit_depth = 24
        new_configuration.parameters.channel_mode = STEREO

        # Set new codec
        logger.info(f"Switching to codec: {new_configuration}")
        result = await self.dut.a2dp.SetConfiguration(connection=dut_ref2, configuration=new_configuration)
        assert result.success == False

        # Get current codec status, assure it did not change
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)