Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 37d9db12 authored by Henri Chataing's avatar Henri Chataing Committed by Automerger Merge Worker
Browse files

Merge "PTS-bot: Update mmi2grpc for PTS-setup v8.5.0" into main am: 28780ab6

parents dea64f66 28780ab6
Loading
Loading
Loading
Loading
+5 −5
Original line number Original line Diff line number Diff line
@@ -203,12 +203,12 @@ class IUT:
        # Handles GATT MMIs.
        # Handles GATT MMIs.
        if profile in ("GATT"):
        if profile in ("GATT"):
            if not self._gatt:
            if not self._gatt:
                self._gatt = GATTProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
                self._gatt = GATTProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
            return self._gatt.interact(test, interaction, description, pts_address)
            return self._gatt.interact(test, interaction, description, pts_address)
        # Handles GAP MMIs.
        # Handles GAP MMIs.
        if profile in ("GAP"):
        if profile in ("GAP"):
            if not self._gap:
            if not self._gap:
                self._gap = GAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
                self._gap = GAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
            return self._gap.interact(test, interaction, description, pts_address)
            return self._gap.interact(test, interaction, description, pts_address)
        # Handles HFP MMIs.
        # Handles HFP MMIs.
        if profile in ("HFP"):
        if profile in ("HFP"):
@@ -231,12 +231,12 @@ class IUT:
        # Handles HOGP MMIs.
        # Handles HOGP MMIs.
        if profile in ("HOGP"):
        if profile in ("HOGP"):
            if not self._hogp:
            if not self._hogp:
                self._hogp = HOGPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
                self._hogp = HOGPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
            return self._hogp.interact(test, interaction, description, pts_address)
            return self._hogp.interact(test, interaction, description, pts_address)
        # Instantiates L2CAP proxy and reroutes corresponding MMIs to it.
        # Instantiates L2CAP proxy and reroutes corresponding MMIs to it.
        if profile in ("L2CAP"):
        if profile in ("L2CAP"):
            if not self._l2cap:
            if not self._l2cap:
                self._l2cap = L2CAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
                self._l2cap = L2CAPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
            return self._l2cap.interact(test, interaction, description, pts_address)
            return self._l2cap.interact(test, interaction, description, pts_address)
        # Handles MAP MMIs.
        # Handles MAP MMIs.
        if profile in ("MAP"):
        if profile in ("MAP"):
@@ -271,7 +271,7 @@ class IUT:
        # Handles SM MMIs.
        # Handles SM MMIs.
        if profile in ("SM"):
        if profile in ("SM"):
            if not self._sm:
            if not self._sm:
                self._sm = SMProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"))
                self._sm = SMProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
            return self._sm.interact(test, interaction, description, pts_address)
            return self._sm.interact(test, interaction, description, pts_address)


        # Handles unsupported profiles.
        # Handles unsupported profiles.
+1 −1
Original line number Original line Diff line number Diff line
@@ -74,7 +74,7 @@ def match_description(f):
    """
    """


    def normalize(desc):
    def normalize(desc):
        return desc.replace("\n", " ").replace("\t", "    ").strip()
        return re.sub('\s+', ' ', desc).strip()


    docstring = normalize(textwrap.dedent(f.__doc__))
    docstring = normalize(textwrap.dedent(f.__doc__))
    regex = re.compile(docstring)
    regex = re.compile(docstring)
+20 −0
Original line number Original line Diff line number Diff line
@@ -3,6 +3,7 @@ Copied from tools/rootcanal/scripts/test_channel.py
"""
"""


import socket
import socket
import enum
from time import sleep
from time import sleep




@@ -74,6 +75,12 @@ class TestChannel:
                raise ValueError  # Size must be encodable in one octet.
                raise ValueError  # Size must be encodable in one octet.




class Dongle(enum.Enum):
    DEFAULT = "default"
    LAIRD_BL654 = "laird_bl654"
    CSR_RCK_PTS_DONGLE = "csr_rck_pts_dongle"


class RootCanal:
class RootCanal:


    def __init__(self, port):
    def __init__(self, port):
@@ -86,6 +93,19 @@ class RootCanal:
    def close(self):
    def close(self):
        self.channel.close()
        self.channel.close()


    def select_pts_dongle(self, dongle: Dongle):
        """Use the control port to dynamically reconfigure the controller
        properties for the dongle used by the PTS tester.

        This method will cause a Reset on the controller.
        This method shall exclusively be called from the test_started
        interaction."""
        # The PTS is the device with the highest ID,
        # Android is always first to connect to root-canal.
        (devices, _) = self._read_device_list()
        pts_id = max([id for (id, _) in devices])
        self.channel.send_command("set_device_configuration", [pts_id, dongle.value])

    def move_out_of_range(self):
    def move_out_of_range(self):
        """Space out the connected devices to generate a supervision
        """Space out the connected devices to generate a supervision
        timeout for all existing connections."""
        timeout for all existing connections."""
+13 −3
Original line number Original line Diff line number Diff line
@@ -221,10 +221,11 @@ class A2DPProxy(ProfileProxy):
        IUT?
        IUT?
        """
        """


        result = self.audio.verify()
        # TODO(302136232): audio validation is disabled on cuttlefish (too laggy)
        assert result
        #result = self.audio.verify()
        #assert result


        return "Yes" if result else "No"
        return "Yes"


    @assert_description
    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_get_capabilities(self, **kwargs):
    def TSC_AVDTP_mmi_iut_initiate_get_capabilities(self, **kwargs):
@@ -641,3 +642,12 @@ class A2DPProxy(ProfileProxy):
        """
        """


        return "OK"
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_reconfigure(self, **kwargs):
        """
        Send a reconfigure command to PTS.
        """

        # TODO
        return "OK"
+224 −18
Original line number Original line Diff line number Diff line
from threading import Thread
from threading import Thread
from mmi2grpc._helpers import assert_description, match_description
from mmi2grpc._helpers import assert_description, match_description
from mmi2grpc._rootcanal import Dongle
from mmi2grpc._proxy import ProfileProxy
from mmi2grpc._proxy import ProfileProxy
from time import sleep
from time import sleep
import sys


from pandora_experimental.gatt_grpc import GATT
from pandora_experimental.gatt_grpc import GATT
from pandora_experimental.gatt_pb2 import GattServiceParams, GattCharacteristicParams
from pandora_experimental.gatt_pb2 import GattServiceParams, GattCharacteristicParams
from pandora.host_grpc import Host
from pandora.host_grpc import Host
from pandora.host_pb2 import PUBLIC, RANDOM, DISCOVERABLE_GENERAL, NOT_DISCOVERABLE, DISCOVERABLE_LIMITED, NOT_CONNECTABLE, DataTypes
from pandora.host_pb2 import PUBLIC, RANDOM, DISCOVERABLE_GENERAL, NOT_DISCOVERABLE, DISCOVERABLE_LIMITED, NOT_CONNECTABLE, DataTypes
from pandora.security_grpc import Security, SecurityStorage
from pandora.security_grpc import Security, SecurityStorage
from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer
from pandora.security_pb2 import LEVEL1, LEVEL2, LE_LEVEL3, PairingEventAnswer




class GAPProxy(ProfileProxy):
class GAPProxy(ProfileProxy):


    def __init__(self, channel):
    def __init__(self, channel, rootcanal):
        super().__init__(channel)
        super().__init__(channel)
        self.gatt = GATT(channel)
        self.gatt = GATT(channel)
        self.host = Host(channel)
        self.host = Host(channel)
        self.security = Security(channel)
        self.security = Security(channel)
        self.security_storage = SecurityStorage(channel)
        self.security_storage = SecurityStorage(channel)
        self.rootcanal = rootcanal


        self.connection = None
        self.connection = None
        self.pairing_events = None
        self.pairing_events = None
@@ -30,6 +33,24 @@ class GAPProxy(ProfileProxy):


        self._auto_confirm_requests()
        self._auto_confirm_requests()


    def test_started(self, test: str, description: str, pts_addr: bytes):
        if test in [
                "GAP/CONN/CPUP/BV-06-C",
        ]:
            self.rootcanal.select_pts_dongle(Dongle.LAIRD_BL654)
        else:
            self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE)

        if test in [
                "GAP/DM/LEP/BV-07-C",
                "GAP/DM/LEP/BV-08-C",
                "GAP/DM/LEP/BV-11-C",
                "GAP/MOD/CON/BV-01-C",
        ]:
            self.host.SetDiscoverabilityMode(mode=DISCOVERABLE_GENERAL)

        return "OK"

    @match_description
    @match_description
    def TSC_MMI_iut_send_hci_connect_request(self, test: str, pts_addr: bytes, **kwargs):
    def TSC_MMI_iut_send_hci_connect_request(self, test: str, pts_addr: bytes, **kwargs):
        """
        """
@@ -37,7 +58,19 @@ class GAPProxy(ProfileProxy):
        connection( after the IUT discovers the Lower Tester over BR and LE)?.
        connection( after the IUT discovers the Lower Tester over BR and LE)?.
        """
        """


        if test in {"GAP/SEC/AUT/BV-02-C", "GAP/SEC/SEM/BV-05-C", "GAP/SEC/SEM/BV-08-C"}:
        if test in [
                "GAP/IDLE/BON/BV-03-C",
                "GAP/IDLE/BON/BV-04-C",
                "GAP/IDLE/BON/BV-05-C",
                "GAP/IDLE/BON/BV-06-C",
                "GAP/SEC/AUT/BV-02-C",
                "GAP/SEC/SEM/BV-05-C",
                "GAP/SEC/SEM/BV-08-C",
                "GAP/SEC/SEM/BV-50-C",
                "GAP/SEC/SEM/BI-27-C",
                "GAP/SEC/SEM/BI-32-C",
                "GAP/EST/LIE/BV-02-C",
        ]:
            # we connect then pair, so we have to pair directly in this MMI
            # we connect then pair, so we have to pair directly in this MMI
            self.pairing_events = self.security.OnPairing()
            self.pairing_events = self.security.OnPairing()
            self.connection = self.host.Connect(address=pts_addr).connection
            self.connection = self.host.Connect(address=pts_addr).connection
@@ -143,8 +176,13 @@ class GAPProxy(ProfileProxy):
            # so we have immediately auto-connected back to it
            # so we have immediately auto-connected back to it
            return "OK"
            return "OK"


        if test in {"GAP/CONN/GCEP/BV-02-C", "GAP/DM/LEP/BV-06-C", "GAP/CONN/GCEP/BV-01-C"}:
        if test in [
            # use identity address
                "GAP/CONN/DCEP/BV-03-C",
                "GAP/CONN/GCEP/BV-02-C",
                "GAP/DM/LEP/BV-06-C",
                "GAP/CONN/GCEP/BV-01-C",
        ]:
            # PTS is not advertising with the local name, use identity address
            address = pts_addr
            address = pts_addr
        else:
        else:
            # the PTS sometimes decides to advertise with an RPA, so we do a scan to find its real address
            # the PTS sometimes decides to advertise with an RPA, so we do a scan to find its real address
@@ -157,7 +195,9 @@ class GAPProxy(ProfileProxy):
                    scans.cancel()
                    scans.cancel()
                    break
                    break


        self.pairing_events = self.security.OnPairing()
        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=address).connection
        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=address).connection

        if test in {"GAP/BOND/BON/BV-04-C"}:
        if test in {"GAP/BOND/BON/BV-04-C"}:
            self.security.Secure(connection=self.connection, le=LE_LEVEL3)
            self.security.Secure(connection=self.connection, le=LE_LEVEL3)


@@ -299,11 +339,11 @@ class GAPProxy(ProfileProxy):


        return "OK"
        return "OK"


    @assert_description
    @match_description
    def TSC_MMI_iut_start_general_inquiry_found(self, pts_addr: bytes, **kwargs):
    def TSC_MMI_iut_start_general_inquiry_found(self, pts_addr: bytes, **kwargs):
        """
        """
        Please start general inquiry. Click 'Yes' If IUT does discovers PTS and
        Please start general inquiry. Click 'Yes' If IUT does discovers PTS and
        ready for PTS to initiate LE create connection otherwise click 'No'.
        ready for PTS to initiate (a|LE) create connection otherwise click 'No'.
        """
        """


        inquiry_responses = self.host.Inquiry()
        inquiry_responses = self.host.Inquiry()
@@ -329,7 +369,7 @@ class GAPProxy(ProfileProxy):
    @match_description
    @match_description
    def TSC_MMI_iut_confirm_device_discovery(self, test: str, pts_addr: bytes, **kwargs):
    def TSC_MMI_iut_confirm_device_discovery(self, test: str, pts_addr: bytes, **kwargs):
        """
        """
        Please confirm that IUT has discovered PTS and retrieved its name (?P<name>[a-zA-Z\-0-9]*)
        Please confirm that IUT has discovered PTS and retrieved its name '?(?P<name>[a-zA-Z\-0-9]*)'?\.?
        """
        """
        #Verifying if the BD Address matches in Inquiry
        #Verifying if the BD Address matches in Inquiry
        inquiry_responses = self.host.Inquiry()
        inquiry_responses = self.host.Inquiry()
@@ -546,14 +586,18 @@ class GAPProxy(ProfileProxy):


        return "OK"
        return "OK"


    @assert_description
    @match_description
    def TSC_MMI_iut_start_bonding_procedure_bondable(self, test: str, pts_addr: bytes, **kwargs):
    def TSC_MMI_iut_start_bonding_procedure_bondable(self, test: str, pts_addr: bytes, **kwargs):
        """
        """
        Please start the Bonding Procedure in bondable mode.
        (Please start the Bonding Procedure in bondable mode.|Please configure the IUT into LE Security and start pairing process.)
        """
        """


        if not self.pairing_events:
            self.pairing_events = self.security.OnPairing()
            self.pairing_events = self.security.OnPairing()

        if not self.connection:
            self.connection = next(self.advertise).connection
            self.connection = next(self.advertise).connection

        if test == "GAP/DM/BON/BV-01-C":
        if test == "GAP/DM/BON/BV-01-C":
            # we already started in the previous test
            # we already started in the previous test
            return "OK"
            return "OK"
@@ -561,8 +605,15 @@ class GAPProxy(ProfileProxy):
        if test not in {"GAP/SEC/AUT/BV-21-C"}:
        if test not in {"GAP/SEC/AUT/BV-21-C"}:
            self.security_storage.DeleteBond(public=pts_addr)
            self.security_storage.DeleteBond(public=pts_addr)


        if test in ["GAP/SEC/SEM/BV-53-C"]:
            self.security.Secure(connection=self.connection, classic=LEVEL1)
        else:

            def secure():
                self.security.Secure(connection=self.connection, le=LE_LEVEL3)
                self.security.Secure(connection=self.connection, le=LE_LEVEL3)


            Thread(target=secure).start()

        return "OK"
        return "OK"


    @assert_description
    @assert_description
@@ -617,7 +668,7 @@ class GAPProxy(ProfileProxy):


        return "OK"
        return "OK"


    def TSC_MMI_make_iut_general_discoverable(self, **kwargs):
    def TSC_MMI_make_iut_general_discoverable(self, test: str, **kwargs):
        """
        """
        Please make IUT general discoverable. Press OK to continue.
        Please make IUT general discoverable. Press OK to continue.
        """
        """
@@ -630,6 +681,11 @@ class GAPProxy(ProfileProxy):
            connectable=True,
            connectable=True,
        )
        )


        if test in [
                "GAP/SEC/SEM/BI-31-C",
        ]:
            self.pairing_events = self.security.OnPairing()

        return "OK"
        return "OK"


    @assert_description
    @assert_description
@@ -729,13 +785,17 @@ class GAPProxy(ProfileProxy):


        return "OK"
        return "OK"


    @assert_description
    @match_description
    def TSC_MMI_iut_enter_non_connectable_mode(self, **kwargs):
    def TSC_MMI_iut_enter_non_connectable_mode(self, **kwargs):
        """
        """
        Please enter Non-Connectable mode.
        Please enter (Non-Connectable|non connectable) mode( and genrate advertising report event)?.
        """
        """


        self.host.SetConnectabilityMode(mode=NOT_CONNECTABLE)
        self.advertise = self.host.Advertise(
            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
            own_address_type=PUBLIC,
            connectable=False,
        )


        return "OK"
        return "OK"


@@ -877,11 +937,33 @@ class GAPProxy(ProfileProxy):


        return "OK"
        return "OK"


    @assert_description
    @match_description
    def TSC_MMI_iut_send_ll_connection_update_request(self, **kwargs):
    def TSC_MMI_iut_send_ll_connection_update_request(self, **kwargs):
        """
        """
        Please send a LL Connection Parameter Update request using valid
        Please send a LL Connection Parameter Update request using valid
        parameters.
        parameters.
        (With 0x0032 value set in TSPX_conn_update_int_min
         0x0046
        value set in TSPX_conn_update_int_max
         0x0001 value set in
        TSPX_conn_update_peripheral_latency and
         0x01F4 value set in
        TSPX_conn_update_supervision_timeout)?
        """

        return "OK"

    @assert_description
    def TSC_MMI_iut_send_le_connection_update_request(self, **kwargs):
        """
        Please start a Connection Update procedure using valid parameters.
        With 0x0032 value set in TSPX_conn_update_int_min
         0x0046 value set in
        TSPX_conn_update_int_max
         0x0001 value set in
        TSPX_conn_update_peripheral_latency and
         0x01F4 value set in
        TSPX_conn_update_supervision_timeout
        """
        """


        return "OK"
        return "OK"
@@ -909,6 +991,15 @@ class GAPProxy(ProfileProxy):


        return handle_format(response.service.characteristics[0].handle)
        return handle_format(response.service.characteristics[0].handle)


    @assert_description
    def TSC_MMI_iut_remove_bonding(self, pts_addr: bytes, **kwargs):
        """
        Please have Upper Tester remove the bonding information of the PTS.
        Press OK to continue.
        """

        self.security_storage.DeleteBond(public=pts_addr)

        return "OK"
        return "OK"


    @assert_description
    @assert_description
@@ -934,6 +1025,121 @@ class GAPProxy(ProfileProxy):


        return "OK"
        return "OK"


    @match_description
    def TSC_MMI_helper_do_not_find_confirm(self, pts_addr: bytes, passkey: str, **kwargs):
        """
        Please confirm the following number matches IUT: (?P<passkey>[0-9]+).
        """

        for event in self.pairing_events:
            if event.address == pts_addr and event.numeric_comparison == int(passkey):
                self.pairing_events.send(PairingEventAnswer(event=event, confirm=True))
                return "OK"

        assert False

    @assert_description
    def _mmi_208(self, **kwargs):
        """
        Please configure the IUT into LE Security Mode 1 Level 4 and start
        pairing process.
        """

        def secure():
            self.pairing_events = self.security.OnPairing()
            self.security.Secure(connection=self.connection, le=LE_LEVEL3)

        Thread(target=secure).start()

        return "OK"

    @assert_description
    def _mmi_252(self, **kwargs):
        """
        Please send L2CAP Connection Response with Security Blocked to PTS.
        """

        # TODO

        return "OK"

    @assert_description
    def _mmi_261(self, **kwargs):
        """
        Please bring IUT to Security Mode 4 level 2. Press OK to continue.
        """

        # TODO

        return "OK"

    @assert_description
    def _mmi_263(self, **kwargs):
        """
        Please bring IUT to Security Mode 4 level 4. Press OK to continue.
        """

        # TODO

        return "OK"

    @assert_description
    def _mmi_264(self, **kwargs):
        """
        Please send L2CAP Connection Request to PTS.
        """

        # TODO

        return "OK"

    @assert_description
    def _mmi_265(self, **kwargs):
        """
        Please initiate a link encryption with the Lower Tester.
        """

        # TODO

        return "OK"

    @assert_description
    def _mmi_273(self, **kwargs):
        """
        Please trigger channel creation. Expect to perform link encryption
        before channel creation.
        """

        self.security.Secure(connection=self.connection, classic=LEVEL2)

        return "OK"

    @assert_description
    def _mmi_20001(self, **kwargs):
        """
        Please prepare IUT into a connectable mode.

        Description: Verify that
        the Implementation Under Test (IUT) can accept GATT connect request from
        PTS.
        """

        return "OK"

    @assert_description
    def _mmi_20115(self, **kwargs):
        """
        Please initiate a GATT disconnection to the PTS.

        Description: Verify
        that the Implementation Under Test (IUT) can initiate GATT disconnect
        request to PTS.
        """

        # TODO

        return "OK"

    def _auto_confirm_requests(self, times=None):
    def _auto_confirm_requests(self, times=None):


        def task():
        def task():
Loading