Loading android/pandora/mmi2grpc/mmi2grpc/__init__.py +13 −8 Original line number Diff line number Diff line Loading @@ -56,27 +56,32 @@ class IUT: # Note: we don't keep a single gRPC channel instance in the IUT class # because reset is allowed to close the gRPC server. with grpc.insecure_channel(f'localhost:{self.port}') as channel: Host(channel).Reset(wait_for_ready=True) self._retry(Host(channel).Reset)(wait_for_ready=True) def __exit__(self, exc_type, exc_value, exc_traceback): self._a2dp = None @property def address(self) -> bytes: """Bluetooth MAC address of the IUT.""" with grpc.insecure_channel(f'localhost:{self.port}') as channel: def _retry(self, func): def wrapper(*args, **kwargs): tries = 0 while True: try: return Host(channel).ReadLocalAddress( wait_for_ready=True).address return func(*args, **kwargs) except grpc.RpcError or grpc._channel._InactiveRpcError: tries += 1 if tries >= MAX_RETRIES: raise else: print('Retry', tries, 'of', MAX_RETRIES) print(f'Retry {func.__name__}: {tries}/{MAX_RETRIES}') time.sleep(1) return wrapper @property def address(self) -> bytes: """Bluetooth MAC address of the IUT.""" with grpc.insecure_channel(f'localhost:{self.port}') as channel: return self._retry( Host(channel).ReadLocalAddress)(wait_for_ready=True).address def interact(self, pts_address: bytes, Loading Loading
android/pandora/mmi2grpc/mmi2grpc/__init__.py +13 −8 Original line number Diff line number Diff line Loading @@ -56,27 +56,32 @@ class IUT: # Note: we don't keep a single gRPC channel instance in the IUT class # because reset is allowed to close the gRPC server. with grpc.insecure_channel(f'localhost:{self.port}') as channel: Host(channel).Reset(wait_for_ready=True) self._retry(Host(channel).Reset)(wait_for_ready=True) def __exit__(self, exc_type, exc_value, exc_traceback): self._a2dp = None @property def address(self) -> bytes: """Bluetooth MAC address of the IUT.""" with grpc.insecure_channel(f'localhost:{self.port}') as channel: def _retry(self, func): def wrapper(*args, **kwargs): tries = 0 while True: try: return Host(channel).ReadLocalAddress( wait_for_ready=True).address return func(*args, **kwargs) except grpc.RpcError or grpc._channel._InactiveRpcError: tries += 1 if tries >= MAX_RETRIES: raise else: print('Retry', tries, 'of', MAX_RETRIES) print(f'Retry {func.__name__}: {tries}/{MAX_RETRIES}') time.sleep(1) return wrapper @property def address(self) -> bytes: """Bluetooth MAC address of the IUT.""" with grpc.insecure_channel(f'localhost:{self.port}') as channel: return self._retry( Host(channel).ReadLocalAddress)(wait_for_ready=True).address def interact(self, pts_address: bytes, Loading