Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 19b284c5 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "[Pandora] Implement HID-over-GATT tests"

parents bd6cfb89 37270834
Loading
Loading
Loading
Loading
+10 −4
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ from mmi2grpc.a2dp import A2DPProxy
from mmi2grpc.avrcp import AVRCPProxy
from mmi2grpc.gatt import GATTProxy
from mmi2grpc.hfp import HFPProxy
from mmi2grpc.hogp import HOGPProxy
from mmi2grpc.sdp import SDPProxy
from mmi2grpc.sm import SMProxy
from mmi2grpc._helpers import format_proxy
@@ -62,6 +63,7 @@ class IUT:
        self._hfp = None
        self._sdp = None
        self._sm = None
        self._hogp = None

    def __enter__(self):
        """Resets the IUT when starting a PTS test."""
@@ -77,6 +79,7 @@ class IUT:
        self._hfp = None
        self._sdp = None
        self._sm = None
        self._hogp = None

    def _retry(self, func):

@@ -106,8 +109,7 @@ class IUT:
        def read_local_address():
            with grpc.insecure_channel(f'localhost:{self.port}') as channel:
                nonlocal mut_address
                mut_address = self._retry(
                    Host(channel).ReadLocalAddress)(wait_for_ready=True).address
                mut_address = self._retry(Host(channel).ReadLocalAddress)(wait_for_ready=True).address

        thread = Thread(target=read_local_address)
        thread.start()
@@ -118,7 +120,6 @@ class IUT:
        else:
            return mut_address


    def interact(self, pts_address: bytes, profile: str, test: str, interaction: str, description: str, style: str,
                 **kwargs) -> str:
        """Routes MMI calls to corresponding profile proxy.
@@ -163,6 +164,11 @@ class IUT:
            if not self._sm:
                self._sm = SMProxy(grpc.insecure_channel(f'localhost:{self.port}'))
            return self._sm.interact(test, interaction, description, pts_address)
        # Handles HOGP MMIs.
        if profile in ('HOGP'):
            if not self._hogp:
                self._hogp = HOGPProxy(grpc.insecure_channel(f'localhost:{self.port}'))
            return self._hogp.interact(test, interaction, description, pts_address)

        # Handles unsupported profiles.
        code = format_proxy(profile, interaction, description)
+61 −34
Original line number Diff line number Diff line
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper functions.

Facilitates the implementation of a new profile proxy or a PTS MMI.
@@ -20,6 +19,7 @@ Facilitates the implementation of a new profile proxy or a PTS MMI.
import functools
import textwrap
import unittest
import re

DOCSTRING_WIDTH = 80 - 8  # 80 cols - 8 indentation spaces

@@ -37,10 +37,10 @@ def assert_description(f):
        AssertionError: the docstring of the function does not match the MMI
            description.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        description = textwrap.fill(
            kwargs['description'], DOCSTRING_WIDTH, replace_whitespace=False)
        description = textwrap.fill(kwargs['description'], DOCSTRING_WIDTH, replace_whitespace=False)
        docstring = textwrap.dedent(f.__doc__ or '')

        if docstring.strip() != description.strip():
@@ -50,22 +50,51 @@ def assert_description(f):
            # Generate AssertionError.
            test = unittest.TestCase()
            test.maxDiff = None
            test.assertMultiLineEqual(
                docstring.strip(),
                description.strip(),
            test.assertMultiLineEqual(docstring.strip(), description.strip(),
                                      f'description does not match with function docstring of'
                                      f'{f.__name__}')

        return f(*args, **kwargs)

    return wrapper


def match_description(f):
    """Extracts parameters from PTS MMI descriptions.

    Similar to assert_description, but treats the description as an (indented)
    regex that can be used to extract named capture groups from the PTS command.

    Args:
        f: function implementing a PTS MMI.

    Raises:
        AssertionError: the docstring of the function does not match the MMI
            description.
    """

    def normalize(desc):
        return desc.replace("\n", " ").replace("\t", "    ").strip()

    docstring = normalize(textwrap.dedent(f.__doc__))
    regex = re.compile(docstring)

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        description = normalize(kwargs['description'])
        match = regex.fullmatch(description)

        assert match is not None, f'description does not match with function docstring of {f.__name__}:\n{repr(description)}\n!=\n{repr(docstring)}'

        return f(*args, **kwargs, **match.groupdict())

    return wrapper


def format_function(mmi_name, mmi_description):
    """Returns the base format of a function implementing a PTS MMI."""
    wrapped_description = textwrap.fill(
        mmi_description, DOCSTRING_WIDTH, replace_whitespace=False)
    return (
        f'@assert_description\n'
    wrapped_description = textwrap.fill(mmi_description, DOCSTRING_WIDTH, replace_whitespace=False)
    return (f'@assert_description\n'
            f'def {mmi_name}(self, **kwargs):\n'
            f'    """\n'
            f'{textwrap.indent(wrapped_description, "    ")}\n'
@@ -76,10 +105,8 @@ def format_function(mmi_name, mmi_description):

def format_proxy(profile, mmi_name, mmi_description):
    """Returns the base format of a profile proxy including a given MMI."""
    wrapped_function = textwrap.indent(
        format_function(mmi_name, mmi_description), '    ')
    return (
        f'from mmi2grpc._helpers import assert_description\n'
    wrapped_function = textwrap.indent(format_function(mmi_name, mmi_description), '    ')
    return (f'from mmi2grpc._helpers import assert_description\n'
            f'from mmi2grpc._proxy import ProfileProxy\n'
            f'\n'
            f'from pandora.{profile.lower()}_grpc import {profile}\n'
+298 −0
Original line number Diff line number Diff line
import textwrap
import uuid
import re

from mmi2grpc._helpers import assert_description, match_description
from mmi2grpc._proxy import ProfileProxy

from pandora.host_grpc import Host
from pandora.security_grpc import Security
from pandora.gatt_grpc import GATT

BASE_UUID = uuid.UUID("00000000-0000-1000-8000-00805F9B34FB")


def short_uuid(full: uuid.UUID) -> int:
    return (uuid.UUID(full).int - BASE_UUID.int) >> 96


class HOGPProxy(ProfileProxy):

    def __init__(self, channel):
        super().__init__()
        self.host = Host(channel)
        self.security = Security(channel)
        self.gatt = GATT(channel)
        self.connection = None
        self.pairing_stream = None
        self.characteristic_reads = {}

    @assert_description
    def IUT_INITIATE_CONNECTION(self, pts_addr: bytes, **kwargs):
        """
        Please initiate a GATT connection to the PTS.

        Description: Verify that
        the Implementation Under Test (IUT) can initiate a GATT connect request
        to the PTS.
        """

        self.connection = self.host.ConnectLE(address=pts_addr).connection
        self.pairing_stream = self.security.OnPairing()
        self.security.Pair(connection=self.connection)

        return "OK"

    @match_description
    def _mmi_2004(self, pts_addr: bytes, passkey: str, **kwargs):
        """
        Please confirm that 6 digit number is matched with (?P<passkey>[0-9]*).
        """
        received = []
        for event in self.pairing_stream:
            if event.address == pts_addr and event.numeric_comparison == int(passkey):
                self.pairing_stream.send(
                    event=event,
                    confirm=True,
                )
                self.pairing_stream.close()
                return "OK"
            received.append(event.numeric_comparison)

        assert False, f"mismatched passcode: expected {passkey}, received {received}"

    @match_description
    def IUT_SEND_WRITE_REQUEST(self, handle: str, properties: str, **kwargs):
        r"""
        Please send write request to handle (?P<handle>\S*) with following value.
        Client
        Characteristic Configuration:
             Properties: \[0x00(?P<properties>\S*)\]
        """

        self.gatt.WriteCharacteristicDescriptorFromHandle(
            connection=self.connection,
            handle=int(handle, base=16),
            value=bytes([int(f"0x{properties}", base=16), 0]),
        )

        return "OK"

    @match_description
    def USER_CONFIRM_CHARACTERISTIC(self, body: str, **kwargs):
        r"""
        Please verify that following attribute handle/UUID pair was returned
        containing the UUID for the (.*)\.

        (?P<body>.*)
        """

        PATTERN = re.compile(
            r"""
                Attribute Handle = (\S*)
                Characteristic Properties = (?P<properties>\S*)
                Handle = (?P<handle>\S*)
                UUID = (?P<uuid>\S*)
                """,
            re.VERBOSE,
        )

        targets = set()

        for match in PATTERN.finditer(body):
            targets.add((
                int(match.group("properties"), base=16),
                int(match.group("handle"), base=16),
                int(match.group("uuid"), base=16),
            ))

        assert len(targets) == body.count("Characteristic Properties"), "safety check that regex is matching something"

        services = self.gatt.DiscoverServices(connection=self.connection).services

        for service in services:
            for characteristic in service.characteristics:
                uuid_16 = short_uuid(characteristic.uuid)
                key = (characteristic.properties, characteristic.handle, uuid_16)
                if key in targets:
                    targets.remove(key)

        assert not targets, f"could not find handles: {targets}"

        return "OK"

    @match_description
    def USER_CONFIRM_CHARACTERISTIC_DESCRIPTOR(self, body: str, **kwargs):
        r"""
        Please verify that following attribute handle/UUID pair was returned
        containing the UUID for the (.*)\.

        (?P<body>.*)
        """

        PATTERN = re.compile(rf"handle = (?P<handle>\S*)\s* uuid = (?P<uuid>\S*)")

        targets = set()

        for match in PATTERN.finditer(body):
            targets.add((
                int(match.group("handle"), base=16),
                int(match.group("uuid"), base=16),
            ))

        assert len(targets) == body.count("uuid = "), "safety check that regex is matching something"

        services = self.gatt.DiscoverServices(connection=self.connection).services

        for service in services:
            for characteristic in service.characteristics:
                for descriptor in characteristic.descriptors:
                    uuid_16 = short_uuid(descriptor.uuid)
                    key = (descriptor.handle, uuid_16)
                    if key in targets:
                        targets.remove(key)

        assert not targets, f"could not find handles: {targets}"

        return "OK"

    @match_description
    def USER_CONFIRM_SERVICE_HANDLE(self, service_name: str, body: str, **kwargs):
        r"""
        Please confirm the following handles for (?P<service_name>.*)\.

        (?P<body>.*)
        """

        PATTERN = re.compile(r"Start Handle: (?P<start_handle>\S*)     End Handle: (?P<end_handle>\S*)")

        SERVICE_UUIDS = {
            "Device Information": 0x180A,
            "Battery Service": 0x180F,
            "Human Interface Device": 0x1812,
        }

        target_uuid = SERVICE_UUIDS[service_name]

        services = self.gatt.DiscoverServices(connection=self.connection).services

        assert len(
            PATTERN.findall(body)) == body.count("Start Handle:"), "safety check that regex is matching something"

        for match in PATTERN.finditer(body):
            start_handle = match.group("start_handle")

            for service in services:
                if service.handle == int(start_handle, base=16):
                    assert (short_uuid(service.uuid) == target_uuid), "service UUID does not match expected type"
                    break
            else:
                assert False, f"cannot find service with start handle {start_handle}"

        return "OK"

    @assert_description
    def _mmi_1(self, **kwargs):
        """
        Please confirm that the IUT ignored the received Notification and did
        not report the values to the Upper Tester.
        """

        # TODO

        return "OK"

    @match_description
    def IUT_CONFIG_NOTIFICATION(self, value: str, **kwargs):
        r"""
        Please write to Client Characteristic Configuration Descriptor of Report
        characteristic to enable notification.

        Descriptor handle value: (?P<value>\S*)
        """

        self.gatt.WriteCharacteristicDescriptorFromHandle(
            connection=self.connection,
            handle=int(value, base=16),
            value=bytes([0x01, 0x00]),
        )

        return "OK"

    @match_description
    def IUT_READ_CHARACTERISTIC(self, test: str, characteristic_name: str, handle: str, **kwargs):
        r"""
        Please send Read Request to read (?P<characteristic_name>.*) characteristic with handle =
        (?P<handle>\S*).
        """

        TESTS_READING_CHARACTERISTIC_NOT_DESCRIPTORS = [
            "HOGP/RH/HGRF/BV-01-I",
            "HOGP/RH/HGRF/BV-10-I",
            "HOGP/RH/HGRF/BV-12-I",
        ]

        action = (self.gatt.ReadCharacteristicFromHandle if test in TESTS_READING_CHARACTERISTIC_NOT_DESCRIPTORS else
                  self.gatt.ReadCharacteristicDescriptorFromHandle)

        handle = int(handle, base=16)
        self.characteristic_reads[handle] = action(
            connection=self.connection,
            handle=handle,
        ).readValue.value

        return "OK"

    @match_description
    def USER_CONFIRM_READ_RESULT(self, characteristic_name: str, body: str, **kwargs):
        r"""
        Please verify following (?P<characteristic_name>.*) Characteristic value is Read.

        (?P<body>.*)
        """

        blocks = re.split("Handle:", body)

        HEX = "[0-9A-F]"
        PATTERN = re.compile(f"0x{HEX*2}(?:{HEX*2})?")

        num_checks = 0

        for block in blocks:
            data = PATTERN.findall(block)
            if not data:
                continue

            # first hex value is the handle, rest is the expected data
            handle, *data = data

            handle = int(handle, base=16)

            actual = self.characteristic_reads[handle]

            expected = []
            for word in data:
                if len(word) == len("0x0000"):
                    first = int(word[2:4], base=16)
                    second = int(word[4:6], base=16)

                    if "bytes in LSB order" in body:
                        little = first
                        big = second
                    else:
                        little = second
                        big = first

                    expected.append(little)
                    expected.append(big)
                else:
                    expected.append(int(word, base=16))

            expected = bytes(expected)

            num_checks += 1
            assert (expected == actual), f"Got unexpected value for handle {handle}: {repr(expected)} != {repr(actual)}"

        assert (body.count("Handle:") == num_checks), "safety check that regex is matching something"

        return "OK"
+1 −1
Original line number Diff line number Diff line
@@ -35,6 +35,6 @@
        <option name="profile" value="SM/CEN/EKS" />
        <option name="profile" value="SM/CEN/JW" />
        <option name="profile" value="SM/CEN/KDU" />

        <option name="profile" value="HOGP/RH" />
    </test>
</configuration>
+26 −0
Original line number Diff line number Diff line
@@ -77,6 +77,32 @@
    "GATT/CL/GAR/BV-04-C",
    "GATT/CL/GAR/BV-06-C",
    "GATT/CL/GAR/BV-07-C",
    "HOGP/RH/HGCF/BV-01-I",
    "HOGP/RH/HGDC/BV-01-I",
    "HOGP/RH/HGDC/BV-02-I",
    "HOGP/RH/HGDC/BV-03-I",
    "HOGP/RH/HGDC/BV-04-I",
    "HOGP/RH/HGDC/BV-05-I",
    "HOGP/RH/HGDC/BV-06-I",
    "HOGP/RH/HGDC/BV-07-I",
    "HOGP/RH/HGDC/BV-14-I",
    "HOGP/RH/HGDC/BV-15-I",
    "HOGP/RH/HGDC/BV-16-I",
    "HOGP/RH/HGDR/BV-01-I",
    "HOGP/RH/HGDS/BV-01-I",
    "HOGP/RH/HGDS/BV-02-I",
    "HOGP/RH/HGDS/BV-03-I",
    "HOGP/RH/HGNF/BI-01-I",
    "HOGP/RH/HGNF/BI-02-I",
    "HOGP/RH/HGNF/BV-01-I",
    "HOGP/RH/HGRF/BV-01-I",
    "HOGP/RH/HGRF/BV-02-I",
    "HOGP/RH/HGRF/BV-04-I",
    "HOGP/RH/HGRF/BV-06-I",
    "HOGP/RH/HGRF/BV-08-I",
    "HOGP/RH/HGRF/BV-05-I",
    "HOGP/RH/HGRF/BV-10-I",
    "HOGP/RH/HGRF/BV-12-I",
    "SDP/SR/BRW/BV-02-C",
    "SDP/SR/SA/BI-01-C",
    "SDP/SR/SA/BI-02-C",
Loading