Loading android/pandora/mmi2grpc/mmi2grpc/__init__.py +8 −0 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ from mmi2grpc.hfp import HFPProxy from mmi2grpc.hid import HIDProxy from mmi2grpc.hogp import HOGPProxy from mmi2grpc.l2cap import L2CAPProxy from mmi2grpc.rfcomm import RFCOMMProxy from mmi2grpc.sdp import SDPProxy from mmi2grpc.sm import SMProxy from mmi2grpc._helpers import format_proxy Loading Loading @@ -71,6 +72,7 @@ class IUT: self._hid = None self._hogp = None self._l2cap = None self._rfcomm = None self._sdp = None self._sm = None Loading @@ -96,6 +98,7 @@ class IUT: self._l2cap = None self._hid = None self._hogp = None self._rfcomm = None self._sdp = None self._sm = None Loading Loading @@ -192,6 +195,11 @@ class IUT: if not self._l2cap: self._l2cap = L2CAPProxy(grpc.insecure_channel(f'localhost:{self.pandora_server_port}')) return self._l2cap.interact(test, interaction, description, pts_address) # Handles RFCOMM MMIs. if profile in ('RFCOMM'): if not self._rfcomm: self._rfcomm = RFCOMMProxy(grpc.insecure_channel(f'localhost:{self.pandora_server_port}')) return self._rfcomm.interact(test, interaction, description, pts_address) # Handles SDP MMIs. if profile in ('SDP'): if not self._sdp: Loading android/pandora/mmi2grpc/mmi2grpc/rfcomm.py 0 → 100644 +226 −0 Original line number Diff line number Diff line # Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Rfcomm proxy module.""" from mmi2grpc._helpers import assert_description from mmi2grpc._proxy import ProfileProxy from pandora_experimental.rfcomm_grpc import RFCOMM from pandora_experimental.host_grpc import Host import sys import threading import os import socket class RFCOMMProxy(ProfileProxy): # The UUID for Serial-Port Profile SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb" # TSPX_SERVICE_NAME_TESTER SERVICE_NAME = "COM5" def __init__(self, channel: str): super().__init__(channel) self.rfcomm = RFCOMM(channel) self.host = Host(channel) self.server = None self.connection = None @assert_description def TSC_RFCOMM_mmi_iut_initiate_slc(self, pts_addr: bytes, test: str, **kwargs): """ Take action to initiate an RFCOMM service level connection (l2cap). """ try: self.connection = self.rfcomm.ConnectToServer(address=pts_addr, uuid=self.SPP_UUID).connection except Exception as e: if test == "RFCOMM/DEVA/RFC/BV-01-C": print(f'{test}: PTS disconnected as expected', file=sys.stderr) return "OK" else: print(f'{test}: PTS disconnected unexpectedly', file=sys.stderr) raise e return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_slc(self, pts_addr: bytes, **kwargs): """ Take action to accept the RFCOMM service level connection from the tester. """ self.server = self.rfcomm.StartServer(uuid=self.SPP_UUID, name=self.SERVICE_NAME).server self.host.WaitConnection(address=pts_addr) return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_sabm(self, **kwargs): """ Take action to accept the SABM operation initiated by the tester. Note: Make sure that the RFCOMM server channel is set correctly in TSPX_server_channel_iut """ self.connection = self.rfcomm.AcceptConnection(server=self.server).connection return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_PN(self, **kwargs): """ Take action to respond PN. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_sabm_control_channel(self, **kwargs): """ Take action to initiate an SABM operation for the RFCOMM control channel. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_PN(self, **kwargs): """ Take action to initiate PN. """ return "OK" def TSC_RFCOMM_mmi_iut_initiate_sabm_data_channel(self, **kwargs): """ Take action to initiate an SABM operation for an RFCOMM data channel. Note: RFCOMM server channel can be found on PTS's SDP record """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_disc(self, **kwargs): """ Take action to accept the DISC operation initiated by the tester. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_data_link_connection(self, **kwargs): """ Take action to accept a new DLC initiated by the tester. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_close_session(self, **kwargs): """ Take action to close the RFCOMM session. """ self.rfcomm.Disconnect(connection=self.connection) return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_RLS(self, **kwargs): """ Take action to respond RLS command. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_MSC(self, **kwargs): """ Take action to initiate MSC command. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_RPN(self, **kwargs): """ Take action to respond RPN. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_NSC(self, **kwargs): """ Take action to respond NSC. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_close_dlc(self, **kwargs): """ Take action to close the DLC. """ self.rfcomm.Disconnect(connection=self.connection) return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_Test(self, **kwargs): """ Take action to respond Test. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_MSC(self, **kwargs): """ Take action to respond MSC. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_send_data(self, **kwargs): """ Take action to send data on the open DLC on PTS with at least 2 frames. """ self.rfcomm.Send(connection=self.connection, data=b'Some data to send') self.rfcomm.Send(connection=self.connection, data=b'More data to send') return "OK" @assert_description def TSC_RFCOMM_mmi_user_wait_no_uih_data(self, **kwargs): """ Please wait while the tester confirms no data is sent ... """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_RLS_framing_error(self, **kwargs): """ Take action to initiate RLS command with Framing Error status. """ return "OK" android/pandora/server/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ java_library_static { "grpc-java-lite", "guava", "opencensus-java-api", "kotlin-test", "kotlinx_coroutines", "pandora-grpc-java", "pandora-proto-java", Loading android/pandora/server/configs/PtsBotTest.xml +1 −0 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ <option name="profile" value="HID/HOS" /> <option name="profile" value="HOGP" /> <option name="profile" value="L2CAP/LE" /> <option name="profile" value="RFCOMM" /> <option name="profile" value="SDP" /> <option name="profile" value="SM" /> </test> Loading android/pandora/server/configs/pts_bot_tests_config.json +21 −2 Original line number Diff line number Diff line Loading @@ -295,6 +295,20 @@ "L2CAP/LE/CPU/BI-02-C", "L2CAP/LE/CPU/BV-02-C", "L2CAP/LE/REJ/BI-01-C", "RFCOMM/DEVA/RFC/BV-01-C", "RFCOMM/DEVB/RFC/BV-02-C", "RFCOMM/DEVA-DEVB/RFC/BV-03-C", "RFCOMM/DEVA-DEVB/RFC/BV-04-C", "RFCOMM/DEVA/RFC/BV-05-C", "RFCOMM/DEVB/RFC/BV-06-C", "RFCOMM/DEVA-DEVB/RFC/BV-07-C", "RFCOMM/DEVA-DEVB/RFC/BV-08-C", "RFCOMM/DEVA-DEVB/RFC/BV-11-C", "RFCOMM/DEVA-DEVB/RFC/BV-13-C", "RFCOMM/DEVA-DEVB/RFC/BV-15-C", "RFCOMM/DEVA-DEVB/RFC/BV-17-C", "RFCOMM/DEVA-DEVB/RFC/BV-19-C", "RFCOMM/DEVA-DEVB/RFC/BV-25-C,", "SDP/SR/BRW/BV-02-C", "SDP/SR/SA/BI-01-C", "SDP/SR/SA/BI-02-C", Loading Loading @@ -630,6 +644,8 @@ "L2CAP/LE/CID/BV-02-C", "L2CAP/LE/CPU/BV-01-C", "L2CAP/LE/REJ/BI-02-C", "RFCOMM/DEVA-DEVB/RFC/BV-21-C", "RFCOMM/DEVA-DEVB/RFC/BV-22-C", "SM/CEN/PKE/BV-01-C", "SM/CEN/SCCT/BV-03-C", "SM/CEN/SCCT/BV-05-C", Loading Loading @@ -1625,7 +1641,7 @@ "TSPC_SPP_0_2": true, "TSPC_SPP_1_1": true, "TSPC_SPP_1_2": true, "TSPC_SPP_2_1": true, "TSPC_SPP_2_1": false, "TSPC_SPP_2_1b": true, "TSPC_SPP_3_1": true, "TSPC_SPP_3_2": true, Loading Loading @@ -1671,7 +1687,10 @@ "PAN": {}, "PBAP": {}, "PROD": {}, "RFCOMM": {}, "RFCOMM": { "TSPX_server_channel_iut": "7", "TSPX_security_enabled": "FALSE" }, "SAP": {}, "SCPP": {}, "SDP": {}, Loading Loading
android/pandora/mmi2grpc/mmi2grpc/__init__.py +8 −0 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ from mmi2grpc.hfp import HFPProxy from mmi2grpc.hid import HIDProxy from mmi2grpc.hogp import HOGPProxy from mmi2grpc.l2cap import L2CAPProxy from mmi2grpc.rfcomm import RFCOMMProxy from mmi2grpc.sdp import SDPProxy from mmi2grpc.sm import SMProxy from mmi2grpc._helpers import format_proxy Loading Loading @@ -71,6 +72,7 @@ class IUT: self._hid = None self._hogp = None self._l2cap = None self._rfcomm = None self._sdp = None self._sm = None Loading @@ -96,6 +98,7 @@ class IUT: self._l2cap = None self._hid = None self._hogp = None self._rfcomm = None self._sdp = None self._sm = None Loading Loading @@ -192,6 +195,11 @@ class IUT: if not self._l2cap: self._l2cap = L2CAPProxy(grpc.insecure_channel(f'localhost:{self.pandora_server_port}')) return self._l2cap.interact(test, interaction, description, pts_address) # Handles RFCOMM MMIs. if profile in ('RFCOMM'): if not self._rfcomm: self._rfcomm = RFCOMMProxy(grpc.insecure_channel(f'localhost:{self.pandora_server_port}')) return self._rfcomm.interact(test, interaction, description, pts_address) # Handles SDP MMIs. if profile in ('SDP'): if not self._sdp: Loading
android/pandora/mmi2grpc/mmi2grpc/rfcomm.py 0 → 100644 +226 −0 Original line number Diff line number Diff line # Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Rfcomm proxy module.""" from mmi2grpc._helpers import assert_description from mmi2grpc._proxy import ProfileProxy from pandora_experimental.rfcomm_grpc import RFCOMM from pandora_experimental.host_grpc import Host import sys import threading import os import socket class RFCOMMProxy(ProfileProxy): # The UUID for Serial-Port Profile SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb" # TSPX_SERVICE_NAME_TESTER SERVICE_NAME = "COM5" def __init__(self, channel: str): super().__init__(channel) self.rfcomm = RFCOMM(channel) self.host = Host(channel) self.server = None self.connection = None @assert_description def TSC_RFCOMM_mmi_iut_initiate_slc(self, pts_addr: bytes, test: str, **kwargs): """ Take action to initiate an RFCOMM service level connection (l2cap). """ try: self.connection = self.rfcomm.ConnectToServer(address=pts_addr, uuid=self.SPP_UUID).connection except Exception as e: if test == "RFCOMM/DEVA/RFC/BV-01-C": print(f'{test}: PTS disconnected as expected', file=sys.stderr) return "OK" else: print(f'{test}: PTS disconnected unexpectedly', file=sys.stderr) raise e return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_slc(self, pts_addr: bytes, **kwargs): """ Take action to accept the RFCOMM service level connection from the tester. """ self.server = self.rfcomm.StartServer(uuid=self.SPP_UUID, name=self.SERVICE_NAME).server self.host.WaitConnection(address=pts_addr) return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_sabm(self, **kwargs): """ Take action to accept the SABM operation initiated by the tester. Note: Make sure that the RFCOMM server channel is set correctly in TSPX_server_channel_iut """ self.connection = self.rfcomm.AcceptConnection(server=self.server).connection return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_PN(self, **kwargs): """ Take action to respond PN. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_sabm_control_channel(self, **kwargs): """ Take action to initiate an SABM operation for the RFCOMM control channel. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_PN(self, **kwargs): """ Take action to initiate PN. """ return "OK" def TSC_RFCOMM_mmi_iut_initiate_sabm_data_channel(self, **kwargs): """ Take action to initiate an SABM operation for an RFCOMM data channel. Note: RFCOMM server channel can be found on PTS's SDP record """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_disc(self, **kwargs): """ Take action to accept the DISC operation initiated by the tester. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_accept_data_link_connection(self, **kwargs): """ Take action to accept a new DLC initiated by the tester. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_close_session(self, **kwargs): """ Take action to close the RFCOMM session. """ self.rfcomm.Disconnect(connection=self.connection) return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_RLS(self, **kwargs): """ Take action to respond RLS command. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_MSC(self, **kwargs): """ Take action to initiate MSC command. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_RPN(self, **kwargs): """ Take action to respond RPN. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_NSC(self, **kwargs): """ Take action to respond NSC. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_close_dlc(self, **kwargs): """ Take action to close the DLC. """ self.rfcomm.Disconnect(connection=self.connection) return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_Test(self, **kwargs): """ Take action to respond Test. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_respond_MSC(self, **kwargs): """ Take action to respond MSC. """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_send_data(self, **kwargs): """ Take action to send data on the open DLC on PTS with at least 2 frames. """ self.rfcomm.Send(connection=self.connection, data=b'Some data to send') self.rfcomm.Send(connection=self.connection, data=b'More data to send') return "OK" @assert_description def TSC_RFCOMM_mmi_user_wait_no_uih_data(self, **kwargs): """ Please wait while the tester confirms no data is sent ... """ return "OK" @assert_description def TSC_RFCOMM_mmi_iut_initiate_RLS_framing_error(self, **kwargs): """ Take action to initiate RLS command with Framing Error status. """ return "OK"
android/pandora/server/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ java_library_static { "grpc-java-lite", "guava", "opencensus-java-api", "kotlin-test", "kotlinx_coroutines", "pandora-grpc-java", "pandora-proto-java", Loading
android/pandora/server/configs/PtsBotTest.xml +1 −0 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ <option name="profile" value="HID/HOS" /> <option name="profile" value="HOGP" /> <option name="profile" value="L2CAP/LE" /> <option name="profile" value="RFCOMM" /> <option name="profile" value="SDP" /> <option name="profile" value="SM" /> </test> Loading
android/pandora/server/configs/pts_bot_tests_config.json +21 −2 Original line number Diff line number Diff line Loading @@ -295,6 +295,20 @@ "L2CAP/LE/CPU/BI-02-C", "L2CAP/LE/CPU/BV-02-C", "L2CAP/LE/REJ/BI-01-C", "RFCOMM/DEVA/RFC/BV-01-C", "RFCOMM/DEVB/RFC/BV-02-C", "RFCOMM/DEVA-DEVB/RFC/BV-03-C", "RFCOMM/DEVA-DEVB/RFC/BV-04-C", "RFCOMM/DEVA/RFC/BV-05-C", "RFCOMM/DEVB/RFC/BV-06-C", "RFCOMM/DEVA-DEVB/RFC/BV-07-C", "RFCOMM/DEVA-DEVB/RFC/BV-08-C", "RFCOMM/DEVA-DEVB/RFC/BV-11-C", "RFCOMM/DEVA-DEVB/RFC/BV-13-C", "RFCOMM/DEVA-DEVB/RFC/BV-15-C", "RFCOMM/DEVA-DEVB/RFC/BV-17-C", "RFCOMM/DEVA-DEVB/RFC/BV-19-C", "RFCOMM/DEVA-DEVB/RFC/BV-25-C,", "SDP/SR/BRW/BV-02-C", "SDP/SR/SA/BI-01-C", "SDP/SR/SA/BI-02-C", Loading Loading @@ -630,6 +644,8 @@ "L2CAP/LE/CID/BV-02-C", "L2CAP/LE/CPU/BV-01-C", "L2CAP/LE/REJ/BI-02-C", "RFCOMM/DEVA-DEVB/RFC/BV-21-C", "RFCOMM/DEVA-DEVB/RFC/BV-22-C", "SM/CEN/PKE/BV-01-C", "SM/CEN/SCCT/BV-03-C", "SM/CEN/SCCT/BV-05-C", Loading Loading @@ -1625,7 +1641,7 @@ "TSPC_SPP_0_2": true, "TSPC_SPP_1_1": true, "TSPC_SPP_1_2": true, "TSPC_SPP_2_1": true, "TSPC_SPP_2_1": false, "TSPC_SPP_2_1b": true, "TSPC_SPP_3_1": true, "TSPC_SPP_3_2": true, Loading Loading @@ -1671,7 +1687,10 @@ "PAN": {}, "PBAP": {}, "PROD": {}, "RFCOMM": {}, "RFCOMM": { "TSPX_server_channel_iut": "7", "TSPX_security_enabled": "FALSE" }, "SAP": {}, "SCPP": {}, "SDP": {}, Loading