Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 89881b06 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

Run topshim test with mobly

Add helper classes to be compatible with mobly, so blueberry test server
can run it.

Test: build
Tag: #refactor
Bug: 181590011
Change-Id: I6f73f4ac39fe10510d39882e3e00edf0c0cdb82d
parent b9625dde
Loading
Loading
Loading
Loading
+10 −12
Original line number Diff line number Diff line
@@ -15,20 +15,15 @@
#   limitations under the License.

import asyncio
import unittest
from mobly import test_runner
from rust.topshim.facade import topshim_base_test
from rust.topshim.facade.automation_helper import AdapterAutomationHelper

from topshim_base_test import TopshimBaseTest

class AdapterTest(topshim_base_test.TopshimBaseTest):

class AdapterTest(TopshimBaseTest):

    async def asyncSetUp(self):
        await super().asyncSetUp()
        from automation_helper import AdapterAutomationHelper

        self.dut_adapter = AdapterAutomationHelper()

    async def test_verify_adapter_started(self):
    async def _test_verify_adapter_started(self):
        self.dut_adapter = AdapterAutomationHelper(port=self.dut_port)
        event_loop = asyncio.get_running_loop()
        self.dut_adapter.fetch_events(event_loop)
        self.dut_adapter.pending_future = event_loop.create_future()
@@ -36,6 +31,9 @@ class AdapterTest(TopshimBaseTest):
        await self.dut_adapter.verify_adapter_started()
        self.dut_adapter.event_handler.cancel()

    def test_verify_adapter_started(self):
        asyncio.run(self._test_verify_adapter_started())


if __name__ == "__main__":
    unittest.main()
    test_runner.main()
+8 −21
Original line number Diff line number Diff line
@@ -17,17 +17,17 @@
import asyncio
import grpc

import facade_pb2
import facade_pb2_grpc
from rust.topshim.facade import facade_pb2
from rust.topshim.facade import facade_pb2_grpc


class AdapterAutomationHelper():
    # Timeout for async wait
    DEFAULT_TIMEOUT = 3
    DEFAULT_TIMEOUT = 6
    """Invoke gRPC on topshim for Adapter testing"""

    def __init__(self, url="localhost:8899"):
        self.channel = grpc.aio.insecure_channel(url)
    def __init__(self, port=8999):
        self.channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.adapter_stub = facade_pb2_grpc.AdapterServiceStub(self.channel)

        self.pending_future = None
@@ -35,8 +35,7 @@ class AdapterAutomationHelper():
    """Start fetching events"""

    def fetch_events(self, async_event_loop):
        self.adapter_event_stream = self.adapter_stub.FetchEvents(
            facade_pb2.FetchEventsRequest(), timeout=AdapterAutomationHelper.DEFAULT_TIMEOUT)
        self.adapter_event_stream = self.adapter_stub.FetchEvents(facade_pb2.FetchEventsRequest())
        self.event_handler = async_event_loop.create_task(self.get_next_event())

    """Enable/disable the stack"""
@@ -65,8 +64,8 @@ class AdapterAutomationHelper():
class A2dpAutomationHelper():
    """Invoke gRPC on topshim for A2DP testing"""

    def __init__(self, url="localhost:8899"):
        self.channel = grpc.insecure_channel(url)
    def __init__(self, port=8999):
        self.channel = grpc.insecure_channel("localhost:%d" % port)
        self.media_stub = facade_pb2_grpc.MediaServiceStub(self.channel)

    """Start A2dp source profile service"""
@@ -83,15 +82,3 @@ class A2dpAutomationHelper():

    def source_connect_to_remote(self, address="11:22:33:44:55:66"):
        self.media_stub.A2dpSourceConnect(facade_pb2.A2dpSourceConnectRequest(address=address))


async def init_adapter():
    adapter = AdapterAutomationHelper()
    await adapter.toggle_stack()


if __name__ == "__main__":
    import asyncio
    asyncio.run(init_adapter())
    a2dp = A2dpAutomationHelper()
    a2dp.start_source()
+175 −18
Original line number Diff line number Diff line
@@ -14,27 +14,184 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import importlib
import logging
import os
import unittest
import signal
import subprocess

from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.tracelogger import TraceLogger
from cert.async_subprocess_logger import AsyncSubprocessLogger
from cert.os_utils import get_gd_root
from cert.os_utils import get_gd_root
from cert.os_utils import read_crash_snippet_and_log_tail
from cert.os_utils import is_subprocess_alive
from cert.os_utils import make_ports_available
from cert.os_utils import TerminalColor
from mobly import asserts
from mobly import base_test

class TopshimBaseTest(unittest.IsolatedAsyncioTestCase):
CONTROLLER_CONFIG_NAME = "GdDevice"

    async def asyncSetUp(self):
        """
        Run aprotoc to generate python proto, and open root-canal and bt_topshim_facade
        """
        assert os.getenv(
            "ANDROID_BUILD_TOP"
        ) is not None, "Currently we only support run with Android tree, with bt_topshim_facade and root-canal built"
        # Run root-canal and DUT process.  STDERR should be saved by the process itself.
        self.root_canal = await asyncio.create_subprocess_exec("root-canal", stderr=asyncio.subprocess.DEVNULL)
        self.dut = await asyncio.create_subprocess_exec("bt_topshim_facade", stderr=asyncio.subprocess.DEVNULL)

        # Wait for gRPC channel to open
        await asyncio.sleep(2)
def setup_test_core(verbose_mode, log_path_base, controller_configs):
    info = {}
    info['controller_configs'] = controller_configs

    async def asyncTearDown(self):
        self.dut.terminate()
        self.root_canal.terminate()
    # Start root-canal if needed
    info['rootcanal_running'] = False
    info['rootcanal_logpath'] = None
    info['rootcanal_process'] = None
    info['rootcanal_logger'] = None
    if 'rootcanal' not in info['controller_configs']:
        return
    info['rootcanal_running'] = True
    # Get root canal binary
    rootcanal = os.path.join(get_gd_root(), "root-canal")
    info['rootcanal'] = rootcanal
    info['rootcanal_exist'] = os.path.isfile(rootcanal)
    if not os.path.isfile(rootcanal):
        return info
    # Get root canal log
    rootcanal_logpath = os.path.join(log_path_base, 'rootcanal_logs.txt')
    info['rootcanal_logpath'] = rootcanal_logpath
    # Make sure ports are available
    rootcanal_config = info['controller_configs']['rootcanal']
    rootcanal_test_port = int(rootcanal_config.get("test_port", "6401"))
    rootcanal_hci_port = int(rootcanal_config.get("hci_port", "6402"))
    rootcanal_link_layer_port = int(rootcanal_config.get("link_layer_port", "6403"))

    info['make_rootcanal_ports_available'] = make_ports_available((rootcanal_test_port, rootcanal_hci_port,
                                                                   rootcanal_link_layer_port))
    if not make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)):
        return info

    # Start root canal process
    rootcanal_cmd = [rootcanal, str(rootcanal_test_port), str(rootcanal_hci_port), str(rootcanal_link_layer_port)]
    info['rootcanal_cmd'] = rootcanal_cmd

    rootcanal_process = subprocess.Popen(
        rootcanal_cmd,
        cwd=get_gd_root(),
        env=os.environ.copy(),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True)

    info['rootcanal_process'] = rootcanal_process
    if rootcanal_process:
        info['is_rootcanal_process_started'] = True
    else:
        info['is_rootcanal_process_started'] = False
        return info
    info['is_subprocess_alive'] = is_subprocess_alive(rootcanal_process)
    if not is_subprocess_alive(rootcanal_process):
        info['is_subprocess_alive'] = False
        return info

    info['rootcanal_logger'] = AsyncSubprocessLogger(
        rootcanal_process, [rootcanal_logpath],
        log_to_stdout=verbose_mode,
        tag="rootcanal",
        color=TerminalColor.MAGENTA)

    # Modify the device config to include the correct root-canal port
    for gd_device_config in info['controller_configs'].get("GdDevice"):
        gd_device_config["rootcanal_port"] = str(rootcanal_hci_port)

    return info


def teardown_class_core(rootcanal_running, rootcanal_process, rootcanal_logger, subprocess_wait_timeout_seconds):
    if rootcanal_running:
        stop_signal = signal.SIGINT
        rootcanal_process.send_signal(stop_signal)
        try:
            return_code = rootcanal_process.wait(timeout=subprocess_wait_timeout_seconds)
        except subprocess.TimeoutExpired:
            logging.error("Failed to interrupt root canal via SIGINT, sending SIGKILL")
            stop_signal = signal.SIGKILL
            rootcanal_process.kill()
            try:
                return_code = rootcanal_process.wait(timeout=subprocess_wait_timeout_seconds)
            except subprocess.TimeoutExpired:
                logging.error("Failed to kill root canal")
                return_code = -65536
        if return_code != 0 and return_code != -stop_signal:
            logging.error("rootcanal stopped with code: %d" % return_code)
        rootcanal_logger.stop()


def dump_crashes_core(dut, cert, rootcanal_running, rootcanal_process, rootcanal_logpath):
    dut_crash, dut_log_tail = dut.get_crash_snippet_and_log_tail()
    cert_crash, cert_log_tail = cert.get_crash_snippet_and_log_tail()
    rootcanal_crash = None
    rootcanal_log_tail = None
    if rootcanal_running and not is_subprocess_alive(rootcanal_process):
        rootcanal_crash, roocanal_log_tail = read_crash_snippet_and_log_tail(rootcanal_logpath)

    crash_detail = ""
    if dut_crash or cert_crash or rootcanal_crash:
        if rootcanal_crash:
            crash_detail += "rootcanal crashed:\n\n%s\n\n" % rootcanal_crash
        if dut_crash:
            crash_detail += "dut stack crashed:\n\n%s\n\n" % dut_crash
        if cert_crash:
            crash_detail += "cert stack crashed:\n\n%s\n\n" % cert_crash
    else:
        if rootcanal_log_tail:
            crash_detail += "rootcanal log tail:\n\n%s\n\n" % rootcanal_log_tail
        if dut_log_tail:
            crash_detail += "dut log tail:\n\n%s\n\n" % dut_log_tail
        if cert_log_tail:
            crash_detail += "cert log tail:\n\n%s\n\n" % cert_log_tail

    return crash_detail


class TopshimBaseTest(base_test.BaseTestClass):

    def setup_class(self):
        super().setup_test()
        self.log = TraceLogger(logging.getLogger())
        self.log_path_base = get_current_context().get_full_output_path()
        self.verbose_mode = bool(self.user_params.get('verbose_mode', False))
        for config in self.controller_configs[CONTROLLER_CONFIG_NAME]:
            config['verbose_mode'] = self.verbose_mode

        self.info = setup_test_core(
            verbose_mode=self.verbose_mode,
            log_path_base=self.log_path_base,
            controller_configs=self.controller_configs)
        self.rootcanal_running = self.info['rootcanal_running']
        self.rootcanal_logpath = self.info['rootcanal_logpath']
        self.rootcanal_process = self.info['rootcanal_process']
        self.rootcanal_logger = self.info['rootcanal_logger']

        asserts.assert_true(self.info['rootcanal_exist'], "Root canal does not exist at %s" % self.info['rootcanal'])
        asserts.assert_true(self.info['make_rootcanal_ports_available'], "Failed to make root canal ports available")

        self.log.debug("Running %s" % " ".join(self.info['rootcanal_cmd']))
        asserts.assert_true(
            self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal']))
        asserts.assert_true(self.info['is_subprocess_alive'], msg="root-canal stopped immediately after running")

        self.controller_configs = self.info['controller_configs']

        controllers = self.register_controller(importlib.import_module('rust.topshim.facade.topshim_device'))
        self.cert_port = controllers[0].grpc_port
        self.dut_port = controllers[1].grpc_port

    def test_empty(self):
        pass

    def teardown_test(self):
        return super().teardown_test()

    def teardown_class(self):
        teardown_class_core(
            rootcanal_running=self.rootcanal_running,
            rootcanal_process=self.rootcanal_process,
            rootcanal_logger=self.rootcanal_logger,
            subprocess_wait_timeout_seconds=1)
+68 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from blueberry.tests.gd.cert.gd_device import GdHostOnlyDevice
from cert.gd_device_lib import create_core
from cert.gd_device_lib import destroy_core
from cert.gd_device_lib import MOBLY_CONTROLLER_CONFIG_NAME
from cert.os_utils import get_gd_root


def create(configs):
    create_core(configs)
    return get_instances_with_configs(configs)


def destroy(devices):
    destroy_core(devices)


def replace_vars_for_topshim(string, config):
    serial_number = config.get("serial_number")
    if serial_number is None:
        serial_number = ""
    rootcanal_port = config.get("rootcanal_port")
    if rootcanal_port is None:
        rootcanal_port = ""
    if serial_number == "DUT" or serial_number == "CERT":
        raise Exception("Did you forget to configure the serial number?")
    # We run bt_topshim_facade instead of bluetooth_stack_with_facade
    return string.replace("$GD_ROOT", get_gd_root()) \
                 .replace("bluetooth_stack_with_facade", "bt_topshim_facade") \
                 .replace("$(grpc_port)", config.get("grpc_port")) \
                 .replace("$(grpc_root_server_port)", config.get("grpc_root_server_port")) \
                 .replace("$(rootcanal_port)", rootcanal_port) \
                 .replace("$(signal_port)", config.get("signal_port")) \
                 .replace("$(serial_number)", serial_number)


def get_instances_with_configs(configs):
    logging.info(configs)
    devices = []
    for config in configs:
        resolved_cmd = []
        for arg in config["cmd"]:
            logging.debug(arg)
            resolved_cmd.append(replace_vars_for_topshim(arg, config))
        verbose_mode = bool(config.get('verbose_mode', False))
        device = GdHostOnlyDevice(config["grpc_port"], config["grpc_root_server_port"], config["signal_port"],
                                  resolved_cmd, config["label"], MOBLY_CONTROLLER_CONFIG_NAME, config["name"],
                                  verbose_mode)
        device.setup()
        devices.append(device)
    return devices