Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 042c8b1c authored by Martin Brabham's avatar Martin Brabham
Browse files

Topshim: Migrate to new location

Moving the topshim tests to a different directory.

Make topshim use mobly test runner.

Using `gd/cert/run` script to run the tests.

system/gd/cert/run_topshim is replaced by this commit.

Fix non clear target to not use dist which adds extra build overhead
time.

Heavily rely on async paradigm for topshim testing.

Bug: 230373381
Test: gd/cert/run --clean --topshim
Tag: #stability
Change-Id: I05e21ab8c4716dcb58570f2583c25915753cd6b0
parent 86c1a1d3
Loading
Loading
Loading
Loading
+33 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio

from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.topshim_base_test import TopshimBaseTest
from blueberry.tests.topshim.lib.adapter_client import AdapterClient

from mobly import test_runner


class AdapterTest(TopshimBaseTest):

    def test_verify_adapter_started(self):
        print("Adapter is verified when test starts")


if __name__ == "__main__":
    test_runner.main()
+130 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import grpc

from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc

from google.protobuf import empty_pb2 as empty_proto


class AdapterClient():
    """
    Wrapper gRPC interface to the Topshim/BTIF layer
    """
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    __task_list = []
    __channel = None
    __adapter_stub = None
    __adapter_event_stream = None

    def __init__(self, port=8999):
        self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.__adapter_stub = facade_pb2_grpc.AdapterServiceStub(self.__channel)
        self.__adapter_event_stream = self.__adapter_stub.FetchEvents(facade_pb2.FetchEventsRequest())

    async def terminate(self):
        for task in self.__task_list:
            task.cancel()
            task = None
        self.__task_list.clear()
        await self.__channel.close()

    async def __get_next_event(self, event, future):
        """Get the future of next event from the stream"""
        while True:
            e = await self.__adapter_event_stream.read()

            # Match event by some condition.
            if e.event_type == event:
                future.set_result(e.data)
                break
            else:
                print("Got '%s'; expecting '%s'" % (e.event_type, event))
                print(e)

    async def _listen_for_event(self, event):
        """Start fetching events"""
        future = asyncio.get_running_loop().create_future()
        self.__task_list.append(asyncio.get_running_loop().create_task(self.__get_next_event(event, future)))
        await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT)
        return future

    async def _verify_adapter_started(self):
        future = await self._listen_for_event(facade_pb2.EventType.ADAPTER_STATE)
        return future.result() == "ON"

    async def toggle_stack(self, is_start=True):
        """Enable/disable the stack"""
        await self.__adapter_stub.ToggleStack(facade_pb2.ToggleStackRequest(start_stack=is_start))
        return await self._verify_adapter_started()

    async def set_enable_page_scan(self):
        """Enable page scan (might be used for A2dp sink to be discoverable)"""
        await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=True))

    async def clear_event_filter(self):
        await self.__adapter_stub.ClearEventFilter(empty_proto.Empty())

    async def clear_event_mask(self):
        await self.__adapter_stub.ClearEventMask(empty_proto.Empty())

    async def clear_filter_accept_list(self):
        await self.__adapter_stub.ClearFilterAcceptList(empty_proto.Empty())

    async def disconnect_all_acls(self):
        await self.__adapter_stub.DisconnectAllAcls(empty_proto.Empty())

    async def le_rand(self):
        await self.__adapter_stub.LeRand(empty_proto.Empty())
        future = await self._listen_for_event(facade_pb2.EventType.LE_RAND)
        #        await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT)
        return future.result()

    async def restore_filter_accept_list(self):
        await self.__adapter_stub.RestoreFilterAcceptList(empty_proto.Empty())

    async def set_default_event_mask(self):
        await self.__adapter_stub.SetDefaultEventMask(empty_proto.Empty())

    async def set_event_filter_inquiry_result_all_devices(self):
        await self.__adapter_stub.SetEventFilterInquiryResultAllDevices(empty_proto.Empty())


class A2dpAutomationHelper():
    """Invoke gRPC on topshim for A2DP testing"""

    def __init__(self, port=8999):
        self.__channel = grpc.insecure_channel("localhost:%d" % port)
        self.media_stub = facade_pb2_grpc.MediaServiceStub(self.__channel)

    """Start A2dp source profile service"""

    def start_source(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_source=True))

    """Start A2dp sink profile service"""

    def start_sink(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_sink=True))

    """Initialize an A2dp connection from source to sink"""

    def source_connect_to_remote(self, address="11:22:33:44:55:66"):
        self.media_stub.A2dpSourceConnect(facade_pb2.A2dpSourceConnectRequest(address=address))
+211 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import importlib
import logging
import os
import signal
import subprocess

from blueberry.tests.gd.cert.async_subprocess_logger import AsyncSubprocessLogger
from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.os_utils import get_gd_root
from blueberry.tests.gd.cert.os_utils import get_gd_root
from blueberry.tests.gd.cert.os_utils import read_crash_snippet_and_log_tail
from blueberry.tests.gd.cert.os_utils import is_subprocess_alive
from blueberry.tests.gd.cert.os_utils import make_ports_available
from blueberry.tests.gd.cert.os_utils import TerminalColor
from blueberry.tests.gd.cert.tracelogger import TraceLogger
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.adapter_client import AdapterClient

from mobly import asserts
from mobly import base_test

CONTROLLER_CONFIG_NAME = "GdDevice"


def _setup_class_core(verbose_mode, log_path_base, controller_configs):
    info = {}
    info['controller_configs'] = controller_configs

    # Start root-canal if needed
    info['rootcanal_running'] = False
    info['rootcanal_logpath'] = None
    info['rootcanal_process'] = None
    info['rootcanal_logger'] = None
    if 'rootcanal' not in info['controller_configs']:
        return
    info['rootcanal_running'] = True
    # Get root canal binary
    rootcanal = os.path.join(get_gd_root(), "root-canal")
    info['rootcanal'] = rootcanal
    info['rootcanal_exist'] = os.path.isfile(rootcanal)
    if not os.path.isfile(rootcanal):
        return info
    # Get root canal log
    rootcanal_logpath = os.path.join(log_path_base, 'rootcanal_logs.txt')
    info['rootcanal_logpath'] = rootcanal_logpath
    # Make sure ports are available
    rootcanal_config = info['controller_configs']['rootcanal']
    rootcanal_test_port = int(rootcanal_config.get("test_port", "6401"))
    rootcanal_hci_port = int(rootcanal_config.get("hci_port", "6402"))
    rootcanal_link_layer_port = int(rootcanal_config.get("link_layer_port", "6403"))

    info['make_rootcanal_ports_available'] = make_ports_available((rootcanal_test_port, rootcanal_hci_port,
                                                                   rootcanal_link_layer_port))
    if not make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)):
        return info

    # Start root canal process
    rootcanal_cmd = [rootcanal, str(rootcanal_test_port), str(rootcanal_hci_port), str(rootcanal_link_layer_port)]
    info['rootcanal_cmd'] = rootcanal_cmd

    rootcanal_process = subprocess.Popen(
        rootcanal_cmd,
        cwd=get_gd_root(),
        env=os.environ.copy(),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True)

    info['rootcanal_process'] = rootcanal_process
    if rootcanal_process:
        info['is_rootcanal_process_started'] = True
    else:
        info['is_rootcanal_process_started'] = False
        return info
    info['is_subprocess_alive'] = is_subprocess_alive(rootcanal_process)
    if not is_subprocess_alive(rootcanal_process):
        info['is_subprocess_alive'] = False
        return info

    info['rootcanal_logger'] = AsyncSubprocessLogger(
        rootcanal_process, [rootcanal_logpath],
        log_to_stdout=verbose_mode,
        tag="rootcanal",
        color=TerminalColor.MAGENTA)

    # Modify the device config to include the correct root-canal port
    for gd_device_config in info['controller_configs'].get("GdDevice"):
        gd_device_config["rootcanal_port"] = str(rootcanal_hci_port)

    return info


def _teardown_class_core(rootcanal_running, rootcanal_process, rootcanal_logger, subprocess_wait_timeout_seconds):
    if rootcanal_running:
        stop_signal = signal.SIGINT
        rootcanal_process.send_signal(stop_signal)
        try:
            return_code = rootcanal_process.wait(timeout=subprocess_wait_timeout_seconds)
        except subprocess.TimeoutExpired:
            logging.error("Failed to interrupt root canal via SIGINT, sending SIGKILL")
            stop_signal = signal.SIGKILL
            rootcanal_process.kill()
            try:
                return_code = rootcanal_process.wait(timeout=subprocess_wait_timeout_seconds)
            except subprocess.TimeoutExpired:
                logging.error("Failed to kill root canal")
                return_code = -65536
        if return_code != 0 and return_code != -stop_signal:
            logging.error("rootcanal stopped with code: %d" % return_code)
        rootcanal_logger.stop()


def dump_crashes_core(dut, cert, rootcanal_running, rootcanal_process, rootcanal_logpath):
    dut_crash, dut_log_tail = dut.get_crash_snippet_and_log_tail()
    cert_crash, cert_log_tail = cert.get_crash_snippet_and_log_tail()
    rootcanal_crash = None
    rootcanal_log_tail = None
    if rootcanal_running and not is_subprocess_alive(rootcanal_process):
        rootcanal_crash, roocanal_log_tail = read_crash_snippet_and_log_tail(rootcanal_logpath)

    crash_detail = ""
    if dut_crash or cert_crash or rootcanal_crash:
        if rootcanal_crash:
            crash_detail += "rootcanal crashed:\n\n%s\n\n" % rootcanal_crash
        if dut_crash:
            crash_detail += "dut stack crashed:\n\n%s\n\n" % dut_crash
        if cert_crash:
            crash_detail += "cert stack crashed:\n\n%s\n\n" % cert_crash
    else:
        if rootcanal_log_tail:
            crash_detail += "rootcanal log tail:\n\n%s\n\n" % rootcanal_log_tail
        if dut_log_tail:
            crash_detail += "dut log tail:\n\n%s\n\n" % dut_log_tail
        if cert_log_tail:
            crash_detail += "cert log tail:\n\n%s\n\n" % cert_log_tail

    return crash_detail


class TopshimBaseTest(base_test.BaseTestClass):

    dut_adapter = None

    async def _setup_adapter(self):
        self.dut_adapter = AdapterClient(port=self.dut_port)
        started = await self.dut_adapter._verify_adapter_started()
        assertThat(started).isTrue()
        return started

    async def _teardown_adapter(self):
        await self.dut_adapter.terminate()

    def setup_class(self):
        """
        Configure rootcanal and setup test parameters
        """
        self.log = TraceLogger(logging.getLogger())
        self.log_path_base = get_current_context().get_full_output_path()
        self.verbose_mode = bool(self.user_params.get('verbose_mode', False))
        for config in self.controller_configs[CONTROLLER_CONFIG_NAME]:
            config['verbose_mode'] = self.verbose_mode

        self.info = _setup_class_core(
            verbose_mode=self.verbose_mode,
            log_path_base=self.log_path_base,
            controller_configs=self.controller_configs)
        self.rootcanal_running = self.info['rootcanal_running']
        self.rootcanal_logpath = self.info['rootcanal_logpath']
        self.rootcanal_process = self.info['rootcanal_process']
        self.rootcanal_logger = self.info['rootcanal_logger']

        asserts.assert_true(self.info['rootcanal_exist'], "Root canal does not exist at %s" % self.info['rootcanal'])
        asserts.assert_true(self.info['make_rootcanal_ports_available'], "Failed to make root canal ports available")

        self.log.debug("Running %s" % " ".join(self.info['rootcanal_cmd']))
        asserts.assert_true(
            self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal']))
        asserts.assert_true(self.info['is_subprocess_alive'], msg="root-canal stopped immediately after running")

        self.controller_configs = self.info['controller_configs']

        controllers = self.register_controller(importlib.import_module('blueberry.tests.topshim.lib.topshim_device'))
        self.cert_port = controllers[0].grpc_port
        self.dut_port = controllers[1].grpc_port
        asyncio.set_event_loop(asyncio.new_event_loop())
        asyncio.get_event_loop().run_until_complete(self._setup_adapter())

    def teardown_class(self):
        _teardown_class_core(
            rootcanal_running=self.rootcanal_running,
            rootcanal_process=self.rootcanal_process,
            rootcanal_logger=self.rootcanal_logger,
            subprocess_wait_timeout_seconds=1)
        asyncio.get_event_loop().run_until_complete(self._teardown_adapter())
+64 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from blueberry.tests.gd.cert.gd_device import GdHostOnlyDevice
from blueberry.tests.gd.cert.gd_device import MOBLY_CONTROLLER_CONFIG_NAME
from blueberry.tests.gd.cert.os_utils import get_gd_root


def create(configs):
    return get_instances_with_configs(configs)


def destroy(devices):
    pass


def replace_vars_for_topshim(string, config):
    serial_number = config.get("serial_number")
    if serial_number is None:
        serial_number = ""
    rootcanal_port = config.get("rootcanal_port")
    if rootcanal_port is None:
        rootcanal_port = ""
    if serial_number == "DUT" or serial_number == "CERT":
        raise Exception("Did you forget to configure the serial number?")
    # We run bt_topshim_facade instead of bluetooth_stack_with_facade
    return string.replace("$GD_ROOT", get_gd_root()) \
                 .replace("bluetooth_stack_with_facade", "bt_topshim_facade") \
                 .replace("$(grpc_port)", config.get("grpc_port")) \
                 .replace("$(grpc_root_server_port)", config.get("grpc_root_server_port")) \
                 .replace("$(rootcanal_port)", rootcanal_port) \
                 .replace("$(signal_port)", config.get("signal_port")) \
                 .replace("$(serial_number)", serial_number)


def get_instances_with_configs(configs):
    logging.info(configs)
    devices = []
    for config in configs:
        resolved_cmd = []
        for arg in config["cmd"]:
            logging.debug(arg)
            resolved_cmd.append(replace_vars_for_topshim(arg, config))
        verbose_mode = bool(config.get('verbose_mode', False))
        device = GdHostOnlyDevice(config["grpc_port"], "-1", config["signal_port"], resolved_cmd, config["label"],
                                  MOBLY_CONTROLLER_CONFIG_NAME, config["name"], verbose_mode)
        device.setup()
        devices.append(device)
    return devices
+32 −0
Original line number Diff line number Diff line
_description: Bluetooth cert testing
TestBeds:
  - Name: HostOnlyCert
    Controllers:
      rootcanal:
        test_port: '6401'
        hci_port: '6402'
        link_layer_port: '6403'
      GdDevice:
        - grpc_port: '8998'
          grpc_root_server_port: '8996'
          signal_port: '8994'
          label: cert
          name: Cert Device
          cmd:
            - "$GD_ROOT/bluetooth_stack_with_facade"
            - "--grpc-port=$(grpc_port)"
            - "--root-server-port=$(grpc_root_server_port)"
            - "--rootcanal-port=$(rootcanal_port)"
            - "--signal-port=$(signal_port)"
        - grpc_port: '8999'
          grpc_root_server_port: '8997'
          signal_port: '8995'
          label: dut
          name: DUT Device
          cmd:
            - "$GD_ROOT/bluetooth_stack_with_facade"
            - "--grpc-port=$(grpc_port)"
            - "--root-server-port=$(grpc_root_server_port)"
            - "--rootcanal-port=$(rootcanal_port)"
            - "--signal-port=$(signal_port)"
logpath: "/tmp/logs"
Loading