Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit de81f530 authored by Jizheng Chu's avatar Jizheng Chu Committed by Gerrit Code Review
Browse files

Merge "Unbind gd_device.py from ACTS"

parents 7147ca16 2ff953b5
Loading
Loading
Loading
Loading
+34 −225
Original line number Diff line number Diff line
@@ -36,9 +36,15 @@ from acts.controllers.adb import AdbProxy
from acts.controllers.adb import AdbError
from acts.controllers.adb_lib.error import AdbCommandError

from google.protobuf import empty_pb2 as empty_proto

from cert.async_subprocess_logger import AsyncSubprocessLogger
from cert.gd_device_lib import create_core
from cert.gd_device_lib import destroy_core
from cert.gd_device_lib import get_info
from cert.gd_device_lib import replace_vars
from cert.gd_device_lib import GdDeviceBaseCore
from cert.gd_device_lib import GdHostOnlyDeviceCore
from cert.gd_device_lib import MOBLY_CONTROLLER_CONFIG_NAME
from cert.gd_device_lib import ACTS_CONTROLLER_REFERENCE_NAME
from cert.logging_client_interceptor import LoggingClientInterceptor
from cert.os_utils import get_gd_root
from cert.os_utils import read_crash_snippet_and_log_tail
@@ -46,43 +52,15 @@ from cert.os_utils import is_subprocess_alive
from cert.os_utils import make_ports_available
from cert.os_utils import TerminalColor
from facade import rootservice_pb2_grpc as facade_rootservice_pb2_grpc
from hal import hal_facade_pb2_grpc
from hci.facade import hci_facade_pb2_grpc
from hci.facade import acl_manager_facade_pb2_grpc
from hci.facade import controller_facade_pb2_grpc
from hci.facade import le_acl_manager_facade_pb2_grpc
from hci.facade import le_advertising_manager_facade_pb2_grpc
from hci.facade import le_initiator_address_facade_pb2_grpc
from hci.facade import le_scanning_manager_facade_pb2_grpc
from l2cap.classic import facade_pb2_grpc as l2cap_facade_pb2_grpc
from l2cap.le import facade_pb2_grpc as l2cap_le_facade_pb2_grpc
from iso import facade_pb2_grpc as iso_facade_pb2_grpc
from neighbor.facade import facade_pb2_grpc as neighbor_facade_pb2_grpc
from security import facade_pb2_grpc as security_facade_pb2_grpc
from shim.facade import facade_pb2_grpc as shim_facade_pb2_grpc

MOBLY_CONTROLLER_CONFIG_NAME = "GdDevice"
ACTS_CONTROLLER_REFERENCE_NAME = "gd_devices"


def create(configs):
    if not configs:
        raise Exception("Configuration is empty")
    elif not isinstance(configs, list):
        raise Exception("Configuration should be a list")
    create_core(configs)
    return get_instances_with_configs(configs)


def destroy(devices):
    for device in devices:
        try:
            device.teardown()
        except:
            logging.exception("[%s] Failed to clean up properly due to" % device.label)


def get_info(devices):
    return []
    destroy_core(devices)


def get_instances_with_configs(configs):
@@ -107,24 +85,7 @@ def get_instances_with_configs(configs):
    return devices


def replace_vars(string, config):
    serial_number = config.get("serial_number")
    if serial_number is None:
        serial_number = ""
    rootcanal_port = config.get("rootcanal_port")
    if rootcanal_port is None:
        rootcanal_port = ""
    if serial_number == "DUT" or serial_number == "CERT":
        raise Exception("Did you forget to configure the serial number?")
    return string.replace("$GD_ROOT", get_gd_root()) \
                 .replace("$(grpc_port)", config.get("grpc_port")) \
                 .replace("$(grpc_root_server_port)", config.get("grpc_root_server_port")) \
                 .replace("$(rootcanal_port)", rootcanal_port) \
                 .replace("$(signal_port)", config.get("signal_port")) \
                 .replace("$(serial_number)", serial_number)


class GdDeviceBase(ABC):
class GdDeviceBase(GdDeviceBaseCore):
    """
    Base GD device class that covers common traits which assumes that the
    device must be driven by a driver-like backing process that takes following
@@ -136,12 +97,10 @@ class GdDeviceBase(ABC):
    --rootcanal-port: root-canal HCI port, optional
    """

    WAIT_CHANNEL_READY_TIMEOUT_SECONDS = 10

    def __init__(self, grpc_port: str, grpc_root_server_port: str, signal_port: str, cmd: List[str], label: str,
                 type_identifier: str, name: str, verbose_mode: bool):
        """Base GD device, common traits for both device based and host only GD
        cert tests
        """Verify arguments and log path, initialize Base GD device, common traits
         for both device based and host only GD cert tests
        :param grpc_port: main gRPC service port
        :param grpc_root_server_port: gRPC root server port
        :param signal_port: signaling port for backing process start up
@@ -155,139 +114,49 @@ class GdDeviceBase(ABC):
        arguments = [values[arg] for arg in inspect.getfullargspec(GdDeviceBase.__init__).args if arg != "verbose_mode"]
        asserts.assert_true(all(arguments), "All arguments to GdDeviceBase must not be None nor empty")
        asserts.assert_true(all(cmd), "cmd list should not have None nor empty component")
        self.verbose_mode = verbose_mode
        self.grpc_root_server_port = int(grpc_root_server_port)
        self.grpc_port = int(grpc_port)
        self.signal_port = int(signal_port)
        self.name = name
        self.type_identifier = type_identifier
        self.label = label

        # logging.log_path only exists when this is used in an ACTS test run.
        self.log_path_base = get_current_context().get_full_output_path()
        self.test_runner_base_path = \
            get_current_context().get_base_output_path()
        self.backing_process_log_path = os.path.join(self.log_path_base,
                                                     '%s_%s_backing_logs.txt' % (self.type_identifier, self.label))
        if "--btsnoop=" not in " ".join(cmd):
            cmd.append("--btsnoop=%s" % os.path.join(self.log_path_base, '%s_btsnoop_hci.log' % self.label))
        if "--btsnooz=" not in " ".join(cmd):
            cmd.append("--btsnooz=%s" % os.path.join(self.log_path_base, '%s_btsnooz_hci.log' % self.label))
        if "--btconfig=" not in " ".join(cmd):
            cmd.append("--btconfig=%s" % os.path.join(self.log_path_base, '%s_bt_config.conf' % self.label))
        self.cmd = cmd
        self.environment = os.environ.copy()
        if "cert" in self.label:
            self.terminal_color = TerminalColor.BLUE
        else:
            self.terminal_color = TerminalColor.YELLOW

        GdDeviceBaseCore.__init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, label, type_identifier,
                                  name, verbose_mode, self.log_path_base, self.test_runner_base_path)

    def setup(self):
        """Set up this device for test, must run before using this device
        """Inherited setup method from base class to set up this device for test,
        ensure signal port is available and backing process is started and alive,
        must run before using this device
        - After calling this, teardown() must be called when test finishes
        - Should be executed after children classes' setup() methods
        :return:
        """
        GdDeviceBaseCore.setup(self)
        # Ensure signal port is available
        # signal port is the only port that always listen on the host machine
        asserts.assert_true(
            make_ports_available([self.signal_port]), "[%s] Failed to make signal port available" % self.label)
        # Start backing process
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as signal_socket:
            # Setup signaling socket
            signal_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            signal_socket.bind(("localhost", self.signal_port))
            signal_socket.listen(1)
            signal_socket.settimeout(300)  # 5 minute timeout for blocking socket operations

            # Start backing process
            logging.debug("Running %s" % " ".join(self.cmd))
            self.backing_process = subprocess.Popen(
                self.cmd,
                cwd=get_gd_root(),
                env=self.environment,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True)
        asserts.assert_true(self.signal_port_available, "[%s] Failed to make signal port available" % self.label)

        # Ensure backing process is started and alive
        asserts.assert_true(self.backing_process, msg="Cannot start backing_process at " + " ".join(self.cmd))
        asserts.assert_true(
                is_subprocess_alive(self.backing_process),
            self.is_backing_process_alive,
            msg="backing_process stopped immediately after running " + " ".join(self.cmd))

            # Wait for process to be ready
            logging.debug("Waiting for backing_process accept.")
            signal_socket.accept()

        self.backing_process_logger = AsyncSubprocessLogger(
            self.backing_process, [self.backing_process_log_path],
            log_to_stdout=self.verbose_mode,
            tag=self.label,
            color=self.terminal_color)

        # Setup gRPC management channels
        self.grpc_root_server_channel = grpc.insecure_channel("localhost:%d" % self.grpc_root_server_port)
        self.grpc_channel = grpc.insecure_channel("localhost:%d" % self.grpc_port)

        if self.verbose_mode:
            self.grpc_channel = grpc.intercept_channel(self.grpc_channel, LoggingClientInterceptor(self.label))

        # Establish services from facades
        self.rootservice = facade_rootservice_pb2_grpc.RootFacadeStub(self.grpc_root_server_channel)
        self.hal = hal_facade_pb2_grpc.HciHalFacadeStub(self.grpc_channel)
        self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub(self.grpc_channel)
        self.hci = hci_facade_pb2_grpc.HciFacadeStub(self.grpc_channel)
        self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(self.grpc_channel)
        self.l2cap_le = l2cap_le_facade_pb2_grpc.L2capLeModuleFacadeStub(self.grpc_channel)
        self.iso = iso_facade_pb2_grpc.IsoModuleFacadeStub(self.grpc_channel)
        self.hci_acl_manager = acl_manager_facade_pb2_grpc.AclManagerFacadeStub(self.grpc_channel)
        self.hci_le_acl_manager = le_acl_manager_facade_pb2_grpc.LeAclManagerFacadeStub(self.grpc_channel)
        self.hci_le_initiator_address = le_initiator_address_facade_pb2_grpc.LeInitiatorAddressFacadeStub(
            self.grpc_channel)
        self.hci_controller = controller_facade_pb2_grpc.ControllerFacadeStub(self.grpc_channel)
        self.hci_controller.GetMacAddressSimple = lambda: self.hci_controller.GetMacAddress(empty_proto.Empty()).address
        self.hci_controller.GetLocalNameSimple = lambda: self.hci_controller.GetLocalName(empty_proto.Empty()).name
        self.hci_le_advertising_manager = le_advertising_manager_facade_pb2_grpc.LeAdvertisingManagerFacadeStub(
            self.grpc_channel)
        self.hci_le_scanning_manager = le_scanning_manager_facade_pb2_grpc.LeScanningManagerFacadeStub(
            self.grpc_channel)
        self.neighbor = neighbor_facade_pb2_grpc.NeighborFacadeStub(self.grpc_channel)
        self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub(self.grpc_channel)
        self.shim = shim_facade_pb2_grpc.ShimFacadeStub(self.grpc_channel)

    def get_crash_snippet_and_log_tail(self):
        if is_subprocess_alive(self.backing_process):
            return None, None

        return read_crash_snippet_and_log_tail(self.backing_process_log_path)
        GdDeviceBaseCore.get_crash_snippet_and_log_tail(self)

    def teardown(self):
        """Tear down this device and clean up any resources.
        """Inherited teardown method from base class to tear down this device
        and clean up any resources.
        - Must be called after setup()
        - Should be executed before children classes' teardown()
        :return:
        """
        self.grpc_channel.close()
        self.grpc_root_server_channel.close()
        stop_signal = signal.SIGINT
        self.backing_process.send_signal(stop_signal)
        try:
            return_code = self.backing_process.wait(timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            logging.error("[%s] Failed to interrupt backing process via SIGINT, sending SIGKILL" % self.label)
            stop_signal = signal.SIGKILL
            self.backing_process.kill()
            try:
                return_code = self.backing_process.wait(timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                logging.error("Failed to kill backing process")
                return_code = -65536
        if return_code not in [-stop_signal, 0]:
            logging.error("backing process %s stopped with code: %d" % (self.label, return_code))
        self.backing_process_logger.stop()
        GdDeviceBaseCore.teardown(self)

    def wait_channel_ready(self):
        future = grpc.channel_ready_future(self.grpc_channel)
        try:
            future.result(timeout=self.WAIT_CHANNEL_READY_TIMEOUT_SECONDS)
            GdDeviceBaseCore.wait_channel_ready(self)
        except grpc.FutureTimeoutError:
            asserts.fail("[%s] wait channel ready timeout" % self.label)

@@ -317,68 +186,8 @@ class GdHostOnlyDevice(GdDeviceBase):
        self.generate_coverage_report()

    def generate_coverage_report(self):
        if not self.backing_process_profraw_path.is_file():
            logging.info("[%s] Skip coverage report as there is no profraw file at %s" %
                         (self.label, str(self.backing_process_profraw_path)))
            return
        try:
            if self.backing_process_profraw_path.stat().st_size <= 0:
                logging.info("[%s] Skip coverage report as profraw file is empty at %s" %
                             (self.label, str(self.backing_process_profraw_path)))
                return
        except OSError:
            logging.info("[%s] Skip coverage report as profraw file is inaccessible at %s" %
                         (self.label, str(self.backing_process_profraw_path)))
            return
        llvm_binutils = pathlib.Path(get_gd_root()).joinpath("llvm_binutils").joinpath("bin")
        llvm_profdata = llvm_binutils.joinpath("llvm-profdata")
        if not llvm_profdata.is_file():
            logging.info(
                "[%s] Skip coverage report as llvm-profdata is not found at %s" % (self.label, str(llvm_profdata)))
            return
        llvm_cov = llvm_binutils.joinpath("llvm-cov")
        if not llvm_cov.is_file():
            logging.info("[%s] Skip coverage report as llvm-cov is not found at %s" % (self.label, str(llvm_cov)))
            return
        logging.info("[%s] Generating coverage report" % self.label)
        profdata_path = pathlib.Path(self.test_runner_base_path).joinpath(
            "%s_%s_backing_process_coverage.profdata" % (self.type_identifier, self.label))
        profdata_path_tmp = pathlib.Path(self.test_runner_base_path).joinpath(
            "%s_%s_backing_process_coverage_tmp.profdata" % (self.type_identifier, self.label))
        # Merge with existing profdata if possible
        profdata_cmd = [str(llvm_profdata), "merge", "-sparse", str(self.backing_process_profraw_path)]
        if profdata_path.is_file():
            profdata_cmd.append(str(profdata_path))
        profdata_cmd += ["-o", str(profdata_path_tmp)]
        result = subprocess.run(profdata_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        if result.returncode != 0:
            logging.warning("[%s] Failed to index profdata, cmd result: %r" % (self.label, result))
            profdata_path.unlink(missing_ok=True)
            return
        shutil.move(profdata_path_tmp, profdata_path)
        coverage_result_path = pathlib.Path(self.test_runner_base_path).joinpath(
            "%s_%s_backing_process_coverage.json" % (self.type_identifier, self.label))
        with coverage_result_path.open("w") as coverage_result_file:
            result = subprocess.run(
                [str(llvm_cov), "export", "--format=text", "--instr-profile", profdata_path, self.cmd[0]],
                stderr=subprocess.PIPE,
                stdout=coverage_result_file,
                cwd=os.path.join(get_gd_root()))
        if result.returncode != 0:
            logging.warning("[%s] Failed to generated coverage report, cmd result: %r" % (self.label, result))
            coverage_result_path.unlink(missing_ok=True)
            return
        coverage_summary_path = pathlib.Path(self.test_runner_base_path).joinpath(
            "%s_%s_backing_process_coverage_summary.txt" % (self.type_identifier, self.label))
        with coverage_summary_path.open("w") as coverage_summary_file:
            result = subprocess.run(
                [llvm_cov, "report", "--instr-profile", profdata_path, self.cmd[0]],
                stderr=subprocess.PIPE,
                stdout=coverage_summary_file,
                cwd=os.path.join(get_gd_root()))
        if result.returncode != 0:
            logging.warning("[%s] Failed to generated coverage summary, cmd result: %r" % (self.label, result))
            coverage_summary_path.unlink(missing_ok=True)
        GdHostOnlyDeviceCore.generate_coverage_report(self, self.backing_process_profraw_path, self.label,
                                                      self.test_runner_base_path, self.type_identifier, self.cmd)

    def setup(self):
        # Ensure ports are available
+323 −0

File added.

Preview size limit exceeded, changes collapsed.