Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7d3ab27a authored by Myles Watson's avatar Myles Watson
Browse files

Pandora: Implement RFCOMM/

RFCOMM/DEVA/
RFCOMM/DEVB/
RFCOMM/DEVA-DEVB/

There are two tests that are still skipped.

I couldn't find the IXIT value referenced in the test spec,
which would allow the tests to pass:

The role of DevA/DevB taken on by the IUT does not
matter to achieve a pass verdict for some test cases
for this Protocol as indicated in the test purpose and is
specified in the test case identifier for role agnostic
tests as detailed in Table 4.1. The role of the IUT/Lower
Tester for these tests is specified in the IXIT [7] in
order to enable the correct test environment conditions
to provoke the Lower Tester.

Since we don't support hooking up a physical serial_port,
TSPC_SPP_2_1 is set to false, which disables
RFCOMM/DEVA-DEVB/RFC/BV-14-C

Bug: 240483035
Test: atest pts-bot:RFCOMM/ -v
Change-Id: I7df1d38f1a5501a42030d6dfeff5560eb1124fe2
parent 65341b98
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@ from mmi2grpc.hfp import HFPProxy
from mmi2grpc.hid import HIDProxy
from mmi2grpc.hogp import HOGPProxy
from mmi2grpc.l2cap import L2CAPProxy
from mmi2grpc.rfcomm import RFCOMMProxy
from mmi2grpc.sdp import SDPProxy
from mmi2grpc.sm import SMProxy
from mmi2grpc._helpers import format_proxy
@@ -71,6 +72,7 @@ class IUT:
        self._hid = None
        self._hogp = None
        self._l2cap = None
        self._rfcomm = None
        self._sdp = None
        self._sm = None

@@ -96,6 +98,7 @@ class IUT:
        self._l2cap = None
        self._hid = None
        self._hogp = None
        self._rfcomm = None
        self._sdp = None
        self._sm = None

@@ -192,6 +195,11 @@ class IUT:
            if not self._l2cap:
                self._l2cap = L2CAPProxy(grpc.insecure_channel(f'localhost:{self.pandora_server_port}'))
            return self._l2cap.interact(test, interaction, description, pts_address)
        # Handles RFCOMM MMIs.
        if profile in ('RFCOMM'):
            if not self._rfcomm:
                self._rfcomm = RFCOMMProxy(grpc.insecure_channel(f'localhost:{self.pandora_server_port}'))
            return self._rfcomm.interact(test, interaction, description, pts_address)
        # Handles SDP MMIs.
        if profile in ('SDP'):
            if not self._sdp:
+226 −0
Original line number Diff line number Diff line
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Rfcomm proxy module."""

from mmi2grpc._helpers import assert_description
from mmi2grpc._proxy import ProfileProxy

from pandora_experimental.rfcomm_grpc import RFCOMM
from pandora_experimental.host_grpc import Host

import sys
import threading
import os
import socket


class RFCOMMProxy(ProfileProxy):

    # The UUID for Serial-Port Profile
    SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb"
    # TSPX_SERVICE_NAME_TESTER
    SERVICE_NAME = "COM5"

    def __init__(self, channel: str):
        super().__init__(channel)
        self.rfcomm = RFCOMM(channel)
        self.host = Host(channel)
        self.server = None
        self.connection = None

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_slc(self, pts_addr: bytes, test: str, **kwargs):
        """
        Take action to initiate an RFCOMM service level connection (l2cap).
        """

        try:
            self.connection = self.rfcomm.ConnectToServer(address=pts_addr, uuid=self.SPP_UUID).connection
        except Exception as e:
            if test == "RFCOMM/DEVA/RFC/BV-01-C":
                print(f'{test}: PTS disconnected as expected', file=sys.stderr)
                return "OK"
            else:
                print(f'{test}: PTS disconnected unexpectedly', file=sys.stderr)
                raise e
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_slc(self, pts_addr: bytes, **kwargs):
        """
        Take action to accept the RFCOMM service level connection from the
        tester.
        """

        self.server = self.rfcomm.StartServer(uuid=self.SPP_UUID, name=self.SERVICE_NAME).server

        self.host.WaitConnection(address=pts_addr)

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_sabm(self, **kwargs):
        """
        Take action to accept the SABM operation initiated by the tester.

        Note:
        Make sure that the RFCOMM server channel is set correctly in
        TSPX_server_channel_iut
        """

        self.connection = self.rfcomm.AcceptConnection(server=self.server).connection
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_PN(self, **kwargs):
        """
        Take action to respond PN.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_sabm_control_channel(self, **kwargs):
        """
        Take action to initiate an SABM operation for the RFCOMM control
        channel.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_PN(self, **kwargs):
        """
        Take action to initiate PN.
        """

        return "OK"

    def TSC_RFCOMM_mmi_iut_initiate_sabm_data_channel(self, **kwargs):
        """
        Take action to initiate an SABM operation for an RFCOMM data channel.
        Note: RFCOMM server channel can be found on PTS's SDP record
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_disc(self, **kwargs):
        """
        Take action to accept the DISC operation initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_accept_data_link_connection(self, **kwargs):
        """
        Take action to accept a new DLC initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_close_session(self, **kwargs):
        """
        Take action to close the RFCOMM session.
        """

        self.rfcomm.Disconnect(connection=self.connection)

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_RLS(self, **kwargs):
        """
        Take action to respond RLS command.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_MSC(self, **kwargs):
        """
        Take action to initiate MSC command.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_RPN(self, **kwargs):
        """
        Take action to respond RPN.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_NSC(self, **kwargs):
        """
        Take action to respond NSC.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_close_dlc(self, **kwargs):
        """
        Take action to close the DLC.
        """

        self.rfcomm.Disconnect(connection=self.connection)

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_Test(self, **kwargs):
        """
        Take action to respond Test.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_respond_MSC(self, **kwargs):
        """
        Take action to respond MSC.
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_send_data(self, **kwargs):
        """
        Take action to send data on the open DLC on PTS with at least 2 frames.
        """

        self.rfcomm.Send(connection=self.connection, data=b'Some data to send')
        self.rfcomm.Send(connection=self.connection, data=b'More data to send')
        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_user_wait_no_uih_data(self, **kwargs):
        """
        Please wait while the tester confirms no data is sent ...
        """

        return "OK"

    @assert_description
    def TSC_RFCOMM_mmi_iut_initiate_RLS_framing_error(self, **kwargs):
        """
        Take action to initiate RLS command with Framing Error status.
        """

        return "OK"
+1 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ java_library_static {
        "grpc-java-lite",
        "guava",
        "opencensus-java-api",
        "kotlin-test",
        "kotlinx_coroutines",
        "pandora-grpc-java",
        "pandora-proto-java",
+1 −0
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@
        <option name="profile" value="HID/HOS" />
        <option name="profile" value="HOGP" />
        <option name="profile" value="L2CAP/LE" />
        <option name="profile" value="RFCOMM" />
        <option name="profile" value="SDP" />
        <option name="profile" value="SM" />
    </test>
+21 −2
Original line number Diff line number Diff line
@@ -295,6 +295,20 @@
    "L2CAP/LE/CPU/BI-02-C",
    "L2CAP/LE/CPU/BV-02-C",
    "L2CAP/LE/REJ/BI-01-C",
    "RFCOMM/DEVA/RFC/BV-01-C",
    "RFCOMM/DEVB/RFC/BV-02-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-03-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-04-C",
    "RFCOMM/DEVA/RFC/BV-05-C",
    "RFCOMM/DEVB/RFC/BV-06-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-07-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-08-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-11-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-13-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-15-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-17-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-19-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-25-C,",
    "SDP/SR/BRW/BV-02-C",
    "SDP/SR/SA/BI-01-C",
    "SDP/SR/SA/BI-02-C",
@@ -630,6 +644,8 @@
    "L2CAP/LE/CID/BV-02-C",
    "L2CAP/LE/CPU/BV-01-C",
    "L2CAP/LE/REJ/BI-02-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-21-C",
    "RFCOMM/DEVA-DEVB/RFC/BV-22-C",
    "SM/CEN/PKE/BV-01-C",
    "SM/CEN/SCCT/BV-03-C",
    "SM/CEN/SCCT/BV-05-C",
@@ -1625,7 +1641,7 @@
    "TSPC_SPP_0_2": true,
    "TSPC_SPP_1_1": true,
    "TSPC_SPP_1_2": true,
    "TSPC_SPP_2_1": true,
    "TSPC_SPP_2_1": false,
    "TSPC_SPP_2_1b": true,
    "TSPC_SPP_3_1": true,
    "TSPC_SPP_3_2": true,
@@ -1671,7 +1687,10 @@
    "PAN": {},
    "PBAP": {},
    "PROD": {},
    "RFCOMM": {},
    "RFCOMM": {
      "TSPX_server_channel_iut": "7",
      "TSPX_security_enabled": "FALSE"
    },
    "SAP": {},
    "SCPP": {},
    "SDP": {},
Loading