Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 66e0e71b authored by Henri Chataing's avatar Henri Chataing
Browse files

Remove system/blueberry/tests/topshim

Bug: 384782957
Test: TreeHugger
Flag: EXEMPT, dead code removal
Change-Id: I1520c7851947a6b4fe067266dd249699d260e1d2
parent 5202e3c8
Loading
Loading
Loading
Loading
+0 −74
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.topshim_base_test import TopshimBaseTest
from blueberry.tests.topshim.lib.adapter_client import AdapterClient

from mobly import test_runner


class AdapterTest(TopshimBaseTest):

    def test_verify_adapter_started(self):
        print("Adapter is verified when test starts")

    def test_enable_inquiry_scan(self):
        status, discovery_mode = self.dut().enable_inquiry_scan()
        assertThat(status).isEqualTo("Success")
        assertThat(discovery_mode).isEqualTo("ConnectableDiscoverable")

    def test_enable_page_scan(self):
        status, discovery_mode = self.dut().enable_page_scan()
        assertThat(status).isEqualTo("Success")
        assertThat(discovery_mode).isEqualTo("Connectable")

    def test_disable_page_scan(self):
        status, discovery_mode = self.dut().disable_page_scan()
        assertThat(status).isEqualTo("Success")
        assertThat(discovery_mode).isEqualTo("None_")

    def test_set_local_io_caps(self):
        status, caps = self.dut().set_local_io_caps(3)
        assertThat(status).isEqualTo("Success")
        assertThat(caps).isEqualTo("None_")

    def test_start_discovery(self):
        state = self.dut().toggle_discovery(True)
        assertThat(state).isEqualTo("Started")
        # Reset device to not discovering.
        self.dut().toggle_discovery(False)

    def test_cancel_discovery(self):
        self.dut().toggle_discovery(True)
        state = self.dut().toggle_discovery(False)
        assertThat(state).isEqualTo("Stopped")

    def test_find_device_device_available(self):
        self.dut().enable_inquiry_scan()
        self.cert().enable_inquiry_scan()
        self.dut().toggle_discovery(True)
        device_addr = self.dut().find_device()
        assertThat(device_addr).isNotNone()
        # Reset DUT device discovering and scanning to None
        self.dut().disable_page_scan()
        self.dut().toggle_discovery(False)
        # Reset CERT device to not discoverable
        self.cert().disable_page_scan()


if __name__ == "__main__":
    test_runner.main()
+0 −72
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.topshim_base_test import TopshimBaseTest
from blueberry.tests.topshim.lib.topshim_device import TRANSPORT_CLASSIC

from mobly import test_runner


class HfpTest(TopshimBaseTest):

    def setup_test(self):
        super().setup_test()
        # Pair dut and cert device.
        self.dut().enable_inquiry_scan()
        self.cert().enable_inquiry_scan()
        self.dut().toggle_discovery(True)
        self.__paired_device = self.dut().find_device()
        self.dut().create_bond(address=self.__paired_device, transport=TRANSPORT_CLASSIC)

    def teardown_test(self):
        super().teardown_test()
        # Test teardown for dut and cert reset.
        self.dut().toggle_discovery(False)
        self.dut().disable_page_scan()
        self.cert().disable_page_scan()

    def test_hfp_connect_with_bond(self):
        state, _ = self.dut().start_slc(address=self.__paired_device)
        assertThat(state).isEqualTo("Connecting")
        state, _ = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("Connected")
        state, conn_addr = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("SlcConnected")
        assertThat(conn_addr).isEqualTo(self.__paired_device)

        #Extra steps to remove bonding to complete teardown.
        self.dut().remove_bonded_device(self.__paired_device)
        # This is required currently so that the HFP connection state change
        # callback doesn't affect other tests.
        self.dut().wait_for_hfp_connection_state_change()

    def test_hfp_disconnect_with_bond(self):
        state, _ = self.dut().start_slc(address=self.__paired_device)
        self.dut().wait_for_hfp_connection_state_change()  # To connected
        self.dut().wait_for_hfp_connection_state_change()  # To SLC connected

        # Actual test for stopping SLC connection.
        state, _ = self.dut().stop_slc(address=self.__paired_device)
        assertThat(state).isEqualTo("Disconnecting")
        state, _ = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("Disconnected")
        #Extra steps to remove bonding to complete teardown.
        self.dut().remove_bonded_device(self.__paired_device)


if __name__ == "__main__":
    test_runner.main()
+0 −168
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import grpc

from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc
from blueberry.tests.topshim.lib.async_closable import AsyncClosable

from google.protobuf import empty_pb2 as empty_proto


class AdapterClient(AsyncClosable):
    """
    Wrapper gRPC interface to the Topshim/BTIF layer
    """
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    __task_list = []
    __channel = None
    __adapter_stub = None
    __adapter_event_stream = None

    def __init__(self, port=8999):
        self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.__adapter_stub = facade_pb2_grpc.AdapterServiceStub(self.__channel)
        self.__adapter_event_stream = self.__adapter_stub.FetchEvents(facade_pb2.FetchEventsRequest())

    async def close(self):
        for task in self.__task_list:
            if task.done() or task.cancelled():
                continue
            task.cancel()
        self.__task_list.clear()
        await self.__channel.close()

    async def __get_next_event(self, event, future):
        """Get the future of next event from the stream"""
        while True:
            e = await self.__adapter_event_stream.read()

            # Match event by some condition.
            if e.event_type == event:
                future.set_result(e.params)
                break
            else:
                print("Got '%s'; expecting '%s'" % (e.event_type, event))
                print(e)

    async def _listen_for_event(self, event):
        """Start fetching events"""
        future = asyncio.get_running_loop().create_future()
        task = asyncio.get_running_loop().create_task(self.__get_next_event(event, future))
        self.__task_list.append(task)
        try:
            await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT)
        except:
            task.cancel()
            print("Failed to get event", event)
        return future

    async def _verify_adapter_started(self):
        future = await self._listen_for_event(facade_pb2.EventType.ADAPTER_STATE)
        params = future.result()
        return params["state"].data[0] == "ON"

    async def toggle_stack(self, is_start=True):
        """Enable/disable the stack"""
        await self.__adapter_stub.ToggleStack(facade_pb2.ToggleStackRequest(start_stack=is_start))
        return await self._verify_adapter_started()

    async def enable_inquiry_scan(self):
        """Enable inquiry scan (Required to make device connectable and discoverable by other devices)"""
        await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_inquiry_scan=True))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def enable_page_scan(self):
        """Enable page scan (might be used for A2dp sink to be discoverable)"""
        await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=True))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def disable_page_scan(self):
        """Enable page scan (might be used for A2dp sink to be discoverable)"""
        await self.__adapter_stub.SetDiscoveryMode(facade_pb2.SetDiscoveryModeRequest(enable_page_scan=False))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def clear_event_filter(self):
        await self.__adapter_stub.ClearEventFilter(empty_proto.Empty())

    async def clear_event_mask(self):
        await self.__adapter_stub.ClearEventMask(empty_proto.Empty())

    async def clear_filter_accept_list(self):
        await self.__adapter_stub.ClearFilterAcceptList(empty_proto.Empty())

    async def disconnect_all_acls(self):
        await self.__adapter_stub.DisconnectAllAcls(empty_proto.Empty())

    async def le_rand(self):
        await self.__adapter_stub.LeRand(empty_proto.Empty())
        future = await self._listen_for_event(facade_pb2.EventType.LE_RAND)
        params = future.result()
        return params["data"].data[0]

    async def restore_filter_accept_list(self):
        await self.__adapter_stub.RestoreFilterAcceptList(empty_proto.Empty())

    async def set_default_event_mask_except(self, mask, le_mask):
        await self.__adapter_stub.SetDefaultEventMaskExcept(
            facade_pb2.SetDefaultEventMaskExceptRequest(mask=mask, le_mask=le_mask))

    async def set_event_filter_inquiry_result_all_devices(self):
        await self.__adapter_stub.SetEventFilterInquiryResultAllDevices(empty_proto.Empty())

    async def set_event_filter_connection_setup_all_devices(self):
        await self.__adapter_stub.SetEventFilterConnectionSetupAllDevices(empty_proto.Empty())

    async def allow_wake_by_hid(self):
        await self.__adapter_stub.AllowWakeByHid(empty_proto.Empty())

    async def set_local_io_caps(self, io_capability):
        await self.__adapter_stub.SetLocalIoCaps(facade_pb2.SetLocalIoCapsRequest(io_capability=io_capability))
        return await self._listen_for_event(facade_pb2.EventType.ADAPTER_PROPERTY)

    async def toggle_discovery(self, is_start):
        await self.__adapter_stub.ToggleDiscovery(facade_pb2.ToggleDiscoveryRequest(is_start=is_start))
        future = await self._listen_for_event(facade_pb2.EventType.DISCOVERY_STATE)
        return future

    async def find_device(self):
        return await self._listen_for_event(facade_pb2.EventType.DEVICE_FOUND)


class A2dpAutomationHelper():
    """Invoke gRPC on topshim for A2DP testing"""

    def __init__(self, port=8999):
        self.__channel = grpc.insecure_channel("localhost:%d" % port)
        self.media_stub = facade_pb2_grpc.MediaServiceStub(self.__channel)

    """Start A2dp source profile service"""

    def start_source(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_source=True))

    """Start A2dp sink profile service"""

    def start_sink(self):
        self.media_stub.StartA2dp(facade_pb2.StartA2dpRequest(start_a2dp_sink=True))

    """Initialize an A2dp connection from source to sink"""

    def source_connect_to_remote(self, address="11:22:33:44:55:66"):
        self.media_stub.A2dpSourceConnect(facade_pb2.A2dpSourceConnectRequest(address=address))
+0 −50
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import time
from abc import ABC, abstractmethod
import logging


class AsyncClosable(ABC):

    async def __async_exit(self, type=None, value=None, traceback=None):
        try:
            return await self.close()
        except Exception:
            logging.warning("Failed to close or already closed")

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        asyncio.run_until_complete(self.__async_exit(type, value, traceback))
        return traceback is None

    def __del__(self):
        asyncio.get_event_loop().run_until_complete(self.__async_exit())

    @abstractmethod
    async def close(self):
        pass


async def asyncSafeClose(closable):
    if closable is None:
        logging.warn("Tried to close an object that is None")
        return
    await closable.close()
+0 −385
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import grpc

from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc
from blueberry.tests.topshim.lib.async_closable import AsyncClosable
from blueberry.tests.topshim.lib.async_closable import asyncSafeClose

from google.protobuf import empty_pb2 as empty_proto


class GattClient(AsyncClosable):
    """
    Wrapper gRPC interface to the GATT Service
    """
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    __task_list = []
    __channel = None
    __gatt_stub = None
    __adapter_event_stream = None

    def __init__(self, port=8999):
        self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.__gatt_stub = facade_pb2_grpc.GattServiceStub(self.__channel)
        #self.__gatt_event_stream = self.__gatt_stub.FetchEvents(facade_pb2.FetchEventsRequest())

    async def close(self):
        """
        Terminate the current tasks
        """
        for task in self.__task_list:
            task.cancel()
            task = None
        self.__task_list.clear()
        await self.__channel.close()

    async def register_advertiser(self):
        """
        """
        await self.__gatt_stub.RegisterAdvertiser(empty_proto.Empty())

    async def unregister_advertiser(self, advertiser_id):
        """
        Stop advertising for advertiser id
        """
        # TODO(optedoblivion): make message to pass advertiser id
        await self.__gatt_stub.UnregisterAdvertiser(empty_proto.Empty())

    async def get_own_address(self):
        """
        """
        await self.__gatt_stub.GetOwnAddress(empty_proto.Empty())

    async def set_parameters(self):
        """
        """
        await self.__gatt_stub.SetParameters(empty_proto.Empty())

    async def set_data(self):
        """
        """
        await self.__gatt_stub.SetData(empty_proto.Empty())

    async def advertising_enable(self):
        """
        """
        await self.__gatt_stub.AdvertisingEnable(empty_proto.Empty())

    async def advertising_disable(self):
        """
        """
        await self.__gatt_stub.AdvertisingDisable(empty_proto.Empty())

    async def set_periodic_advertising_parameters(self):
        """
        """
        await self.__gatt_stub.SetPeriodicAdvertisingParameters(empty_proto.Empty())

    async def set_periodic_advertising_data(self):
        """
        """
        await self.__gatt_stub.SetPeriodicAdvertisingData(empty_proto.Empty())

    async def set_periodic_advertising_enable(self):
        """
        """
        await self.__gatt_stub.SetPeriodicAdvertisingEnable(empty_proto.Empty())

    async def start_advertising(self):
        """
        """
        await self.__gatt_stub.StartAdvertising(empty_proto.Empty())

    async def start_advertising_set(self):
        """
        Start advertising with the given parameters
        """
        await self.__gatt_stub.StartAdvertisingSet(empty_proto.Empty())

    async def register_scanner(self):
        """
        """
        await self.__gatt_stub.RegisterScanner(empty_proto.Empty())

    async def unregister_scanner(self):
        """
        """
        await self.__gatt_stub.UnregisterScanner(empty_proto.Empty())

    async def start_scan(self):
        """
        """
        await self.__gatt_stub.StartScan(empty_proto.Empty())

    async def stop_scan(self):
        """
        """
        await self.__gatt_stub.StopScan(empty_proto.Empty())

    async def scan_filter_setup(self):
        """
        """
        await self.__gatt_stub.ScanFilterSetup(empty_proto.Empty())

    async def scan_filter_add(self):
        """
        """
        await self.__gatt_stub.ScanFilterAdd(empty_proto.Empty())

    async def scan_filter_clear(self):
        """
        """
        await self.__gatt_stub.ScanFilterClear(empty_proto.Empty())

    async def scan_filter_enable(self):
        """
        """
        await self.__gatt_stub.ScanFilterEnable(empty_proto.Empty())

    async def scan_filter_disable(self):
        """
        """
        await self.__gatt_stub.ScanFilterDisable(empty_proto.Empty())

    async def set_scan_parameters(self):
        """
        """
        await self.__gatt_stub.SetScanParameters(empty_proto.Empty())

    async def batch_scan_config_storage(self):
        """
        """
        await self.__gatt_stub.BatchScanConfigStorage(empty_proto.Empty())

    async def batch_scan_enable(self):
        """
        """
        await self.__gatt_stub.BatchScanEnable(empty_proto.Empty())

    async def batch_scan_disable(self):
        """
        """
        await self.__gatt_stub.BatchScanDisable(empty_proto.Empty())

    async def batch_scan_read_reports(self):
        """
        """
        await self.__gatt_stub.BatchScanReadReports(empty_proto.Empty())

    async def start_sync(self):
        """
        """
        await self.__gatt_stub.StartSync(empty_proto.Empty())

    async def stop_sync(self):
        """
        """
        await self.__gatt_stub.StopSync(empty_proto.Empty())

    async def cancel_create_sync(self):
        """
        """
        await self.__gatt_stub.CancelCreateSync(empty_proto.Empty())

    async def transfer_sync(self):
        """
        """
        await self.__gatt_stub.TransferSync(empty_proto.Empty())

    async def transfer_set_info(self):
        """
        """
        await self.__gatt_stub.TransferSetInfo(empty_proto.Empty())

    async def sync_tx_parameters(self):
        """
        """
        await self.__gatt_stub.SyncTxParameters(empty_proto.Empty())

    async def register_client(self):
        """
        """
        await self.__gatt_stub.RegisterClient(empty_proto.Empty())

    async def unregister_client(self):
        """
        """
        await self.__gatt_stub.UnregisterClient(empty_proto.Empty())

    async def connect(self):
        """
        """
        await self.__gatt_stub.Connect(empty_proto.Empty())

    async def disconnect(self):
        """
        """
        await self.__gatt_stub.Disconnect(empty_proto.Empty())

    async def refresh(self):
        """
        """
        await self.__gatt_stub.Refresh(empty_proto.Empty())

    async def search_service(self):
        """
        """
        await self.__gatt_stub.SearchService(empty_proto.Empty())

    async def btif_gattc_discover_service_by_uuid(self):
        """
        """
        await self.__gatt_stub.BtifGattcDiscoverServiceByUuid(empty_proto.Empty())

    async def read_characteristic(self):
        """
        """
        await self.__gatt_stub.ReadCharacteristic(empty_proto.Empty())

    async def read_using_characteristic_uuid(self):
        """
        """
        await self.__gatt_stub.ReadUsingCharacteristicUuid(empty_proto.Empty())

    async def write_characteristic(self):
        """
        """
        await self.__gatt_stub.WriteCharacteristic(empty_proto.Empty())

    async def read_descriptor(self):
        """
        """
        await self.__gatt_stub.ReadDescriptor(empty_proto.Empty())

    async def write_descriptor(self):
        """
        """
        await self.__gatt_stub.WriteDescriptor(empty_proto.Empty())

    async def execute_write(self):
        """
        """
        await self.__gatt_stub.ExecuteWrite(empty_proto.Empty())

    async def register_for_notification(self):
        """
        """
        await self.__gatt_stub.RegisterForNotification(empty_proto.Empty())

    async def deregister_for_notification(self):
        """
        """
        await self.__gatt_stub.DeregisterForNotification(empty_proto.Empty())

    async def read_remote_rssi(self):
        """
        """
        await self.__gatt_stub.ReadRemoteRssi(empty_proto.Empty())

    async def get_device_type(self):
        """
        """
        await self.__gatt_stub.GetDeviceType(empty_proto.Empty())

    async def configure_mtu(self):
        """
        """
        await self.__gatt_stub.ConfigureMtu(empty_proto.Empty())

    async def conn_parameter_update(self):
        """
        """
        await self.__gatt_stub.ConnParameterUpdate(empty_proto.Empty())

    async def set_preferred_phy(self):
        """
        """
        await self.__gatt_stub.SetPreferredPhy(empty_proto.Empty())

    async def read_phy(self):
        """
        """
        await self.__gatt_stub.ReadPhy(empty_proto.Empty())

    async def test_command(self):
        """
        """
        await self.__gatt_stub.TestCommand(empty_proto.Empty())

    async def get_gatt_db(self):
        """
        """
        await self.__gatt_stub.GetGattDb(empty_proto.Empty())

    async def register_server(self):
        """
        """
        await self.__gatt_stub.RegisterServer(empty_proto.Empty())

    async def unregister_server(self):
        """
        """
        await self.__gatt_stub.UnregisterServer(empty_proto.Empty())

    async def connect(self):
        """
        """
        await self.__gatt_stub.Connect(empty_proto.Empty())

    async def disconnect(self):
        """
        """
        await self.__gatt_stub.Disconnect(empty_proto.Empty())

    async def add_service(self):
        """
        """
        await self.__gatt_stub.AddService(empty_proto.Empty())

    async def stop_service(self):
        """
        """
        await self.__gatt_stub.StopService(empty_proto.Empty())

    async def delete_service(self):
        """
        """
        await self.__gatt_stub.DeleteService(empty_proto.Empty())

    async def send_indication(self):
        """
        """
        await self.__gatt_stub.SendIndication(empty_proto.Empty())

    async def send_response(self):
        """
        """
        await self.__gatt_stub.SendResponse(empty_proto.Empty())

    async def set_preferred_phy(self):
        """
        """
        await self.__gatt_stub.SetPreferredPhy(empty_proto.Empty())

    async def read_phy(self):
        """
        """
        await self.__gatt_stub.ReadPhy(empty_proto.Empty())
Loading