Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4d2bbac7 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge changes I51a03489,I1520c785 into main

* changes:
  gd/cert: Remove the build rule for bluetooth_cert_tests.zip
  Remove system/blueberry/tests/topshim
parents f69f4b91 7f6912c0
Loading
Loading
Loading
Loading
+0 −183
Original line number Diff line number Diff line
@@ -119,24 +119,6 @@ genrule {
    ],
}

genrule {
    name: "BlueberryFacadeAndCertGeneratedStub_py",
    tools: [
        "aprotoc",
        "protoc-gen-grpc-python-plugin",
        "soong_zip",
    ],
    cmd: "mkdir -p $(genDir)/files && " +
        "$(location aprotoc) -Ipackages/modules/Bluetooth/system -Iexternal/protobuf/src --plugin=protoc-gen-grpc=$(location protoc-gen-grpc-python-plugin) $(locations :BlueberryFacadeProto) --grpc_out=$(genDir)/files --python_out=$(genDir)/files && " +
        "for dir in `find $(genDir)/files -type d`; do touch $$dir/__init__.py; done &&" +
        "$(location soong_zip) -C $(genDir)/files -D $(genDir)/files -o $(out)",
    srcs: [
        ":BlueberryFacadeProto",
        ":libprotobuf-internal-protos",
    ],
    out: ["blueberry_facade_generated_py.zip"],
}

rust_protobuf {
    name: "libbt_topshim_facade_protobuf",
    crate_name: "bt_topshim_facade_protobuf",
@@ -160,168 +142,3 @@ cc_library_headers {
    ],
    min_sdk_version: "30",
}

genrule {
    name: "bluetooth_cert_test_sources-zip",
    tools: [
        "soong_zip",
    ],
    srcs: [
        "blueberry/**/*.py",
        "blueberry/**/*.yaml",
        "setup.py",
    ],
    out: ["bluetooth_cert_test_sources.zip"],
    cmd: "echo $(in) > $(genDir)/file_list.txt && " +
        "$(location soong_zip) -C packages/modules/Bluetooth/system -l $(genDir)/file_list.txt -o $(out)",
}

genrule {
    name: "gd_hci_packets_python3_gen-zip",
    tools: [
        "soong_zip",
    ],
    srcs: [
        ":gd_hci_packets_python3_gen",
    ],
    out: ["gd_hci_packets_python3_gen.zip"],
    cmd: "echo $(in) > $(genDir)/file_list.txt && " +
        "$(location soong_zip) -j -l $(genDir)/file_list.txt -o $(out)",
}

genrule {
    name: "gd_smp_packets_python3_gen-zip",
    tools: [
        "soong_zip",
    ],
    srcs: [
        ":gd_smp_packets_python3_gen",
    ],
    out: ["gd_smp_packets_python3_gen.zip"],
    cmd: "echo $(in) > $(genDir)/file_list.txt && " +
        "$(location soong_zip) -j -l $(genDir)/file_list.txt -o $(out)",
}

cc_genrule {
    name: "bluetooth_cert_test_host_deps-zip",
    host_supported: true,
    device_supported: false,
    compile_multilib: "first",
    tools: [
        "bluetooth_stack_with_facade",
        "bt_topshim_facade",
        "root-canal",
        "soong_zip",
    ],
    srcs: [
        ":libbase",
        ":libbinder",
        ":libbinder_ndk",
        ":libbluetooth",
        ":libbluetooth_gd",
        ":libc++",
        ":libchrome",
        ":libcrypto",
        ":libcutils",
        ":libevent",
        ":libflatbuffers-cpp",
        ":libgrpc++",
        ":libgrpc_wrap",
        ":liblog",
        ":liblzma",
        ":libprotobuf-cpp-full",
        ":libssl",
        ":libunwindstack",
        ":libutils",
        ":libz",
        ":server_configurable_flags",
    ],
    out: ["bluetooth_cert_test_host_deps.zip"],
    cmd: "echo $(in) > $(genDir)/file_list.txt && " +
        "$(location soong_zip) -j -f $(location bluetooth_stack_with_facade) -f $(location bt_topshim_facade) -f $(location root-canal) -P lib64 -l $(genDir)/file_list.txt -o $(out)",
}

cc_genrule {
    name: "bluetooth_cert_test_target_deps-zip",
    compile_multilib: "first",
    tools: [
        "soong_zip",
    ],
    srcs: [
        // executables
        ":bluetooth_stack_with_facade",
        // libs
        ":android.hardware.bluetooth@1.0",
        ":android.hardware.bluetooth@1.1",
        ":libandroid_runtime_lazy",
        ":libbase",
        ":libbinder",
        ":libbinder_ndk",
        ":libc++",
        ":libcrypto",
        ":libcutils",
        ":libgrpc++",
        ":libgrpc_wrap",
        ":libhidlbase",
        ":liblog",
        ":liblzma",
        ":libprotobuf-cpp-full",
        ":libssl",
        ":libunwindstack",
        ":libutils",
        ":libz",
        ":server_configurable_flags",
    ],
    out: ["bluetooth_cert_test_target_deps.zip"],
    cmd: "echo $(in) > $(genDir)/file_list.txt && " +
        "$(location soong_zip) -j -P target -l $(genDir)/file_list.txt -o $(out)",
}

genrule {
    name: "llvm-tools-zip",
    tools: [
        "soong_zip",
    ],
    srcs: [
        ":llvm-tools",
    ],
    out: ["llvm-tools.zip"],
    cmd: "mkdir -p $(genDir)/llvm_binutils/bin && mkdir -p $(genDir)/llvm_binutils/lib/x86_64-unknown-linux-gnu && " +
        "cp prebuilts/clang/host/linux-x86/*/bin/llvm-cov $(genDir)/llvm_binutils/bin && " +
        "cp prebuilts/clang/host/linux-x86/*/bin/llvm-profdata $(genDir)/llvm_binutils/bin && " +
        "cp prebuilts/clang/host/linux-x86/*/bin/llvm-symbolizer $(genDir)/llvm_binutils/bin && " +
        "cp prebuilts/clang/host/linux-x86/*/lib/x86_64-unknown-linux-gnu/libc++.so $(genDir)/llvm_binutils/lib/x86_64-unknown-linux-gnu && " +
        "$(location soong_zip) -C $(genDir) -D $(genDir)/llvm_binutils -o $(out)",
}

cc_genrule {
    name: "bluetooth_cert_tests.zip",
    host_supported: true,
    device_supported: false,
    compile_multilib: "first",
    tools: [
        "merge_zips",
        "soong_zip",
    ],
    srcs: [
        ":BlueberryFacadeAndCertGeneratedStub_py",
        ":bluetooth_cert_test_host_deps-zip",
        ":bluetooth_cert_test_sources-zip",
        ":gd_hci_packets_python3_gen-zip",
        ":gd_smp_packets_python3_gen-zip",
        ":llvm-tools-zip",
    ],
    device_first_srcs: [
        ":bluetooth_cert_test_target_deps-zip",
    ],
    out: ["bluetooth_cert_tests.zip"],
    cmd: "$(location merge_zips) $(genDir)/temp.zip $(in) && " +
        "unzip -q -d $(genDir)/files $(genDir)/temp.zip && " +
        "for d in $$(find $(genDir)/files/blueberry -type d -name '*'); do touch -a $$d/__init__.py; done && " +
        "$(location soong_zip) -C $(genDir)/files -D $(genDir)/files -o $(out)",
    dist: {
        targets: [
            "bluetooth_stack_with_facade",
        ],
    },
}
+0 −74
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.topshim_base_test import TopshimBaseTest
from blueberry.tests.topshim.lib.adapter_client import AdapterClient

from mobly import test_runner


class AdapterTest(TopshimBaseTest):

    def test_verify_adapter_started(self):
        print("Adapter is verified when test starts")

    def test_enable_inquiry_scan(self):
        status, discovery_mode = self.dut().enable_inquiry_scan()
        assertThat(status).isEqualTo("Success")
        assertThat(discovery_mode).isEqualTo("ConnectableDiscoverable")

    def test_enable_page_scan(self):
        status, discovery_mode = self.dut().enable_page_scan()
        assertThat(status).isEqualTo("Success")
        assertThat(discovery_mode).isEqualTo("Connectable")

    def test_disable_page_scan(self):
        status, discovery_mode = self.dut().disable_page_scan()
        assertThat(status).isEqualTo("Success")
        assertThat(discovery_mode).isEqualTo("None_")

    def test_set_local_io_caps(self):
        status, caps = self.dut().set_local_io_caps(3)
        assertThat(status).isEqualTo("Success")
        assertThat(caps).isEqualTo("None_")

    def test_start_discovery(self):
        state = self.dut().toggle_discovery(True)
        assertThat(state).isEqualTo("Started")
        # Reset device to not discovering.
        self.dut().toggle_discovery(False)

    def test_cancel_discovery(self):
        self.dut().toggle_discovery(True)
        state = self.dut().toggle_discovery(False)
        assertThat(state).isEqualTo("Stopped")

    def test_find_device_device_available(self):
        self.dut().enable_inquiry_scan()
        self.cert().enable_inquiry_scan()
        self.dut().toggle_discovery(True)
        device_addr = self.dut().find_device()
        assertThat(device_addr).isNotNone()
        # Reset DUT device discovering and scanning to None
        self.dut().disable_page_scan()
        self.dut().toggle_discovery(False)
        # Reset CERT device to not discoverable
        self.cert().disable_page_scan()


if __name__ == "__main__":
    test_runner.main()
+0 −72
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.topshim_base_test import TopshimBaseTest
from blueberry.tests.topshim.lib.topshim_device import TRANSPORT_CLASSIC

from mobly import test_runner


class HfpTest(TopshimBaseTest):

    def setup_test(self):
        super().setup_test()
        # Pair dut and cert device.
        self.dut().enable_inquiry_scan()
        self.cert().enable_inquiry_scan()
        self.dut().toggle_discovery(True)
        self.__paired_device = self.dut().find_device()
        self.dut().create_bond(address=self.__paired_device, transport=TRANSPORT_CLASSIC)

    def teardown_test(self):
        super().teardown_test()
        # Test teardown for dut and cert reset.
        self.dut().toggle_discovery(False)
        self.dut().disable_page_scan()
        self.cert().disable_page_scan()

    def test_hfp_connect_with_bond(self):
        state, _ = self.dut().start_slc(address=self.__paired_device)
        assertThat(state).isEqualTo("Connecting")
        state, _ = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("Connected")
        state, conn_addr = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("SlcConnected")
        assertThat(conn_addr).isEqualTo(self.__paired_device)

        #Extra steps to remove bonding to complete teardown.
        self.dut().remove_bonded_device(self.__paired_device)
        # This is required currently so that the HFP connection state change
        # callback doesn't affect other tests.
        self.dut().wait_for_hfp_connection_state_change()

    def test_hfp_disconnect_with_bond(self):
        state, _ = self.dut().start_slc(address=self.__paired_device)
        self.dut().wait_for_hfp_connection_state_change()  # To connected
        self.dut().wait_for_hfp_connection_state_change()  # To SLC connected

        # Actual test for stopping SLC connection.
        state, _ = self.dut().stop_slc(address=self.__paired_device)
        assertThat(state).isEqualTo("Disconnecting")
        state, _ = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("Disconnected")
        #Extra steps to remove bonding to complete teardown.
        self.dut().remove_bonded_device(self.__paired_device)


if __name__ == "__main__":
    test_runner.main()
+0 −168
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import grpc

from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc
from blueberry.tests.topshim.lib.async_closable import AsyncClosable

from google.protobuf import empty_pb2 as empty_proto


class AdapterClient(AsyncClosable):
    """
    Wrapper gRPC interface to the Topshim/BTIF layer
    """
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    __task_list = []
    __channel = None
    __adapter_stub = None
    __adapter_event_stream = None

    def __init__(self, port=8999):
        self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.__adapter_stub = facade_pb2_grpc.AdapterServiceStub(self.__channel)
        self.__adapter_event_stream = self.__adapter_stub.FetchEvents(facade_pb2.FetchEventsRequest())

    async def close(self):
        for task in self.__task_list:
            if task.done() or task.cancelled():
                continue
            task.cancel()
        self.__task_list.clear()
        await self.__channel.close()

    async def __get_next_event(self, event, future):
        """Get the future of next event from the stream"""
        while True:
            e = await self.__adapter_event_stream.read()

            # Match event by some condition.
            if e.event_type == event:
                future.set_result(e.params)
                break
            else:
                print("Got '%s'; expecting '%s'" % (e.event_type, event))
                print(e)

    async def _listen_for_event(self, event):
        """Start fetching events"""
        future = asyncio.get_running_loop().create_future()
        task = asyncio.get_running_loop().create_task(self.__get_next_event(event, future))
        self.__task_list.append(task)
        try:
            await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT)
        except:
            task.cancel()
            print("Failed to get event", event)
        return future

    async def _verify_adapter_started(self):
        future = await self._listen_for_event(facade_pb2.EventType.ADAPTER_STATE)
        params = future.result()
        return params["state"].data[0] == "ON"

    async def toggle_stack(self, is_start=True):
        """Enable/disable the stack"""
        await self.__adapter_stub.ToggleStack(facade_pb2.ToggleStackRequest(start_stack=is_start))
        return await self._verify_adapter_started()

    async def enable_inquiry_scan(self):
        """Enable inquiry scan (Required to make device connectable and discoverable by other devices)"""
        await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_inquiry_scan=True))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def enable_page_scan(self):
        """Enable page scan (might be used for A2dp sink to be discoverable)"""
        await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=True))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def disable_page_scan(self):
        """Enable page scan (might be used for A2dp sink to be discoverable)"""
        await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=False))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def clear_event_filter(self):
        await self.__adapter_stub.ClearEventFilter(empty_proto.Empty())

    async def clear_event_mask(self):
        await self.__adapter_stub.ClearEventMask(empty_proto.Empty())

    async def clear_filter_accept_list(self):
        await self.__adapter_stub.ClearFilterAcceptList(empty_proto.Empty())

    async def disconnect_all_acls(self):
        await self.__adapter_stub.DisconnectAllAcls(empty_proto.Empty())

    async def le_rand(self):
        await self.__adapter_stub.LeRand(empty_proto.Empty())
        future = await self._listen_for_event(facade_pb2.EventType.LE_RAND)
        params = future.result()
        return params["data"].data[0]

    async def restore_filter_accept_list(self):
        await self.__adapter_stub.RestoreFilterAcceptList(empty_proto.Empty())

    async def set_default_event_mask_except(self, mask, le_mask):
        await self.__adapter_stub.SetDefaultEventMaskExcept(
            facade_pb2.SetDefaultEventMaskExceptRequest(mask=mask, le_mask=le_mask))

    async def set_event_filter_inquiry_result_all_devices(self):
        await self.__adapter_stub.SetEventFilterInquiryResultAllDevices(empty_proto.Empty())

    async def set_event_filter_connection_setup_all_devices(self):
        await self.__adapter_stub.SetEventFilterConnectionSetupAllDevices(empty_proto.Empty())

    async def allow_wake_by_hid(self):
        await self.__adapter_stub.AllowWakeByHid(empty_proto.Empty())

    async def set_local_io_caps(self, io_capability):
        await self.__adapter_stub.SetLocalIoCaps(facade_pb2.SetLocalIoCapsRequest(io_capability=io_capability))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def toggle_discovery(self, is_start):
        await self.__adapter_stub.ToggleDiscovery(facade_pb2.ToggleDiscoveryRequest(is_start=is_start))
        future = await self._listen_for_event(facade_pb2.EventType.DISCOVERY_STATE)
        return future

    async def find_device(self):
        return await self._listen_for_event(facade_pb2.EventType.DEVICE_FOUND)


class A2dpAutomationHelper():
    """Invoke gRPC on topshim for A2DP testing"""

    def __init__(self, port=8999):
        self.__channel = grpc.insecure_channel("localhost:%d" % port)
        self.media_stub = facade_pb2_grpc.MediaServiceStub(self.__channel)

    """Start A2dp source profile service"""

    def start_source(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_source=True))

    """Start A2dp sink profile service"""

    def start_sink(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_sink=True))

    """Initialize an A2dp connection from source to sink"""

    def source_connect_to_remote(self, address="11:22:33:44:55:66"):
        self.media_stub.A2dpSourceConnect(facade_pb2.A2dpSourceConnectRequest(address=address))
+0 −50
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import time
from abc import ABC, abstractmethod
import logging


class AsyncClosable(ABC):

    async def __async_exit(self, type=None, value=None, traceback=None):
        try:
            return await self.close()
        except Exception:
            logging.warning("Failed to close or already closed")

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        asyncio.run_until_complete(self.__async_exit(type, value, traceback))
        return traceback is None

    def __del__(self):
        asyncio.get_event_loop().run_until_complete(self.__async_exit())

    @abstractmethod
    async def close(self):
        pass


async def asyncSafeClose(closable):
    if closable is None:
        logging.warn("Tried to close an object that is None")
        return
    await closable.close()
Loading