Loading android/pandora/mmi2grpc/mmi2grpc/__init__.py +24 −4 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ __version__ = "0.0.1" from threading import Thread from typing import List import time import sys Loading @@ -33,6 +34,7 @@ from pandora.host_grpc import Host GRPC_PORT = 8999 MAX_RETRIES = 10 GRPC_SERVER_INIT_TIMEOUT = 10 # seconds class IUT: Loading Loading @@ -95,16 +97,34 @@ class IUT: @property def address(self) -> bytes: """Bluetooth MAC address of the IUT.""" """Bluetooth MAC address of the IUT. Raises a timeout exception after GRPC_SERVER_INIT_TIMEOUT seconds. """ mut_address = None def read_local_address(): with grpc.insecure_channel(f'localhost:{self.port}') as channel: return self._retry(Host(channel).ReadLocalAddress)(wait_for_ready=True).address nonlocal mut_address mut_address = self._retry( Host(channel).ReadLocalAddress)(wait_for_ready=True).address thread = Thread(target=read_local_address) thread.start() thread.join(timeout=GRPC_SERVER_INIT_TIMEOUT) if not mut_address: raise Exception("Pandora gRPC server timeout") else: return mut_address def interact(self, pts_address: bytes, profile: str, test: str, interaction: str, description: str, style: str, **kwargs) -> str: """Routes MMI calls to corresponding profile proxy. Args: pts_address: Bluetooth MAC addres of the PTS in bytes. pts_address: Bluetooth MAC address of the PTS in bytes. profile: Bluetooth profile. test: PTS test id. interaction: MMI name. Loading Loading
android/pandora/mmi2grpc/mmi2grpc/__init__.py +24 −4 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ __version__ = "0.0.1" from threading import Thread from typing import List import time import sys Loading @@ -33,6 +34,7 @@ from pandora.host_grpc import Host GRPC_PORT = 8999 MAX_RETRIES = 10 GRPC_SERVER_INIT_TIMEOUT = 10 # seconds class IUT: Loading Loading @@ -95,16 +97,34 @@ class IUT: @property def address(self) -> bytes: """Bluetooth MAC address of the IUT.""" """Bluetooth MAC address of the IUT. Raises a timeout exception after GRPC_SERVER_INIT_TIMEOUT seconds. """ mut_address = None def read_local_address(): with grpc.insecure_channel(f'localhost:{self.port}') as channel: return self._retry(Host(channel).ReadLocalAddress)(wait_for_ready=True).address nonlocal mut_address mut_address = self._retry( Host(channel).ReadLocalAddress)(wait_for_ready=True).address thread = Thread(target=read_local_address) thread.start() thread.join(timeout=GRPC_SERVER_INIT_TIMEOUT) if not mut_address: raise Exception("Pandora gRPC server timeout") else: return mut_address def interact(self, pts_address: bytes, profile: str, test: str, interaction: str, description: str, style: str, **kwargs) -> str: """Routes MMI calls to corresponding profile proxy. Args: pts_address: Bluetooth MAC addres of the PTS in bytes. pts_address: Bluetooth MAC address of the PTS in bytes. profile: Bluetooth profile. test: PTS test id. interaction: MMI name. Loading