Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 86c1a1d3 authored by Martin Brabham's avatar Martin Brabham
Browse files

Topshim: Remove original topshim package space.

Bug: 230373381
Test: run_topshim no longer works
Tag: #floss
Change-Id: I2df268018e31b2f12792a3ef787f12657503498a
parent eeb355b1
Loading
Loading
Loading
Loading
+0 −39
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
from mobly import test_runner
from blueberry.tests.gd.rust.topshim.facade import topshim_base_test
from blueberry.tests.gd.rust.topshim.facade.automation_helper import AdapterAutomationHelper


class AdapterTest(topshim_base_test.TopshimBaseTest):

    async def _test_verify_adapter_started(self):
        self.dut_adapter = AdapterAutomationHelper(port=self.dut_port)
        event_loop = asyncio.get_running_loop()
        self.dut_adapter.fetch_events(event_loop)
        self.dut_adapter.pending_future = event_loop.create_future()
        await self.dut_adapter.toggle_stack()
        await self.dut_adapter.verify_adapter_started()
        self.dut_adapter.event_handler.cancel()

    def test_verify_adapter_started(self):
        asyncio.run(self._test_verify_adapter_started())


if __name__ == "__main__":
    test_runner.main()
+0 −113
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import grpc

from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc

from google.protobuf import empty_pb2 as empty_proto


class AdapterAutomationHelper():
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    """Invoke gRPC on topshim for Adapter testing"""

    def __init__(self, port=8999):
        self.channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.adapter_stub = facade_pb2_grpc.AdapterServiceStub(self.channel)

        self.pending_future = None

    """Start fetching events"""

    def fetch_events(self, async_event_loop):
        self.adapter_event_stream = self.adapter_stub.FetchEvents(facade_pb2.FetchEventsRequest())
        self.event_handler = async_event_loop.create_task(self.get_next_event())

    """Enable/disable the stack"""

    async def toggle_stack(self, is_start=True):
        await self.adapter_stub.ToggleStack(facade_pb2.ToggleStackRequest(start_stack=is_start))

    """Enable page scan (might be used for A2dp sink to be discoverable)"""

    async def set_enable_page_scan(self):
        await self.adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=True))

    """Get the future of next event from the stream"""

    async def get_next_event(self):
        while True:
            e = await self.adapter_event_stream.read()
            print(e)
            # Match event by some condition.
            if e.event_type == facade_pb2.EventType.ADAPTER_STATE and e.data == "ON" and self.pending_future is not None:
                self.pending_future.set_result(True)
            if e.event_type == facade_pb2.EventType.LE_RAND:
                self.pending_future.set_result(True)

    async def verify_adapter_started(self):
        await asyncio.wait_for(self.pending_future, AdapterAutomationHelper.DEFAULT_TIMEOUT)

    async def clear_event_filter(self):
        await self.adapter_stub.ClearEventFilter(empty_proto.Empty())

    async def clear_event_mask(self):
        await self.adapter_stub.ClearEventMask(empty_proto.Empty())

    async def clear_filter_accept_list(self):
        await self.adapter_stub.ClearFilterAcceptList(empty_proto.Empty())

    async def disconnect_all_acls(self):
        await self.adapter_stub.DisconnectAllAcls(empty_proto.Empty())

    async def le_rand(self):
        await self.adapter_stub.LeRand(empty_proto.Empty())

    async def restore_filter_accept_list(self):
        await self.adapter_stub.RestoreFilterAcceptList(empty_proto.Empty())

    async def set_default_event_mask(self):
        await self.adapter_stub.SetDefaultEventMask(empty_proto.Empty())

    async def set_event_filter_inquiry_result_all_devices(self):
        await self.adapter_stub.SetEventFilterInquiryResultAllDevices(empty_proto.Empty())


class A2dpAutomationHelper():
    """Invoke gRPC on topshim for A2DP testing"""

    def __init__(self, port=8999):
        self.channel = grpc.insecure_channel("localhost:%d" % port)
        self.media_stub = facade_pb2_grpc.MediaServiceStub(self.channel)

    """Start A2dp source profile service"""

    def start_source(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_source=True))

    """Start A2dp sink profile service"""

    def start_sink(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_sink=True))

    """Initialize an A2dp connection from source to sink"""

    def source_connect_to_remote(self, address="11:22:33:44:55:66"):
        self.media_stub.A2dpSourceConnect(facade_pb2.A2dpSourceConnectRequest(address=address))
+0 −57
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
from mobly import test_runner
from blueberry.tests.gd.rust.topshim.facade import topshim_base_test
from blueberry.tests.gd.rust.topshim.facade.automation_helper import AdapterAutomationHelper


class SuspendTest(topshim_base_test.TopshimBaseTest):

    async def _test_verify_suspend(self):

        # Setup
        self.dut_adapter = AdapterAutomationHelper(port=self.dut_port)
        event_loop = asyncio.get_running_loop()

        # Get 'ON' event
        self.dut_adapter.pending_future = event_loop.create_future()
        self.dut_adapter.fetch_events(event_loop)
        await asyncio.wait_for(self.dut_adapter.pending_future, AdapterAutomationHelper.DEFAULT_TIMEOUT)

        # Start suspend work
        await self.dut_adapter.clear_event_filter()
        await self.dut_adapter.clear_event_mask()
        await self.dut_adapter.clear_filter_accept_list()
        # TODO(optedoblivion): Find a better way to disconnect active ACLs
        #await self.dut_adapter.disconnect_all_acls()
        await self.dut_adapter.le_rand()

        # Get 'LE RAND' event
        self.dut_adapter.pending_future = event_loop.create_future()
        self.dut_adapter.fetch_events(event_loop)
        await asyncio.wait_for(self.dut_adapter.pending_future, AdapterAutomationHelper.DEFAULT_TIMEOUT)

        # Teardown
        self.dut_adapter.event_handler.cancel()

    def test_verify_suspend(self):
        asyncio.run(self._test_verify_suspend())


if __name__ == "__main__":
    test_runner.main()
+0 −198
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import importlib
import logging
import os
import signal
import subprocess

from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.tracelogger import TraceLogger
from blueberry.tests.gd.cert.async_subprocess_logger import AsyncSubprocessLogger
from blueberry.tests.gd.cert.os_utils import get_gd_root
from blueberry.tests.gd.cert.os_utils import get_gd_root
from blueberry.tests.gd.cert.os_utils import read_crash_snippet_and_log_tail
from blueberry.tests.gd.cert.os_utils import is_subprocess_alive
from blueberry.tests.gd.cert.os_utils import make_ports_available
from blueberry.tests.gd.cert.os_utils import TerminalColor
from mobly import asserts
from mobly import base_test

CONTROLLER_CONFIG_NAME = "GdDevice"


def setup_test_core(verbose_mode, log_path_base, controller_configs):
    info = {}
    info['controller_configs'] = controller_configs

    # Start root-canal if needed
    info['rootcanal_running'] = False
    info['rootcanal_logpath'] = None
    info['rootcanal_process'] = None
    info['rootcanal_logger'] = None
    if 'rootcanal' not in info['controller_configs']:
        return
    info['rootcanal_running'] = True
    # Get root canal binary
    rootcanal = os.path.join(get_gd_root(), "root-canal")
    info['rootcanal'] = rootcanal
    info['rootcanal_exist'] = os.path.isfile(rootcanal)
    if not os.path.isfile(rootcanal):
        return info
    # Get root canal log
    rootcanal_logpath = os.path.join(log_path_base, 'rootcanal_logs.txt')
    info['rootcanal_logpath'] = rootcanal_logpath
    # Make sure ports are available
    rootcanal_config = info['controller_configs']['rootcanal']
    rootcanal_test_port = int(rootcanal_config.get("test_port", "6401"))
    rootcanal_hci_port = int(rootcanal_config.get("hci_port", "6402"))
    rootcanal_link_layer_port = int(rootcanal_config.get("link_layer_port", "6403"))

    info['make_rootcanal_ports_available'] = make_ports_available((rootcanal_test_port, rootcanal_hci_port,
                                                                   rootcanal_link_layer_port))
    if not make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)):
        return info

    # Start root canal process
    rootcanal_cmd = [rootcanal, str(rootcanal_test_port), str(rootcanal_hci_port), str(rootcanal_link_layer_port)]
    info['rootcanal_cmd'] = rootcanal_cmd

    rootcanal_process = subprocess.Popen(
        rootcanal_cmd,
        cwd=get_gd_root(),
        env=os.environ.copy(),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True)

    info['rootcanal_process'] = rootcanal_process
    if rootcanal_process:
        info['is_rootcanal_process_started'] = True
    else:
        info['is_rootcanal_process_started'] = False
        return info
    info['is_subprocess_alive'] = is_subprocess_alive(rootcanal_process)
    if not is_subprocess_alive(rootcanal_process):
        info['is_subprocess_alive'] = False
        return info

    info['rootcanal_logger'] = AsyncSubprocessLogger(
        rootcanal_process, [rootcanal_logpath],
        log_to_stdout=verbose_mode,
        tag="rootcanal",
        color=TerminalColor.MAGENTA)

    # Modify the device config to include the correct root-canal port
    for gd_device_config in info['controller_configs'].get("GdDevice"):
        gd_device_config["rootcanal_port"] = str(rootcanal_hci_port)

    return info


def teardown_class_core(rootcanal_running, rootcanal_process, rootcanal_logger, subprocess_wait_timeout_seconds):
    if rootcanal_running:
        stop_signal = signal.SIGINT
        rootcanal_process.send_signal(stop_signal)
        try:
            return_code = rootcanal_process.wait(timeout=subprocess_wait_timeout_seconds)
        except subprocess.TimeoutExpired:
            logging.error("Failed to interrupt root canal via SIGINT, sending SIGKILL")
            stop_signal = signal.SIGKILL
            rootcanal_process.kill()
            try:
                return_code = rootcanal_process.wait(timeout=subprocess_wait_timeout_seconds)
            except subprocess.TimeoutExpired:
                logging.error("Failed to kill root canal")
                return_code = -65536
        if return_code != 0 and return_code != -stop_signal:
            logging.error("rootcanal stopped with code: %d" % return_code)
        rootcanal_logger.stop()


def dump_crashes_core(dut, cert, rootcanal_running, rootcanal_process, rootcanal_logpath):
    dut_crash, dut_log_tail = dut.get_crash_snippet_and_log_tail()
    cert_crash, cert_log_tail = cert.get_crash_snippet_and_log_tail()
    rootcanal_crash = None
    rootcanal_log_tail = None
    if rootcanal_running and not is_subprocess_alive(rootcanal_process):
        rootcanal_crash, roocanal_log_tail = read_crash_snippet_and_log_tail(rootcanal_logpath)

    crash_detail = ""
    if dut_crash or cert_crash or rootcanal_crash:
        if rootcanal_crash:
            crash_detail += "rootcanal crashed:\n\n%s\n\n" % rootcanal_crash
        if dut_crash:
            crash_detail += "dut stack crashed:\n\n%s\n\n" % dut_crash
        if cert_crash:
            crash_detail += "cert stack crashed:\n\n%s\n\n" % cert_crash
    else:
        if rootcanal_log_tail:
            crash_detail += "rootcanal log tail:\n\n%s\n\n" % rootcanal_log_tail
        if dut_log_tail:
            crash_detail += "dut log tail:\n\n%s\n\n" % dut_log_tail
        if cert_log_tail:
            crash_detail += "cert log tail:\n\n%s\n\n" % cert_log_tail

    return crash_detail


class TopshimBaseTest(base_test.BaseTestClass):

    def setup_class(self):
        super().setup_test()
        self.log = TraceLogger(logging.getLogger())
        self.log_path_base = get_current_context().get_full_output_path()
        self.verbose_mode = bool(self.user_params.get('verbose_mode', False))
        for config in self.controller_configs[CONTROLLER_CONFIG_NAME]:
            config['verbose_mode'] = self.verbose_mode

        self.info = setup_test_core(
            verbose_mode=self.verbose_mode,
            log_path_base=self.log_path_base,
            controller_configs=self.controller_configs)
        self.rootcanal_running = self.info['rootcanal_running']
        self.rootcanal_logpath = self.info['rootcanal_logpath']
        self.rootcanal_process = self.info['rootcanal_process']
        self.rootcanal_logger = self.info['rootcanal_logger']

        asserts.assert_true(self.info['rootcanal_exist'], "Root canal does not exist at %s" % self.info['rootcanal'])
        asserts.assert_true(self.info['make_rootcanal_ports_available'], "Failed to make root canal ports available")

        self.log.debug("Running %s" % " ".join(self.info['rootcanal_cmd']))
        asserts.assert_true(
            self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal']))
        asserts.assert_true(self.info['is_subprocess_alive'], msg="root-canal stopped immediately after running")

        self.controller_configs = self.info['controller_configs']

        controllers = self.register_controller(
            importlib.import_module('blueberry.tests.gd.rust.topshim.facade.topshim_device'))
        self.cert_port = controllers[0].grpc_port
        self.dut_port = controllers[1].grpc_port

    def test_empty(self):
        pass

    def teardown_test(self):
        return super().teardown_test()

    def teardown_class(self):
        teardown_class_core(
            rootcanal_running=self.rootcanal_running,
            rootcanal_process=self.rootcanal_process,
            rootcanal_logger=self.rootcanal_logger,
            subprocess_wait_timeout_seconds=1)
+0 −64
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from blueberry.tests.gd.cert.gd_device import GdHostOnlyDevice
from blueberry.tests.gd.cert.gd_device import MOBLY_CONTROLLER_CONFIG_NAME
from blueberry.tests.gd.cert.os_utils import get_gd_root


def create(configs):
    return get_instances_with_configs(configs)


def destroy(devices):
    pass


def replace_vars_for_topshim(string, config):
    serial_number = config.get("serial_number")
    if serial_number is None:
        serial_number = ""
    rootcanal_port = config.get("rootcanal_port")
    if rootcanal_port is None:
        rootcanal_port = ""
    if serial_number == "DUT" or serial_number == "CERT":
        raise Exception("Did you forget to configure the serial number?")
    # We run bt_topshim_facade instead of bluetooth_stack_with_facade
    return string.replace("$GD_ROOT", get_gd_root()) \
                 .replace("bluetooth_stack_with_facade", "bt_topshim_facade") \
                 .replace("$(grpc_port)", config.get("grpc_port")) \
                 .replace("$(grpc_root_server_port)", config.get("grpc_root_server_port")) \
                 .replace("$(rootcanal_port)", rootcanal_port) \
                 .replace("$(signal_port)", config.get("signal_port")) \
                 .replace("$(serial_number)", serial_number)


def get_instances_with_configs(configs):
    logging.info(configs)
    devices = []
    for config in configs:
        resolved_cmd = []
        for arg in config["cmd"]:
            logging.debug(arg)
            resolved_cmd.append(replace_vars_for_topshim(arg, config))
        verbose_mode = bool(config.get('verbose_mode', False))
        device = GdHostOnlyDevice(config["grpc_port"], "-1", config["signal_port"], resolved_cmd, config["label"],
                                  MOBLY_CONTROLLER_CONFIG_NAME, config["name"], verbose_mode)
        device.setup()
        devices.append(device)
    return devices
Loading