Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 78daff40 authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge "Avatar: Add ASHA dual device test - test_music_start_dual_device" am: c07ac1c5

parents 180493da c07ac1c5
Loading
Loading
Loading
Loading
+191 −66
Original line number Diff line number Diff line
@@ -161,6 +161,46 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
        )

    def get_le_psm_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[int]:
        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
        le_psm_future = asyncio.get_running_loop().create_future()

        def le_psm_handler(connection: Connection, data: int) -> None:
            le_psm_future.set_result(data)

        asha_service.on('le_psm_out', le_psm_handler)
        return le_psm_future

    def get_read_only_properties_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[bytes]:
        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
        read_only_properties_future = asyncio.get_running_loop().create_future()

        def read_only_properties_handler(connection: Connection, data: bytes) -> None:
            read_only_properties_future.set_result(data)

        asha_service.on('read_only_properties', read_only_properties_handler)
        return read_only_properties_future

    def get_start_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[dict[str, int]]:
        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
        start_future = asyncio.get_running_loop().create_future()

        def start_command_handler(connection: Connection, data: dict[str, int]) -> None:
            start_future.set_result(data)

        asha_service.on('start', start_command_handler)
        return start_future

    def get_stop_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[Connection]:
        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
        stop_future = asyncio.get_running_loop().create_future()

        def stop_command_handler(connection: Connection) -> None:
            stop_future.set_result(connection)

        asha_service.on('stop', stop_command_handler)
        return stop_future

    @avatar.parameterized(
        (RANDOM, Ear.LEFT),
        (RANDOM, Ear.RIGHT),
@@ -683,34 +723,28 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1,
        audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>).
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM, ear=Ear.LEFT)

        ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)

        # DUT initiates connection to Ref.
        dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, RANDOM)

        asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService)))

        # check DUT read le_psm
        le_psm_future = asyncio.get_running_loop().create_future()

        def le_psm_handler(connection: Connection, data: int) -> None:
            le_psm_future.set_result(data)

        asha_service.on('le_psm_out', le_psm_handler)
        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
            # DUT initiates connection to ref_device.
            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
            advertisement.cancel()

        # check DUT read read_only_properties
        read_only_properties_future = asyncio.get_running_loop().create_future()
            return dut_ref, ref_dut

        def read_only_properties_handler(connection: Connection, data: bytes) -> None:
            read_only_properties_future.set_result(data)
        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
        le_psm_future = self.get_le_psm_future(self.ref_left)
        read_only_properties_future = self.get_read_only_properties_future(self.ref_left)

        asha_service.on('read_only_properties', read_only_properties_handler)
        # DUT starts pairing with the ref_left
        (secure, wait_security) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
        )

        # DUT starts pairing with the Ref.
        # FIXME: assert the security Level on ref side
        await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        le_psm_out_result = await asyncio.wait_for(le_psm_future, timeout=3.0)
        assert_is_not_none(le_psm_out_result)
@@ -719,13 +753,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        assert_is_not_none(read_only_properties_result)

        dut_asha = AioAsha(self.dut.aio.channel)

        start_future = asyncio.get_running_loop().create_future()

        def start_command_handler(connection: Connection, data: dict[str, int]) -> None:
            start_future.set_result(data)

        asha_service.on('start', start_command_handler)
        start_future = self.get_start_future(self.ref_left)

        logging.info("send start")
        await dut_asha.WaitPeripheral(connection=dut_ref)
@@ -735,7 +763,6 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]

        logging.info(f"start_result:{start_result}")
        assert_is_not_none(start_result)
        assert_true(isinstance(start_result, dict), "")
        assert_equal(start_result['codec'], 1)
        assert_equal(start_result['audiotype'], 0)
        assert_is_not_none(start_result['volume'])
@@ -760,9 +787,14 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        # DUT initiates connection to Ref.
        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)

        # DUT starts pairing with the Ref.
        # FIXME: assert the security Level on ref side
        await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
        # DUT starts pairing with the ref_left
        (secure, wait_security) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
        )

        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService)))
        dut_asha = AioAsha(self.dut.aio.channel)
@@ -793,26 +825,30 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT stops media streaming on Ref.
        Verify that DUT sends a correct AudioControlPoint `Stop` command.
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM, ear=Ear.LEFT)

        ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)

        # DUT initiates connection to Ref.
        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
            # DUT initiates connection to ref_device.
            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
            advertisement.cancel()

        # DUT starts pairing with the Ref.
        # FIXME: assert the security Level on ref side
        await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
            return dut_ref, ref_dut

        asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService)))
        dut_asha = AioAsha(self.dut.aio.channel)
        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)

        stop_future = asyncio.get_running_loop().create_future()
        # DUT starts pairing with the ref_left
        (secure, wait_security) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
        )

        def stop_command_handler(connection: Connection) -> None:
            stop_future.set_result(connection)
        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        asha_service.on('stop', stop_command_handler)
        dut_asha = AioAsha(self.dut.aio.channel)

        stop_future = self.get_stop_future(self.ref_left)

        await dut_asha.WaitPeripheral(connection=dut_ref)
        await dut_asha.Start(connection=dut_ref)
@@ -843,26 +879,30 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        DUT starts media streaming again.
        Verify that DUT sends a correct AudioControlPoint `Start` command.
        """
        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM, ear=Ear.LEFT)

        ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)
        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
            # DUT initiates connection to ref_device.
            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
            advertisement.cancel()

        # DUT initiates connection to Ref.
        dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
            return dut_ref, ref_dut

        # DUT starts pairing with the Ref.
        # FIXME: assert the security Level on ref side
        await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)

        asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService)))
        dut_asha = AioAsha(self.dut.aio.channel)
        # DUT starts pairing with the ref_left
        (secure, wait_security) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
        )

        stop_future = asyncio.get_running_loop().create_future()
        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        def stop_command_handler(connection: Connection) -> None:
            stop_future.set_result(connection)
        dut_asha = AioAsha(self.dut.aio.channel)

        asha_service.on('stop', stop_command_handler)
        stop_future = self.get_stop_future(self.ref_left)

        await dut_asha.WaitPeripheral(connection=dut_ref)
        await dut_asha.Start(connection=dut_ref)
@@ -874,12 +914,7 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        # restart music streaming
        logging.info("restart music streaming")

        start_future = asyncio.get_running_loop().create_future()

        def start_command_handler(connection: Connection, data: dict[str, int]) -> None:
            start_future.set_result(data)

        asha_service.on('start', start_command_handler)
        start_future = self.get_start_future(self.ref_left)

        await dut_asha.WaitPeripheral(connection=dut_ref)
        _, start_result = await asyncio.gather(
@@ -889,6 +924,96 @@ class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
        logging.info(f"start_result:{start_result}")
        assert_is_not_none(start_result)

    @asynchronous
    async def test_music_start_dual_device(self) -> None:
        """
        DUT discovers Ref.
        DUT initiates connection to Ref.
        Verify that DUT and Ref are bonded and connected.
        DUT starts media streaming.
        Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1,
        audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>).
        """

        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
            # DUT initiates connection to ref_device.
            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
            advertisement.cancel()

            return dut_ref, ref_dut

        # connect ref_left
        dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
        le_psm_future_left = self.get_le_psm_future(self.ref_left)
        read_only_properties_future_left = self.get_read_only_properties_future(self.ref_left)

        # DUT starts pairing with the ref_left
        (secure_left, wait_security_left) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
        )

        assert_equal(secure_left.result_variant(), 'success')
        assert_equal(wait_security_left.result_variant(), 'success')

        le_psm_out_result_left = await asyncio.wait_for(le_psm_future_left, timeout=3.0)
        assert_is_not_none(le_psm_out_result_left)

        read_only_properties_result_left = await asyncio.wait_for(read_only_properties_future_left, timeout=3.0)
        assert_is_not_none(read_only_properties_result_left)

        dut_asha = AioAsha(self.dut.aio.channel)
        start_future_left = self.get_start_future(self.ref_left)

        logging.info("send start")
        await dut_asha.WaitPeripheral(connection=dut_ref_left)
        _, start_result_left = await asyncio.gather(
            dut_asha.Start(connection=dut_ref_left), asyncio.wait_for(start_future_left, timeout=3.0)
        )

        logging.info(f"start_result_left:{start_result_left}")
        assert_is_not_none(start_result_left)
        assert_equal(start_result_left['codec'], 1)
        assert_equal(start_result_left['audiotype'], 0)
        assert_is_not_none(start_result_left['volume'])
        assert_equal(start_result_left['otherstate'], 0)

        # connect ref_right
        dut_ref_right, ref_right_dut = await ref_device_connect(self.ref_right, Ear.RIGHT)
        le_psm_future_right = self.get_le_psm_future(self.ref_right)
        read_only_properties_future_right = self.get_read_only_properties_future(self.ref_right)

        # DUT starts pairing with the ref_right
        (secure_right, wait_security_right) = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
        )

        assert_equal(secure_right.result_variant(), 'success')
        assert_equal(wait_security_right.result_variant(), 'success')

        le_psm_out_result_right = await asyncio.wait_for(le_psm_future_right, timeout=3.0)
        assert_is_not_none(le_psm_out_result_right)

        read_only_properties_result_right = await asyncio.wait_for(read_only_properties_future_right, timeout=3.0)
        assert_is_not_none(read_only_properties_result_right)

        start_future_right = self.get_start_future(self.ref_right)

        logging.info("send start_right")
        await dut_asha.WaitPeripheral(connection=dut_ref_right)
        start_result_right = await asyncio.wait_for(start_future_right, timeout=10.0)

        logging.info(f"start_result_right:{start_result_right}")
        assert_is_not_none(start_result_right)
        assert_equal(start_result_right['codec'], 1)
        assert_equal(start_result_right['audiotype'], 0)
        assert_is_not_none(start_result_right['volume'])
        # ref_left already connected, otherstate = 1
        assert_equal(start_result_right['otherstate'], 1)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)