Loading system/gd/cert/async_subprocess_logger.py 0 → 100644 +90 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from contextlib import ExitStack import concurrent.futures import logging import subprocess from cert.os_utils import TerminalColor class AsyncSubprocessLogger: """ An asynchronous logger for subprocesses.Popen object's STDOUT Contains threading functionality that allows asynchronous handling of lines from STDOUT from subprocess.Popen """ WAIT_TIMEOUT_SECONDS = 10 def __init__(self, process: subprocess.Popen, log_file_paths, log_to_stdout=False, tag=None, color: TerminalColor = None): """ :param process: a subprocess.Popen object with STDOUT :param log_file_paths: list of log files to redirect log to :param log_to_stdout: whether to dump logs to stdout in the format of "[tag] logline" :param tag: tag to be used in above format :param color: when dumping to stdout, what color to use for tag """ if not process: raise ValueError("process cannot be None") if not process.stdout: raise ValueError("process.stdout cannot be None") if log_to_stdout: if not tag or type(tag) is not str: raise ValueError("When logging to stdout, log tag must be set") self.log_file_paths = log_file_paths self.log_to_stdout = log_to_stdout self.tag = tag self.color = color self.process = process self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) self.future = self.executor.submit(self.__logging_loop) def stop(self): """ Stop this logger and this object can no longer be used after this call """ try: result = self.future.result(timeout=self.WAIT_TIMEOUT_SECONDS) if result: logging.error( "logging thread produced an error when executing: %s" % str(result)) except concurrent.futures.TimeoutError: logging.error("logging thread failed to finish on time") self.executor.shutdown(wait=False) def __logging_loop(self): with ExitStack() as stack: log_files = [ stack.enter_context(open(file_path, 'w')) for file_path in self.log_file_paths ] for line in self.process.stdout: for log_file in log_files: log_file.write(line) if self.log_to_stdout: if self.color: print("[%s%s%s] %s" % (self.color, self.tag, TerminalColor.END, line.strip())) else: print("[%s] %s" % (self.tag, line.strip())) system/gd/cert/gd_base_test.py +7 −24 Original line number Diff line number Diff line Loading @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import concurrent.futures import importlib import logging import os Loading @@ -29,6 +28,7 @@ from acts import asserts, signals from acts.context import get_current_context from acts.base_test import BaseTestClass from cert.async_subprocess_logger import AsyncSubprocessLogger from cert.os_utils import get_gd_root from cert.os_utils import read_crash_snippet_and_log_tail from cert.os_utils import is_subprocess_alive Loading Loading @@ -94,10 +94,11 @@ class GdBaseTestClass(BaseTestClass): is_subprocess_alive(self.rootcanal_process), msg="root-canal stopped immediately after running") self.rootcanal_logging_executor = concurrent.futures.ThreadPoolExecutor( max_workers=1) self.rootcanal_logging_future = self.rootcanal_logging_executor.submit( self.__rootcanal_logging_loop) self.rootcanal_logger = AsyncSubprocessLogger( self.rootcanal_process, [self.rootcanal_logpath], log_to_stdout=self.verbose_mode, tag="root_canal", color=TerminalColor.MAGENTA) # Modify the device config to include the correct root-canal port for gd_device_config in self.controller_configs.get("GdDevice"): Loading @@ -109,14 +110,6 @@ class GdBaseTestClass(BaseTestClass): self.dut = self.gd_devices[1] self.cert = self.gd_devices[0] def __rootcanal_logging_loop(self): with open(self.rootcanal_logpath, 'w') as rootcanal_log: for line in self.rootcanal_process.stdout: rootcanal_log.write(line) if self.verbose_mode: print("[%s%s%s] %s" % (TerminalColor.MAGENTA, "root_canal", TerminalColor.END, line.strip())) def teardown_class(self): if self.rootcanal_running: stop_signal = signal.SIGINT Loading @@ -138,17 +131,7 @@ class GdBaseTestClass(BaseTestClass): return_code = -65536 if return_code != 0 and return_code != -stop_signal: logging.error("rootcanal stopped with code: %d" % return_code) try: result = self.rootcanal_logging_future.result( timeout=self.SUBPROCESS_WAIT_TIMEOUT_SECONDS) if result: logging.error( "rootcanal process logging thread produced an error when executing: %s" % str(result)) except concurrent.futures.TimeoutError: logging.error( "rootcanal process logging thread failed to finish on time") self.rootcanal_logging_executor.shutdown(wait=False) self.rootcanal_logger.stop() def setup_test(self): self.dut.rootservice.StartStack( Loading system/gd/cert/gd_device.py +7 −24 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ # limitations under the License. from abc import ABC import concurrent.futures import inspect import logging import os Loading @@ -36,6 +35,7 @@ from acts.controllers.adb import AdbError from google.protobuf import empty_pb2 as empty_proto from cert.async_subprocess_logger import AsyncSubprocessLogger from cert.os_utils import get_gd_root from cert.os_utils import read_crash_snippet_and_log_tail from cert.os_utils import is_subprocess_alive Loading Loading @@ -184,14 +184,6 @@ class GdDeviceBase(ABC): else: self.terminal_color = TerminalColor.YELLOW def __backing_process_logging_loop(self): with open(self.backing_process_log_path, 'w') as backing_process_log: for line in self.backing_process.stdout: backing_process_log.write(line) if self.verbose_mode: print("[%s%s%s] %s" % (self.terminal_color, self.label, TerminalColor.END, line.strip())) def setup(self): """Set up this device for test, must run before using this device - After calling this, teardown() must be called when test finishes Loading Loading @@ -229,10 +221,11 @@ class GdDeviceBase(ABC): # Wait for process to be ready signal_socket.accept() self.backing_process_logging_executor = concurrent.futures.ThreadPoolExecutor( max_workers=1) self.backing_process_logging_future = self.backing_process_logging_executor.submit( self.__backing_process_logging_loop) self.backing_process_logger = AsyncSubprocessLogger( self.backing_process, [self.backing_process_log_path], log_to_stdout=self.verbose_mode, tag=self.label, color=self.terminal_color) # Setup gRPC management channels self.grpc_root_server_channel = grpc.insecure_channel( Loading Loading @@ -304,17 +297,7 @@ class GdDeviceBase(ABC): if return_code not in [-stop_signal, 0]: logging.error("backing process %s stopped with code: %d" % (self.label, return_code)) try: result = self.backing_process_logging_future.result( timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS) if result: logging.error( "backing process logging thread produced an error when executing: %s" % str(result)) except concurrent.futures.TimeoutError: logging.error( "backing process logging thread failed to finish on time") self.backing_process_logging_executor.shutdown(wait=False) self.backing_process_logger.stop() def wait_channel_ready(self): future = grpc.channel_ready_future(self.grpc_channel) Loading Loading
system/gd/cert/async_subprocess_logger.py 0 → 100644 +90 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from contextlib import ExitStack import concurrent.futures import logging import subprocess from cert.os_utils import TerminalColor class AsyncSubprocessLogger: """ An asynchronous logger for subprocesses.Popen object's STDOUT Contains threading functionality that allows asynchronous handling of lines from STDOUT from subprocess.Popen """ WAIT_TIMEOUT_SECONDS = 10 def __init__(self, process: subprocess.Popen, log_file_paths, log_to_stdout=False, tag=None, color: TerminalColor = None): """ :param process: a subprocess.Popen object with STDOUT :param log_file_paths: list of log files to redirect log to :param log_to_stdout: whether to dump logs to stdout in the format of "[tag] logline" :param tag: tag to be used in above format :param color: when dumping to stdout, what color to use for tag """ if not process: raise ValueError("process cannot be None") if not process.stdout: raise ValueError("process.stdout cannot be None") if log_to_stdout: if not tag or type(tag) is not str: raise ValueError("When logging to stdout, log tag must be set") self.log_file_paths = log_file_paths self.log_to_stdout = log_to_stdout self.tag = tag self.color = color self.process = process self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) self.future = self.executor.submit(self.__logging_loop) def stop(self): """ Stop this logger and this object can no longer be used after this call """ try: result = self.future.result(timeout=self.WAIT_TIMEOUT_SECONDS) if result: logging.error( "logging thread produced an error when executing: %s" % str(result)) except concurrent.futures.TimeoutError: logging.error("logging thread failed to finish on time") self.executor.shutdown(wait=False) def __logging_loop(self): with ExitStack() as stack: log_files = [ stack.enter_context(open(file_path, 'w')) for file_path in self.log_file_paths ] for line in self.process.stdout: for log_file in log_files: log_file.write(line) if self.log_to_stdout: if self.color: print("[%s%s%s] %s" % (self.color, self.tag, TerminalColor.END, line.strip())) else: print("[%s] %s" % (self.tag, line.strip()))
system/gd/cert/gd_base_test.py +7 −24 Original line number Diff line number Diff line Loading @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import concurrent.futures import importlib import logging import os Loading @@ -29,6 +28,7 @@ from acts import asserts, signals from acts.context import get_current_context from acts.base_test import BaseTestClass from cert.async_subprocess_logger import AsyncSubprocessLogger from cert.os_utils import get_gd_root from cert.os_utils import read_crash_snippet_and_log_tail from cert.os_utils import is_subprocess_alive Loading Loading @@ -94,10 +94,11 @@ class GdBaseTestClass(BaseTestClass): is_subprocess_alive(self.rootcanal_process), msg="root-canal stopped immediately after running") self.rootcanal_logging_executor = concurrent.futures.ThreadPoolExecutor( max_workers=1) self.rootcanal_logging_future = self.rootcanal_logging_executor.submit( self.__rootcanal_logging_loop) self.rootcanal_logger = AsyncSubprocessLogger( self.rootcanal_process, [self.rootcanal_logpath], log_to_stdout=self.verbose_mode, tag="root_canal", color=TerminalColor.MAGENTA) # Modify the device config to include the correct root-canal port for gd_device_config in self.controller_configs.get("GdDevice"): Loading @@ -109,14 +110,6 @@ class GdBaseTestClass(BaseTestClass): self.dut = self.gd_devices[1] self.cert = self.gd_devices[0] def __rootcanal_logging_loop(self): with open(self.rootcanal_logpath, 'w') as rootcanal_log: for line in self.rootcanal_process.stdout: rootcanal_log.write(line) if self.verbose_mode: print("[%s%s%s] %s" % (TerminalColor.MAGENTA, "root_canal", TerminalColor.END, line.strip())) def teardown_class(self): if self.rootcanal_running: stop_signal = signal.SIGINT Loading @@ -138,17 +131,7 @@ class GdBaseTestClass(BaseTestClass): return_code = -65536 if return_code != 0 and return_code != -stop_signal: logging.error("rootcanal stopped with code: %d" % return_code) try: result = self.rootcanal_logging_future.result( timeout=self.SUBPROCESS_WAIT_TIMEOUT_SECONDS) if result: logging.error( "rootcanal process logging thread produced an error when executing: %s" % str(result)) except concurrent.futures.TimeoutError: logging.error( "rootcanal process logging thread failed to finish on time") self.rootcanal_logging_executor.shutdown(wait=False) self.rootcanal_logger.stop() def setup_test(self): self.dut.rootservice.StartStack( Loading
system/gd/cert/gd_device.py +7 −24 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ # limitations under the License. from abc import ABC import concurrent.futures import inspect import logging import os Loading @@ -36,6 +35,7 @@ from acts.controllers.adb import AdbError from google.protobuf import empty_pb2 as empty_proto from cert.async_subprocess_logger import AsyncSubprocessLogger from cert.os_utils import get_gd_root from cert.os_utils import read_crash_snippet_and_log_tail from cert.os_utils import is_subprocess_alive Loading Loading @@ -184,14 +184,6 @@ class GdDeviceBase(ABC): else: self.terminal_color = TerminalColor.YELLOW def __backing_process_logging_loop(self): with open(self.backing_process_log_path, 'w') as backing_process_log: for line in self.backing_process.stdout: backing_process_log.write(line) if self.verbose_mode: print("[%s%s%s] %s" % (self.terminal_color, self.label, TerminalColor.END, line.strip())) def setup(self): """Set up this device for test, must run before using this device - After calling this, teardown() must be called when test finishes Loading Loading @@ -229,10 +221,11 @@ class GdDeviceBase(ABC): # Wait for process to be ready signal_socket.accept() self.backing_process_logging_executor = concurrent.futures.ThreadPoolExecutor( max_workers=1) self.backing_process_logging_future = self.backing_process_logging_executor.submit( self.__backing_process_logging_loop) self.backing_process_logger = AsyncSubprocessLogger( self.backing_process, [self.backing_process_log_path], log_to_stdout=self.verbose_mode, tag=self.label, color=self.terminal_color) # Setup gRPC management channels self.grpc_root_server_channel = grpc.insecure_channel( Loading Loading @@ -304,17 +297,7 @@ class GdDeviceBase(ABC): if return_code not in [-stop_signal, 0]: logging.error("backing process %s stopped with code: %d" % (self.label, return_code)) try: result = self.backing_process_logging_future.result( timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS) if result: logging.error( "backing process logging thread produced an error when executing: %s" % str(result)) except concurrent.futures.TimeoutError: logging.error( "backing process logging thread failed to finish on time") self.backing_process_logging_executor.shutdown(wait=False) self.backing_process_logger.stop() def wait_channel_ready(self): future = grpc.channel_ready_future(self.grpc_channel) Loading