Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 997de039 authored by Henri Chataing's avatar Henri Chataing
Browse files

Pandora: Add bindings for VCP profile tests

- Import the ICS from the publicly available google
  device specification
- Implement VCPProxy with VCP interactions
- Implement barebone le_audio pandora_experimental
  interface and server support

Bug: 291996279
Test: atest pts-bot:VCP
Change-Id: I7a29ac4591222623ff66a635747b6e23013b670b
parent b8670987
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@ from mmi2grpc.pbap import PBAPProxy
from mmi2grpc.rfcomm import RFCOMMProxy
from mmi2grpc.sdp import SDPProxy
from mmi2grpc.sm import SMProxy
from mmi2grpc.vcp import VCPProxy
from mmi2grpc._helpers import format_proxy
from mmi2grpc._rootcanal import RootCanal
from mmi2grpc._modem import Modem
@@ -89,6 +90,7 @@ class IUT:
        self._rfcomm = None
        self._sdp = None
        self._sm = None
        self._vcp = None

    def __enter__(self):
        """Resets the IUT when starting a PTS test."""
@@ -125,6 +127,7 @@ class IUT:
        self._rfcomm = None
        self._sdp = None
        self._sm = None
        self._vcp = None

    def _retry(self, func):

@@ -273,6 +276,11 @@ class IUT:
            if not self._sm:
                self._sm = SMProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
            return self._sm.interact(test, interaction, description, pts_address)
        # HandlesVCP MMIs.
        if profile in ("VCP"):
            if not self._vcp:
                self._vcp = VCPProxy(grpc.insecure_channel(f"localhost:{self.pandora_server_port}"), self.rootcanal)
            return self._vcp.interact(test, interaction, description, pts_address)

        # Handles unsupported profiles.
        code = format_proxy(profile, interaction, description)
+2 −0
Original line number Diff line number Diff line
@@ -145,6 +145,7 @@ class SDPProxy(ProfileProxy):
            "HandsfreeAudioGateway",
            "GenericAudio",
            "Message Access Server",
            "TMAS",
            "NAP",
            "PANU",
            "Phonebook Access - PSE",
@@ -155,6 +156,7 @@ class SDPProxy(ProfileProxy):
            "Generic Attribute service",
            "A/V_RemoteControlController",
            "Android Auto Compatibility",
            "TMAS",
        ]
        movable_services = [
            "Message Access Server",
+121 −0
Original line number Diff line number Diff line
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""VCP proxy module."""
import threading

from mmi2grpc._helpers import assert_description, match_description
from mmi2grpc._proxy import ProfileProxy
from mmi2grpc._rootcanal import Dongle

from pandora.security_grpc import Security
from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer
from pandora.host_grpc import Host
from pandora.host_pb2 import PUBLIC, RANDOM
from pandora_experimental.le_audio_grpc import LeAudio


class VCPProxy(ProfileProxy):

    def __init__(self, channel, rootcanal):
        super().__init__(channel)
        self.host = Host(channel)
        self.security = Security(channel)
        self.le_audio = LeAudio(channel)
        self.rootcanal = rootcanal
        self.connection = None
        self.pairing_stream = None

    def test_started(self, test: str, description: str, pts_addr: bytes):
        self.rootcanal.select_pts_dongle(Dongle.LAIRD_BL654)

        return "OK"

    @assert_description
    def IUT_INITIATE_CONNECTION(self, pts_addr: bytes, **kwargs):
        """
        Please initiate a GATT connection to the PTS.

        Description: Verify that
        the Implementation Under Test (IUT) can initiate a GATT connect request
        to the PTS.
        """
        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection
        self.pairing_stream = self.security.OnPairing()

        def secure():
            self.security.Secure(connection=self.connection, le=LE_LEVEL3)

        threading.Thread(target=secure).start()
        return "OK"

    @match_description
    def _mmi_2004(self, pts_addr: bytes, passkey: str, **kwargs):
        """
        Please confirm that 6 digit number is matched with (?P<passkey>[0-9]*).
        """
        received = []
        for event in self.pairing_stream:
            if event.address == pts_addr and event.numeric_comparison == int(passkey):
                self.pairing_stream.send(PairingEventAnswer(
                    event=event,
                    confirm=True,
                ))
                return "OK"
            received.append(event.numeric_comparison)

        assert False, f"mismatched passcode: expected {passkey}, received {received}"

    @match_description
    def IUT_INITIATE_DISCOVER_CHARACTERISTIC(self, **kwargs):
        """
        Please take action to discover the
        (Volume Control Point|Volume State|Volume Flags|Offset State|Volume Offset Control Point)
        characteristic from the Volume (Offset)? Control. Discover the primary service if needed.
        Description: Verify that the Implementation Under Test \(IUT\) can send
        Discover All Characteristics command.
        """
        return "OK"

    @match_description
    def IUT_READ_CHARACTERISTIC(self, name: str, handle: str, **kwargs):
        """
        Please send Read Request to read (?P<name>(Volume State|Volume Flags|Offset State)) characteristic with handle
        = (?P<handle>(0x[0-9A-Fa-f]{4})).
        """
        return "OK"

    @assert_description
    def USER_CONFIRM_SUPPORTED_CHARACTERISTIC(self, characteristics: str, **kwargs):
        """
        Please verify that for each supported characteristic, attribute
        handle/UUID pair(s) is returned to the upper tester.(?P<characteristics>(.|\n)*)
        """

        return "OK"

    @match_description
    def IUT_CONFIG_NOTIFICATION(self, name: str, **kwargs):
        """
        Please write to Client Characteristic Configuration Descriptor of
        (?P<name>(Volume State|Offset State)) characteristic to enable notification.
        """
        return "OK"

    @assert_description
    def _mmi_20501(self, **kwargs):
        """
        Please start general inquiry. Click 'Yes' If IUT does discovers PTS
        otherwise click 'No'.
        """
        return "OK"
+1 −0
Original line number Diff line number Diff line
@@ -56,5 +56,6 @@
        <option name="profile" value="RFCOMM" />
        <option name="profile" value="SDP" />
        <option name="profile" value="SM" />
        <option name="profile" value="VCP" />
    </test>
</configuration>
+59 −3
Original line number Diff line number Diff line
@@ -1012,7 +1012,21 @@
    "SM/PER/SCPK/BI-04-C",
    "SM/PER/SCPK/BV-03-C",
    "SM/PER/SIE/BV-01-C",
    "SM/PER/SIP/BV-01-C"
    "SM/PER/SIP/BV-01-C",
    "VCP/VC/CGGIT/CHA/BV-01-C",
    "VCP/VC/CGGIT/CHA/BV-02-C",
    "VCP/VC/CGGIT/CHA/BV-03-C",
    "VCP/VC/CGGIT/CHA/BV-04-C",
    "VCP/VC/CGGIT/SER/BV-01-C",
    "VCP/VC/CGGIT/SER/BV-02-C",
    "VCP/VC/SPE/BI-05-C",
    "VCP/VC/SPE/BI-06-C",
    "VCP/VC/SPE/BI-13-C",
    "VCP/VC/SPE/BI-15-C",
    "VCP/VC/SPE/BI-16-C",
    "VCP/VC/VCCP/BV-05-C",
    "VCP/VC/VCCP/BV-06-C",
    "VCP/VC/VOCP/BV-01-C"
  ],
  "ics": {
    "TSPC_4.0HCI_1a_2": true,
@@ -2113,7 +2127,48 @@
    "TSPC_SPP_4_5": true,
    "TSPC_SPP_4_6": true,
    "TSPC_SUM ICS_31_22": true,
    "TSPC_SUM ICS_52_1": true
    "TSPC_SUM ICS_52_1": true,
    "TSPC_VCP_1_2": true,
    "TSPC_VCP_2_2": true,
    "TSPC_VCP_8_1": true,
    "TSPC_VCP_10_1": true,
    "TSPC_VCP_10_2": true,
    "TSPC_VCP_11_1": true,
    "TSPC_VCP_11_2": true,
    "TSPC_VCP_11_3": true,
    "TSPC_VCP_12_1": true,
    "TSPC_VCP_12_11": true,
    "TSPC_VCP_12_12": true,
    "TSPC_VCP_12_2": true,
    "TSPC_VCP_12_3": true,
    "TSPC_VCP_12_4": true,
    "TSPC_VCP_12_6": true,
    "TSPC_VCP_13_1": true,
    "TSPC_VCP_13_3": true,
    "TSPC_VCP_14_1": true,
    "TSPC_VCP_14_2": true,
    "TSPC_VCP_14_3": true,
    "TSPC_VCP_14_4": true,
    "TSPC_VCP_14_6": true,
    "TSPC_VCP_17_1": true,
    "TSPC_VCP_17_10": true,
    "TSPC_VCP_17_11": true,
    "TSPC_VCP_17_2": true,
    "TSPC_VCP_17_3": true,
    "TSPC_VCP_17_4": true,
    "TSPC_VCP_17_5": true,
    "TSPC_VCP_17_6": true,
    "TSPC_VCP_17_7": true,
    "TSPC_VCP_17_8": true,
    "TSPC_VCP_17_9": true,
    "TSPC_VCP_18_1": true,
    "TSPC_VCP_18_12": true,
    "TSPC_VCP_18_2": true,
    "TSPC_VCP_18_3": true,
    "TSPC_VCP_18_6": true,
    "TSPC_VCP_18_8": true,
    "TSPC_VCP_19_1": true,
    "TSPC_VCP_19_2": true
  },
  "ixit": {
    "default": {},
@@ -2161,6 +2216,7 @@
    "SDP": {},
    "SM": {},
    "SPP": {},
    "SUM ICS": {}
    "SUM ICS": {},
    "VCP": {}
  }
}
Loading