Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit fe3db122 authored by Yuyang Huang's avatar Yuyang Huang Committed by Gerrit Code Review
Browse files

Merge "Add ASHA avatar tests"

parents 4fac27a6 a9f65b49
Loading
Loading
Loading
Loading
+397 −0
Original line number Diff line number Diff line
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
from typing import List, Optional, Tuple

import time
from avatar import PandoraDevices
from avatar import parameterized
from avatar.aio import asynchronous
from avatar.bumble_server.security import PairingDelegate
from avatar.pandora_client import BumblePandoraClient, PandoraClient
from bumble.gatt import GATT_ASHA_SERVICE
from mobly import base_test, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from pandora.host_grpc import AdvertiseResponse
from pandora.host_grpc import Connection
from pandora.host_grpc import DataTypes
from pandora.host_grpc import OwnAddressType
from pandora.host_grpc import ScanningResponse
from pandora.security_grpc import LESecurityLevel

ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str()
HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
CAPABILITY: int = 0x0
COMPLETE_LOCAL_NAME: str = "Bumble"


class ASHATest(base_test.BaseTestClass):  # type: ignore[misc]
    devices: Optional[PandoraDevices] = None
    dut: PandoraClient
    ref: BumblePandoraClient

    def setup_class(self) -> None:
        self.devices = PandoraDevices(self)
        dut, ref = self.devices
        assert isinstance(ref, BumblePandoraClient)
        self.dut, self.ref = dut, ref

    def teardown_class(self) -> None:
        if self.devices:
            self.devices.stop_all()

    @asynchronous
    async def setup_test(self) -> None:
        await asyncio.gather(self.dut.reset(), self.ref.reset())
        # ASHA hearing aid's IO capability is NO_OUTPUT_NO_INPUT
        setattr(self.ref.device, "io_capability", PairingDelegate.NO_OUTPUT_NO_INPUT)

    def advertise(self, ref_address_type: OwnAddressType) -> AdvertiseResponse:
        """
        Ref device starts to advertise
        :return: Ref device's advertise response
        """
        # Ref starts advertising with ASHA service data
        self.ref.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)
        return self.ref.host.Advertise(
            legacy=True,
            connectable=True,
            data=DataTypes(
                complete_local_name=COMPLETE_LOCAL_NAME,
                incomplete_service_class_uuids16=[ASHA_UUID],
            ),
            own_address_type=ref_address_type,
        )

    def scan(self, dut_address_type: OwnAddressType) -> ScanningResponse:
        """
        DUT starts to scan for the Ref device.
        :return: ScanningResponse
        """
        scan_result = self.dut.host.Scan(own_address_type=dut_address_type)
        ref = next(
            (
                x
                for x in scan_result
                if ASHA_UUID in x.data.incomplete_service_class_uuids16
            )
        )
        scan_result.cancel()

        assert ref
        return ref

    def connect(
        self, advertisement, ref: ScanningResponse, dut_address_type: OwnAddressType
    ) -> Tuple[Connection, Connection]:
        """
        Helper method for Dut connects to Ref
        :return: a Tuple (DUT to REF connection, REF to DUT connection)
        """
        # DUT connects to Ref
        dut_ref = self.dut.host.ConnectLE(
            own_address_type=dut_address_type, **ref.address_asdict()
        ).connection
        ref_dut = (next(advertisement)).connection
        assert dut_ref
        assert ref_dut

        advertisement.cancel()
        return dut_ref, ref_dut

    @asynchronous
    async def setup_test(self) -> None:
        async def reset(device: PandoraClient) -> None:
            await device.aio.host.FactoryReset()
            device.address = (await device.aio.host.ReadLocalAddress(wait_for_ready=True)).address  # type: ignore[assignment]

        await asyncio.gather(reset(self.dut), reset(self.ref))

    def test_advertising_advertisment_data(self) -> None:
        """
        Ref starts ASHA advertisements with service data in advertisement data.
        DUT starts a service discovery.
        Verify Ref is correctly discovered by DUT as a hearing aid device.
        """
        protocol_version = 0x01
        truncated_hisyncid = HISYCNID[:4]

        advertisement = self.advertise(ref_address_type=OwnAddressType.RANDOM)

        # DUT starts a service discovery
        scan_result = self.scan(dut_address_type=OwnAddressType.RANDOM)
        advertisement.cancel()

        # Verify Ref is correctly discovered by DUT as a hearing aid device
        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
        assert_equal(type(scan_result.data.complete_local_name), str)
        expected_advertisement_data = (
            "{:02x}".format(protocol_version)
            + "{:02x}".format(CAPABILITY)
            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
        )
        assert_equal(
            expected_advertisement_data,
            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
        )

    def test_advertising_scan_response(self) -> None:
        """
        Ref starts ASHA advertisements with service data in scan response data.
        DUT starts a service discovery.
        Verify Ref is correctly discovered by DUT as a hearing aid device.
        """
        protocol_version = 0x01
        truncated_hisyncid = HISYCNID[:4]

        self.ref.asha.Register(capability=CAPABILITY, hisyncid=HISYCNID)

        # advertise with ASHA service data in scan response
        advertisement = self.ref.host.Advertise(
            legacy=True,
            scan_response_data=DataTypes(
                complete_local_name=COMPLETE_LOCAL_NAME,
                complete_service_class_uuids16=[ASHA_UUID],
            ),
        )

        scan_result = self.scan(dut_address_type=OwnAddressType.RANDOM)
        advertisement.cancel()

        # Verify Ref is correctly discovered by DUT as a hearing aid device.
        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
        expected_advertisement_data = (
            "{:02x}".format(protocol_version)
            + "{:02x}".format(CAPABILITY)
            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
        )
        assert_equal(
            expected_advertisement_data,
            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
        )

    @parameterized(
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC),
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM),
    )  # type: ignore[misc]
    def test_pairing(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
    ) -> None:
        """
        DUT discovers Ref.
        DUT initiates connection to Ref.
        Verify that DUT and Ref are bonded and connected.
        """
        advertisement = self.advertise(ref_address_type=ref_address_type)

        ref = self.scan(dut_address_type=dut_address_type)

        # DUT initiates connection to Ref.
        dut_ref, ref_dut = self.connect(advertisement, ref, dut_address_type)
        assert dut_ref

        # DUT starts pairing with the Ref.
        secure = self.dut.security.Secure(
            connection=dut_ref, le=LESecurityLevel.LE_LEVEL3
        )

        assert_equal(secure.WhichOneof("result"), "success")

    @parameterized(
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC),
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM),
    )  # type: ignore[misc]
    def test_unbonding(
        self,
        dut_address_type: OwnAddressType = OwnAddressType.RANDOM,
        ref_address_type: OwnAddressType = OwnAddressType.RANDOM,
    ) -> None:
        """
        DUT removes bond with Ref.
        Verify that DUT and Ref are disconnected and unbonded.
        """
        from mobly.signals import TestSkip

        raise TestSkip("update rootcanal to retry")

        advertisement = self.advertise(ref_address_type=ref_address_type)
        ref = self.scan(dut_address_type=ref_address_type)

        dut_ref, ref_dut = self.connect(advertisement, ref, dut_address_type)

        secure = self.dut.security.Secure(
            connection=dut_ref, le=LESecurityLevel.LE_LEVEL3
        )

        assert_equal(secure.WhichOneof("result"), "success")
        self.dut.host.Disconnect(dut_ref)
        self.ref.host.WaitDisconnection(ref_dut)

        # delete the bond
        if dut_address_type == OwnAddressType.PUBLIC:
            self.dut.security_storage.DeleteBond(public=self.ref.address)
        else:
            self.dut.security_storage.DeleteBond(random=self.ref.random_address)

        # DUT connect to REF again
        dut_ref = (
            self.dut.host.ConnectLE(
                own_address_type=dut_address_type, **ref.address_asdict()
            )
        ).connection
        # TODO very likely there is a bug in android here
        logging.debug("result should come out")

        advertisement.cancel()
        assert dut_ref

        secure = self.dut.security.Secure(
            connection=dut_ref, le=LESecurityLevel.LE_LEVEL3
        )

        assert_equal(secure.WhichOneof("result"), "success")

    @parameterized(
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM),
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC),
    )  # type: ignore[misc]
    def test_connection(
        self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType
    ) -> None:
        """
        DUT discovers Ref.
        DUT initiates connection to Ref.
        Verify that DUT and Ref are connected.
        """
        advertisement = self.advertise(ref_address_type=ref_address_type)
        ref = self.scan(dut_address_type=dut_address_type)
        dut_ref, ref_dut = self.connect(advertisement, ref, dut_address_type)
        assert dut_ref
        assert ref_dut

    @parameterized(
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM),
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC),
    )  # type: ignore[misc]
    def test_disconnect_initiator(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
    ) -> None:
        """
        DUT initiates disconnection to Ref.
        Verify that DUT and Ref are disconnected.
        """
        advertisement = self.advertise(ref_address_type=ref_address_type)
        ref = self.scan(dut_address_type=dut_address_type)
        dut_ref, ref_dut = self.connect(advertisement, ref, dut_address_type)

        self.dut.host.Disconnect(connection=dut_ref)

    @parameterized(
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM),
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC),
    )  # type: ignore[misc]
    def test_disconnect_acceptor(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
    ) -> None:
        """
        Ref initiates disconnection to DUT (typically when put back in its box).
        Verify that Ref is disconnected.
        """
        advertisement = self.advertise(ref_address_type=ref_address_type)
        ref = self.scan(dut_address_type=dut_address_type)
        dut_ref, ref_dut = self.connect(advertisement, ref, dut_address_type)
        assert dut_ref
        assert ref_dut
        self.ref.host.Disconnect(connection=ref_dut)

    @parameterized(
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM, 0),
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM, 0.5),
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM, 1),
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM, 5),
    )  # type: ignore[misc]
    def test_reconnection(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
        reconnection_gap: float,
    ) -> None:
        """
        DUT initiates disconnection to the Ref.
        Verify that DUT and Ref are disconnected.
        DUT reconnects to Ref after various certain time.
        Verify that DUT and Ref are connected.
        """

        def connect_and_disconnect():
            advertisement = self.advertise(ref_address_type=ref_address_type)
            ref = self.scan(dut_address_type=dut_address_type)
            dut_ref, ref_dut = self.connect(advertisement, ref, dut_address_type)
            self.dut.host.Disconnect(connection=dut_ref)

        connect_and_disconnect()
        # simulating reconnect interval
        time.sleep(reconnection_gap)
        connect_and_disconnect()

    @parameterized(
        (OwnAddressType.RANDOM, OwnAddressType.RANDOM),
        (OwnAddressType.RANDOM, OwnAddressType.PUBLIC),
    )  # type: ignore[misc]
    def test_auto_connection(
        self,
        dut_address_type: OwnAddressType,
        ref_address_type: OwnAddressType,
    ) -> None:
        """
        Ref initiates disconnection to DUT.
        Ref starts sending ASHA advertisements.
        Verify that DUT auto-connects to Ref.
        """
        advertisement = self.advertise(ref_address_type=ref_address_type)
        ref = self.scan(dut_address_type=dut_address_type)

        # manually connect and not cancel advertisement
        dut_ref = self.dut.host.ConnectLE(
            own_address_type=dut_address_type, **ref.address_asdict()
        ).connection
        ref_dut = next(advertisement).connection
        assert dut_ref
        assert ref_dut

        # pairing
        secure = self.dut.security.Secure(
            connection=dut_ref, le=LESecurityLevel.LE_LEVEL3
        )
        assert_equal(secure.WhichOneof("result"), "success")

        self.ref.host.Disconnect(connection=ref_dut)

        ref_dut = next(advertisement).connection
        advertisement.cancel()
        assert ref_dut


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore