Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ccf855e6 authored by Jack He's avatar Jack He
Browse files

Cert: Merge backing process logging code into AsyncSubprocessLogger

* AsyncSubprocessLogger a generic way of logging from subprocess.stdout
  using a separate thread executor to multiple files and stdout

Bug: 156038378
Tag: #gd-refactor
Test: gd/cert/run --host
Change-Id: I8bccb1272f5a38e578c039be12b745fabf83b92d
parent 51d03dde
Loading
Loading
Loading
Loading
+90 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from contextlib import ExitStack
import concurrent.futures
import logging
import subprocess
from cert.os_utils import TerminalColor


class AsyncSubprocessLogger:
    """
    An asynchronous logger for subprocesses.Popen object's STDOUT

    Contains threading functionality that allows asynchronous handling of lines
    from STDOUT from subprocess.Popen
    """
    WAIT_TIMEOUT_SECONDS = 10

    def __init__(self,
                 process: subprocess.Popen,
                 log_file_paths,
                 log_to_stdout=False,
                 tag=None,
                 color: TerminalColor = None):
        """
        :param process: a subprocess.Popen object with STDOUT
        :param log_file_paths: list of log files to redirect log to
        :param log_to_stdout: whether to dump logs to stdout in the format of
                              "[tag] logline"
        :param tag: tag to be used in above format
        :param color: when dumping to stdout, what color to use for tag
        """
        if not process:
            raise ValueError("process cannot be None")
        if not process.stdout:
            raise ValueError("process.stdout cannot be None")
        if log_to_stdout:
            if not tag or type(tag) is not str:
                raise ValueError("When logging to stdout, log tag must be set")
        self.log_file_paths = log_file_paths
        self.log_to_stdout = log_to_stdout
        self.tag = tag
        self.color = color
        self.process = process
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.future = self.executor.submit(self.__logging_loop)

    def stop(self):
        """
        Stop this logger and this object can no longer be used after this call
        """
        try:
            result = self.future.result(timeout=self.WAIT_TIMEOUT_SECONDS)
            if result:
                logging.error(
                    "logging thread produced an error when executing: %s" %
                    str(result))
        except concurrent.futures.TimeoutError:
            logging.error("logging thread failed to finish on time")
        self.executor.shutdown(wait=False)

    def __logging_loop(self):
        with ExitStack() as stack:
            log_files = [
                stack.enter_context(open(file_path, 'w'))
                for file_path in self.log_file_paths
            ]
            for line in self.process.stdout:
                for log_file in log_files:
                    log_file.write(line)
                if self.log_to_stdout:
                    if self.color:
                        print("[%s%s%s] %s" % (self.color, self.tag,
                                               TerminalColor.END, line.strip()))
                    else:
                        print("[%s] %s" % (self.tag, line.strip()))
+7 −24
Original line number Diff line number Diff line
@@ -14,7 +14,6 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

import concurrent.futures
import importlib
import logging
import os
@@ -29,6 +28,7 @@ from acts import asserts, signals
from acts.context import get_current_context
from acts.base_test import BaseTestClass

from cert.async_subprocess_logger import AsyncSubprocessLogger
from cert.os_utils import get_gd_root
from cert.os_utils import read_crash_snippet_and_log_tail
from cert.os_utils import is_subprocess_alive
@@ -94,10 +94,11 @@ class GdBaseTestClass(BaseTestClass):
                is_subprocess_alive(self.rootcanal_process),
                msg="root-canal stopped immediately after running")

            self.rootcanal_logging_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=1)
            self.rootcanal_logging_future = self.rootcanal_logging_executor.submit(
                self.__rootcanal_logging_loop)
            self.rootcanal_logger = AsyncSubprocessLogger(
                self.rootcanal_process, [self.rootcanal_logpath],
                log_to_stdout=self.verbose_mode,
                tag="root_canal",
                color=TerminalColor.MAGENTA)

            # Modify the device config to include the correct root-canal port
            for gd_device_config in self.controller_configs.get("GdDevice"):
@@ -109,14 +110,6 @@ class GdBaseTestClass(BaseTestClass):
        self.dut = self.gd_devices[1]
        self.cert = self.gd_devices[0]

    def __rootcanal_logging_loop(self):
        with open(self.rootcanal_logpath, 'w') as rootcanal_log:
            for line in self.rootcanal_process.stdout:
                rootcanal_log.write(line)
                if self.verbose_mode:
                    print("[%s%s%s] %s" % (TerminalColor.MAGENTA, "root_canal",
                                           TerminalColor.END, line.strip()))

    def teardown_class(self):
        if self.rootcanal_running:
            stop_signal = signal.SIGINT
@@ -138,17 +131,7 @@ class GdBaseTestClass(BaseTestClass):
                    return_code = -65536
            if return_code != 0 and return_code != -stop_signal:
                logging.error("rootcanal stopped with code: %d" % return_code)
            try:
                result = self.rootcanal_logging_future.result(
                    timeout=self.SUBPROCESS_WAIT_TIMEOUT_SECONDS)
                if result:
                    logging.error(
                        "rootcanal process logging thread produced an error when executing: %s"
                        % str(result))
            except concurrent.futures.TimeoutError:
                logging.error(
                    "rootcanal process logging thread failed to finish on time")
            self.rootcanal_logging_executor.shutdown(wait=False)
            self.rootcanal_logger.stop()

    def setup_test(self):
        self.dut.rootservice.StartStack(
+7 −24
Original line number Diff line number Diff line
@@ -15,7 +15,6 @@
#   limitations under the License.

from abc import ABC
import concurrent.futures
import inspect
import logging
import os
@@ -36,6 +35,7 @@ from acts.controllers.adb import AdbError

from google.protobuf import empty_pb2 as empty_proto

from cert.async_subprocess_logger import AsyncSubprocessLogger
from cert.os_utils import get_gd_root
from cert.os_utils import read_crash_snippet_and_log_tail
from cert.os_utils import is_subprocess_alive
@@ -184,14 +184,6 @@ class GdDeviceBase(ABC):
        else:
            self.terminal_color = TerminalColor.YELLOW

    def __backing_process_logging_loop(self):
        with open(self.backing_process_log_path, 'w') as backing_process_log:
            for line in self.backing_process.stdout:
                backing_process_log.write(line)
                if self.verbose_mode:
                    print("[%s%s%s] %s" % (self.terminal_color, self.label,
                                           TerminalColor.END, line.strip()))

    def setup(self):
        """Set up this device for test, must run before using this device
        - After calling this, teardown() must be called when test finishes
@@ -229,10 +221,11 @@ class GdDeviceBase(ABC):
            # Wait for process to be ready
            signal_socket.accept()

        self.backing_process_logging_executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=1)
        self.backing_process_logging_future = self.backing_process_logging_executor.submit(
            self.__backing_process_logging_loop)
        self.backing_process_logger = AsyncSubprocessLogger(
            self.backing_process, [self.backing_process_log_path],
            log_to_stdout=self.verbose_mode,
            tag=self.label,
            color=self.terminal_color)

        # Setup gRPC management channels
        self.grpc_root_server_channel = grpc.insecure_channel(
@@ -304,17 +297,7 @@ class GdDeviceBase(ABC):
        if return_code not in [-stop_signal, 0]:
            logging.error("backing process %s stopped with code: %d" %
                          (self.label, return_code))
        try:
            result = self.backing_process_logging_future.result(
                timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS)
            if result:
                logging.error(
                    "backing process logging thread produced an error when executing: %s"
                    % str(result))
        except concurrent.futures.TimeoutError:
            logging.error(
                "backing process logging thread failed to finish on time")
        self.backing_process_logging_executor.shutdown(wait=False)
        self.backing_process_logger.stop()

    def wait_channel_ready(self):
        future = grpc.channel_ready_future(self.grpc_channel)