Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b83dd9a5 authored by Martin Brabham's avatar Martin Brabham Committed by Gerrit Code Review
Browse files

Merge changes from topic "floss_suspend_resume_api"

* changes:
  Make errors a bit more obvious
  Floss: Add GATT interface to suspend module.
  Floss: Add GATT interface for use by modules.
  Floss: GATT facade service and client topshim test apis.
  Floss: Tell the controller to allow all devices Topshim API
  Floss: Tell the controller to allow all devices BTIF API
  Floss: Tell the controller to allow all devices BTA API
  Floss: Tell the controller to allow all devices BTM API
  Floss: Tell the controller to allow all devices Controller shim API
  Floss: Topshim test updates.
  Floss: Add in async waiting for random number.
  Floss: Allow the device to be woken by HID devices Topshim API
  Floss: Allow the device to be woken by HID devices BTIF API
  Floss: Allow the device to be woken by HID devices BTA API
  Floss: Allow the device to be woken by HID devices BTM API
  Floss: Allow the device to be woken by HID devices Controller shim API
  Floss: Allow the device to be woken by HID devices Controller API
  Floss: Plumb suspend id from powerd to suspend
  Floss: Better ACL Disconnect implementation.
  Floss: Le Rand Callback interface declaration
parents 54cf6580 99b7541e
Loading
Loading
Loading
Loading
+78 −0
Original line number Diff line number Diff line
@@ -13,11 +13,89 @@ service AdapterService {
  rpc ClearFilterAcceptList(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc DisconnectAllAcls(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc LeRand(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetEventFilterConnectionSetupAllDevices(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc AllowWakeByHid(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc RestoreFilterAcceptList(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetDefaultEventMask(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetEventFilterInquiryResultAllDevices(google.protobuf.Empty) returns (google.protobuf.Empty) {}
}

service GattService {
  // Advertiser
  rpc RegisterAdvertiser(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc UnregisterAdvertiser(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc GetOwnAddress(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetParameters(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetData(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc AdvertisingEnable(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc AdvertisingDisable(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetPeriodicAdvertisingParameters(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetPeriodicAdvertisingData(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetPeriodicAdvertisingEnable(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc StartAdvertising(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc StartAdvertisingSet(google.protobuf.Empty) returns (google.protobuf.Empty) {}

  // Scanner
  rpc RegisterScanner(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc UnregisterScanner(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc StartScan(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc StopScan(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ScanFilterSetup(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ScanFilterAdd(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ScanFilterClear(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ScanFilterEnable(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ScanFilterDisable(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetScanParameters(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc BatchScanConfigStorage(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc BatchScanEnable(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc BatchScanDisable(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc BatchScanReadReports(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc StartSync(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc StopSync(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc CancelCreateSync(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc TransferSync(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc TransferSetInfo(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SyncTxParameters(google.protobuf.Empty) returns (google.protobuf.Empty) {}

  // Gatt Client
  rpc RegisterClient(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc UnregisterClient(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ClientConnect(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ClientDisconnect(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc Refresh(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SearchService(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc BtifGattcDiscoverServiceByUuid(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ReadCharacteristic(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ReadUsingCharacteristicUuid(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc WriteCharacteristic(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ReadDescriptor(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc WriteDescriptor(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ExecuteWrite(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc RegisterForNotification(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc DeregisterForNotification(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ReadRemoteRssi(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc GetDeviceType(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ConfigureMtu(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ConnParameterUpdate(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ClientSetPreferredPhy(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ClientReadPhy(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc TestCommand(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc GetGattDb(google.protobuf.Empty) returns (google.protobuf.Empty) {}

  // Gatt Server
  rpc RegisterServer(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc UnregisterServer(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ServerConnect(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ServerDisconnect(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc AddService(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc StopService(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc DeleteService(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SendIndication(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SendResponse(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ServerSetPreferredPhy(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc ServerReadPhy(google.protobuf.Empty) returns (google.protobuf.Empty) {}
}

enum EventType {
  ADAPTER_STATE = 0;
  SSP_REQUEST = 1;
+6 −1
Original line number Diff line number Diff line
@@ -94,7 +94,6 @@ class AdapterClient():
    async def le_rand(self):
        await self.__adapter_stub.LeRand(empty_proto.Empty())
        future = await self._listen_for_event(facade_pb2.EventType.LE_RAND)
        #        await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT)
        return future.result()

    async def restore_filter_accept_list(self):
@@ -106,6 +105,12 @@ class AdapterClient():
    async def set_event_filter_inquiry_result_all_devices(self):
        await self.__adapter_stub.SetEventFilterInquiryResultAllDevices(empty_proto.Empty())

    async def set_event_filter_connection_setup_all_devices(self):
        await self.__adapter_stub.SetEventFilterConnectionSetupAllDevices(empty_proto.Empty())

    async def allow_wake_by_hid(self):
        await self.__adapter_stub.AllowWakeByHid(empty_proto.Empty())


class A2dpAutomationHelper():
    """Invoke gRPC on topshim for A2DP testing"""
+383 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import asyncio
import grpc

from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc

from google.protobuf import empty_pb2 as empty_proto


class GattClient():
    """
    Wrapper gRPC interface to the GATT Service
    """
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    __task_list = []
    __channel = None
    __gatt_stub = None
    __adapter_event_stream = None

    def __init__(self, port=8999):
        self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.__gatt_stub = facade_pb2_grpc.GattServiceStub(self.__channel)
        #self.__gatt_event_stream = self.__gatt_stub.FetchEvents(facade_pb2.FetchEventsRequest())

    async def terminate(self):
        """
        Terminate the current tasks
        """
        for task in self.__task_list:
            task.cancel()
            task = None
        self.__task_list.clear()
        await self.__channel.close()

    async def register_advertiser(self):
        """
        """
        await self.__gatt_stub.RegisterAdvertiser(empty_proto.Empty())

    async def unregister_advertiser(self, advertiser_id):
        """
        Stop advertising for advertiser id
        """
        # TODO(optedoblivion): make message to pass advertiser id
        await self.__gatt_stub.UnregisterAdvertiser(empty_proto.Empty())

    async def get_own_address(self):
        """
        """
        await self.__gatt_stub.GetOwnAddress(empty_proto.Empty())

    async def set_parameters(self):
        """
        """
        await self.__gatt_stub.SetParameters(empty_proto.Empty())

    async def set_data(self):
        """
        """
        await self.__gatt_stub.SetData(empty_proto.Empty())

    async def advertising_enable(self):
        """
        """
        await self.__gatt_stub.AdvertisingEnable(empty_proto.Empty())

    async def advertising_disable(self):
        """
        """
        await self.__gatt_stub.AdvertisingDisable(empty_proto.Empty())

    async def set_periodic_advertising_parameters(self):
        """
        """
        await self.__gatt_stub.SetPeriodicAdvertisingParameters(empty_proto.Empty())

    async def set_periodic_advertising_data(self):
        """
        """
        await self.__gatt_stub.SetPeriodicAdvertisingData(empty_proto.Empty())

    async def set_periodic_advertising_enable(self):
        """
        """
        await self.__gatt_stub.SetPeriodicAdvertisingEnable(empty_proto.Empty())

    async def start_advertising(self):
        """
        """
        await self.__gatt_stub.StartAdvertising(empty_proto.Empty())

    async def start_advertising_set(self):
        """
        Start advertising with the given parameters
        """
        await self.__gatt_stub.StartAdvertisingSet(empty_proto.Empty())

    async def register_scanner(self):
        """
        """
        await self.__gatt_stub.RegisterScanner(empty_proto.Empty())

    async def unregister_scanner(self):
        """
        """
        await self.__gatt_stub.UnregisterScanner(empty_proto.Empty())

    async def start_scan(self):
        """
        """
        await self.__gatt_stub.StartScan(empty_proto.Empty())

    async def stop_scan(self):
        """
        """
        await self.__gatt_stub.StopScan(empty_proto.Empty())

    async def scan_filter_setup(self):
        """
        """
        await self.__gatt_stub.ScanFilterSetup(empty_proto.Empty())

    async def scan_filter_add(self):
        """
        """
        await self.__gatt_stub.ScanFilterAdd(empty_proto.Empty())

    async def scan_filter_clear(self):
        """
        """
        await self.__gatt_stub.ScanFilterClear(empty_proto.Empty())

    async def scan_filter_enable(self):
        """
        """
        await self.__gatt_stub.ScanFilterEnable(empty_proto.Empty())

    async def scan_filter_disable(self):
        """
        """
        await self.__gatt_stub.ScanFilterDisable(empty_proto.Empty())

    async def set_scan_parameters(self):
        """
        """
        await self.__gatt_stub.SetScanParameters(empty_proto.Empty())

    async def batch_scan_config_storage(self):
        """
        """
        await self.__gatt_stub.BatchScanConfigStorage(empty_proto.Empty())

    async def batch_scan_enable(self):
        """
        """
        await self.__gatt_stub.BatchScanEnable(empty_proto.Empty())

    async def batch_scan_disable(self):
        """
        """
        await self.__gatt_stub.BatchScanDisable(empty_proto.Empty())

    async def batch_scan_read_reports(self):
        """
        """
        await self.__gatt_stub.BatchScanReadReports(empty_proto.Empty())

    async def start_sync(self):
        """
        """
        await self.__gatt_stub.StartSync(empty_proto.Empty())

    async def stop_sync(self):
        """
        """
        await self.__gatt_stub.StopSync(empty_proto.Empty())

    async def cancel_create_sync(self):
        """
        """
        await self.__gatt_stub.CancelCreateSync(empty_proto.Empty())

    async def transfer_sync(self):
        """
        """
        await self.__gatt_stub.TransferSync(empty_proto.Empty())

    async def transfer_set_info(self):
        """
        """
        await self.__gatt_stub.TransferSetInfo(empty_proto.Empty())

    async def sync_tx_parameters(self):
        """
        """
        await self.__gatt_stub.SyncTxParameters(empty_proto.Empty())

    async def register_client(self):
        """
        """
        await self.__gatt_stub.RegisterClient(empty_proto.Empty())

    async def unregister_client(self):
        """
        """
        await self.__gatt_stub.UnregisterClient(empty_proto.Empty())

    async def connect(self):
        """
        """
        await self.__gatt_stub.Connect(empty_proto.Empty())

    async def disconnect(self):
        """
        """
        await self.__gatt_stub.Disconnect(empty_proto.Empty())

    async def refresh(self):
        """
        """
        await self.__gatt_stub.Refresh(empty_proto.Empty())

    async def search_service(self):
        """
        """
        await self.__gatt_stub.SearchService(empty_proto.Empty())

    async def btif_gattc_discover_service_by_uuid(self):
        """
        """
        await self.__gatt_stub.BtifGattcDiscoverServiceByUuid(empty_proto.Empty())

    async def read_characteristic(self):
        """
        """
        await self.__gatt_stub.ReadCharacteristic(empty_proto.Empty())

    async def read_using_characteristic_uuid(self):
        """
        """
        await self.__gatt_stub.ReadUsingCharacteristicUuid(empty_proto.Empty())

    async def write_characteristic(self):
        """
        """
        await self.__gatt_stub.WriteCharacteristic(empty_proto.Empty())

    async def read_descriptor(self):
        """
        """
        await self.__gatt_stub.ReadDescriptor(empty_proto.Empty())

    async def write_descriptor(self):
        """
        """
        await self.__gatt_stub.WriteDescriptor(empty_proto.Empty())

    async def execute_write(self):
        """
        """
        await self.__gatt_stub.ExecuteWrite(empty_proto.Empty())

    async def register_for_notification(self):
        """
        """
        await self.__gatt_stub.RegisterForNotification(empty_proto.Empty())

    async def deregister_for_notification(self):
        """
        """
        await self.__gatt_stub.DeregisterForNotification(empty_proto.Empty())

    async def read_remote_rssi(self):
        """
        """
        await self.__gatt_stub.ReadRemoteRssi(empty_proto.Empty())

    async def get_device_type(self):
        """
        """
        await self.__gatt_stub.GetDeviceType(empty_proto.Empty())

    async def configure_mtu(self):
        """
        """
        await self.__gatt_stub.ConfigureMtu(empty_proto.Empty())

    async def conn_parameter_update(self):
        """
        """
        await self.__gatt_stub.ConnParameterUpdate(empty_proto.Empty())

    async def set_preferred_phy(self):
        """
        """
        await self.__gatt_stub.SetPreferredPhy(empty_proto.Empty())

    async def read_phy(self):
        """
        """
        await self.__gatt_stub.ReadPhy(empty_proto.Empty())

    async def test_command(self):
        """
        """
        await self.__gatt_stub.TestCommand(empty_proto.Empty())

    async def get_gatt_db(self):
        """
        """
        await self.__gatt_stub.GetGattDb(empty_proto.Empty())

    async def register_server(self):
        """
        """
        await self.__gatt_stub.RegisterServer(empty_proto.Empty())

    async def unregister_server(self):
        """
        """
        await self.__gatt_stub.UnregisterServer(empty_proto.Empty())

    async def connect(self):
        """
        """
        await self.__gatt_stub.Connect(empty_proto.Empty())

    async def disconnect(self):
        """
        """
        await self.__gatt_stub.Disconnect(empty_proto.Empty())

    async def add_service(self):
        """
        """
        await self.__gatt_stub.AddService(empty_proto.Empty())

    async def stop_service(self):
        """
        """
        await self.__gatt_stub.StopService(empty_proto.Empty())

    async def delete_service(self):
        """
        """
        await self.__gatt_stub.DeleteService(empty_proto.Empty())

    async def send_indication(self):
        """
        """
        await self.__gatt_stub.SendIndication(empty_proto.Empty())

    async def send_response(self):
        """
        """
        await self.__gatt_stub.SendResponse(empty_proto.Empty())

    async def set_preferred_phy(self):
        """
        """
        await self.__gatt_stub.SetPreferredPhy(empty_proto.Empty())

    async def read_phy(self):
        """
        """
        await self.__gatt_stub.ReadPhy(empty_proto.Empty())
+14 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ from blueberry.tests.gd.cert.os_utils import TerminalColor
from blueberry.tests.gd.cert.tracelogger import TraceLogger
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.adapter_client import AdapterClient
from blueberry.tests.topshim.lib.gatt_client import GattClient

from mobly import asserts
from mobly import base_test
@@ -156,16 +157,29 @@ def dump_crashes_core(dut, cert, rootcanal_running, rootcanal_process, rootcanal

class TopshimBaseTest(base_test.BaseTestClass):

    # TODO(optedoblivion): Make TopshimTestStack class
    dut_adapter = None
    dut_gatt = None

    cert_adapter = None
    cert_gatt = None

    async def _setup_adapter(self):
        self.dut_adapter = AdapterClient(port=self.dut_port)
        self.cert_adapter = AdapterClient(port=self.cert_port)
        started = await self.dut_adapter._verify_adapter_started()
        assertThat(started).isTrue()
        started = await self.cert_adapter._verify_adapter_started()
        assertThat(started).isTrue()
        self.dut_gatt = GattClient(port=self.dut_port)
        self.cert_gatt = GattClient(port=self.cert_port)
        return started

    async def _teardown_adapter(self):
        await self.dut_adapter.terminate()
        await self.dut_gatt.terminate()
        await self.cert_adapter.terminate()
        await self.cert_gatt.terminate()

    def setup_class(self):
        """
+66 −21
Original line number Diff line number Diff line
@@ -25,35 +25,80 @@ from mobly import test_runner

class SuspendTest(TopshimBaseTest):

    async def __verify_disconnected_suspend(self):
    async def __verify_no_wake_suspend(self):
        # Start suspend work
        await self.dut_adapter.clear_event_filter()
        await self.dut_adapter.clear_event_mask()
        await self.dut_adapter.clear_event_filter()
        await self.dut_adapter.clear_filter_accept_list()
        # TODO(optedoblivion): Find a better way to disconnect active ACLs
        # await self.dut_adapter.disconnect_all_acls()
        random = await self.dut_adapter.le_rand()
        return random
        await self.dut_gatt.advertising_disable()
        await self.dut_gatt.stop_scan()
        await self.dut_adapter.disconnect_all_acls()
        return await self.dut_adapter.le_rand()

    async def __verify_disconnected_resume(self):
    async def __verify_no_wake_resume(self):
        # Start resume work
        await self.dut_adapter.set_default_event_mask()
        await self.dut_adapter.set_event_filter_inquiry_result_all_devices()
        await self.dut_adapter.set_event_filter_connection_setup_all_devices()
        return await self.dut_adapter.le_rand()

    async def __verify_wakeful_suspend(self, is_a2dp_connected):
        await self.dut_adapter.clear_event_mask()
        await self.dut_adapter.clear_event_filter()
        await self.dut_adapter.clear_filter_accept_list()
        await self.dut_gatt.advertising_disable()
        await self.dut_gatt.stop_scan()
        if is_a2dp_connected:
            # await self.media_server.disconnect_a2dp()
            pass
        await self.dut_adapter.disconnect_all_acls()
        await self.dut_adapter.allow_wake_by_hid()
        return await self.dut_adapter.le_rand()

    async def __verify_wakeful_resume(self, was_a2dp_connected):
        # Start resume work
        await self.dut_adapter.set_default_event_mask()
        await self.dut_adapter.set_event_filter_inquiry_result_all_devices()
        await self.dut_adapter.set_event_filter_connection_setup_all_devices()
        if was_a2dp_connected:
            # restore filter accept list?
            await self.dut_adapter.restore_filter_accept_list()
        random = await self.dut_adapter.le_rand()
        return random
            # reconnect a2dp
            # await self.media_server.reconnect_last_a2dp()
            # await self.gatt.restart_all_previous_advertising()
        await self.dut_gatt.advertising_enable()
        return await self.dut_adapter.le_rand()

    def test_no_wake_suspend(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_no_wake_suspend())

    def test_no_wake_resume(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_no_wake_resume())

    def test_no_wake_suspend_then_resume(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_no_wake_suspend())
        asyncio.get_event_loop().run_until_complete(self.__verify_no_wake_resume())

    def test_no_wake_suspend_then_resume_then_suspend(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_no_wake_suspend())
        asyncio.get_event_loop().run_until_complete(self.__verify_no_wake_resume())
        asyncio.get_event_loop().run_until_complete(self.__verify_no_wake_suspend())

    def test_wakeful_suspend_no_a2dp(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_wakeful_suspend(False))

    def test_wakeful_resume_no_a2dp(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_wakeful_resume(False))

    def test_disconnected_suspend(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_disconnected_suspend())
    def test_wakeful_suspend_then_resume_no_a2dp(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_wakeful_suspend(False))
        asyncio.get_event_loop().run_until_complete(self.__verify_wakeful_resume(False))

    def test_disconnected_resume(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_disconnected_resume())
    def test_wakeful_suspend_then_resume_then_suspend_no_a2dp(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_wakeful_suspend(False))
        asyncio.get_event_loop().run_until_complete(self.__verify_wakeful_resume(False))
        asyncio.get_event_loop().run_until_complete(self.__verify_wakeful_suspend(False))

    def test_disconnected_suspend_then_resume(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_disconnected_suspend())
        asyncio.get_event_loop().run_until_complete(self.__verify_disconnected_resume())

    def test_disconnected_suspend_then_resume_then_suspend(self):
        asyncio.get_event_loop().run_until_complete(self.__verify_disconnected_suspend())
        asyncio.get_event_loop().run_until_complete(self.__verify_disconnected_resume())
        asyncio.get_event_loop().run_until_complete(self.__verify_disconnected_suspend())
if __name__ == "__main__":
    test_runner.main()
Loading