Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3e1bf44c authored by Aritra Sen's avatar Aritra Sen
Browse files

Add tests for testing HFP connection established b/w an HFP AG and HFP HF.

Bug: 261221271
Test: system/gd/cert/run --topshim --clean
Tag: #floss
Change-Id: I38e478cd6a5f3e8a6172600601a2d0f00bc34862
parent 264cf402
Loading
Loading
Loading
Loading
+72 −0
Original line number Original line Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.topshim_base_test import TopshimBaseTest
from blueberry.tests.topshim.lib.topshim_device import TRANSPORT_CLASSIC

from mobly import test_runner


class HfpTest(TopshimBaseTest):

    def setup_test(self):
        super().setup_test()
        # Pair dut and cert device.
        self.dut().enable_inquiry_scan()
        self.cert().enable_inquiry_scan()
        self.dut().toggle_discovery(True)
        self.__paired_device = self.dut().find_device()
        self.dut().create_bond(address=self.__paired_device, transport=TRANSPORT_CLASSIC)

    def teardown_test(self):
        super().teardown_test()
        # Test teardown for dut and cert reset.
        self.dut().toggle_discovery(False)
        self.dut().disable_page_scan()
        self.cert().disable_page_scan()

    def test_hfp_connect_with_bond(self):
        state, _ = self.dut().start_slc(address=self.__paired_device)
        assertThat(state).isEqualTo("Connecting")
        state, _ = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("Connected")
        state, conn_addr = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("SlcConnected")
        assertThat(conn_addr).isEqualTo(self.__paired_device)

        #Extra steps to remove bonding to complete teardown.
        self.dut().remove_bonded_device(self.__paired_device)
        # This is required currently so that the HFP connection state change
        # callback doesn't affect other tests.
        self.dut().wait_for_hfp_connection_state_change()

    def test_hfp_disconnect_with_bond(self):
        state, _ = self.dut().start_slc(address=self.__paired_device)
        self.dut().wait_for_hfp_connection_state_change()  # To connected
        self.dut().wait_for_hfp_connection_state_change()  # To SLC connected

        # Actual test for stopping SLC connection.
        state, _ = self.dut().stop_slc(address=self.__paired_device)
        assertThat(state).isEqualTo("Disconnecting")
        state, _ = self.dut().wait_for_hfp_connection_state_change()
        assertThat(state).isEqualTo("Disconnected")
        #Extra steps to remove bonding to complete teardown.
        self.dut().remove_bonded_device(self.__paired_device)


if __name__ == "__main__":
    test_runner.main()
+5 −1
Original line number Original line Diff line number Diff line
@@ -13,7 +13,6 @@
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   See the License for the specific language governing permissions and
#   limitations under the License.
#   limitations under the License.

import asyncio
import asyncio
import grpc
import grpc


@@ -55,12 +54,14 @@ class HfpClient(AsyncClosable):
        """
        """
        await self.__hfp_stub.StartSlc(
        await self.__hfp_stub.StartSlc(
            facade_pb2.StartSlcRequest(connection=facade_pb2.Connection(cookie=address.encode())))
            facade_pb2.StartSlcRequest(connection=facade_pb2.Connection(cookie=address.encode())))
        return await self._listen_for_event(facade_pb2.EventType.HFP_CONNECTION_STATE)


    async def stop_slc(self, address):
    async def stop_slc(self, address):
        """
        """
        """
        """
        await self.__hfp_stub.StopSlc(
        await self.__hfp_stub.StopSlc(
            facade_pb2.StopSlcRequest(connection=facade_pb2.Connection(cookie=address.encode())))
            facade_pb2.StopSlcRequest(connection=facade_pb2.Connection(cookie=address.encode())))
        return await self._listen_for_event(facade_pb2.EventType.HFP_CONNECTION_STATE)


    async def connect_audio(self, address, is_sco_offload_enabled=False, force_cvsd=False):
    async def connect_audio(self, address, is_sco_offload_enabled=False, force_cvsd=False):
        """
        """
@@ -82,6 +83,9 @@ class HfpClient(AsyncClosable):
        await self.__hfp_stub.DisconnectAudio(
        await self.__hfp_stub.DisconnectAudio(
            facade_pb2.DisconnectAudioRequest(connection=facade_pb2.Connection(cookie=address.encode()), volume=volume))
            facade_pb2.DisconnectAudioRequest(connection=facade_pb2.Connection(cookie=address.encode()), volume=volume))


    async def wait_for_hfp_connection_state_change(self):
        return await self._listen_for_event(facade_pb2.EventType.HFP_CONNECTION_STATE)

    async def __get_next_event(self, event, future):
    async def __get_next_event(self, event, future):
        """Get the future of next event from the stream"""
        """Get the future of next event from the stream"""
        while True:
        while True:
+18 −0
Original line number Original line Diff line number Diff line
@@ -240,3 +240,21 @@ class TopshimDevice(AsyncClosable):
            return None
            return None


        return self.__post(waiter(f))
        return self.__post(waiter(f))

    def start_slc(self, address):
        f = self.__post(self.__hfp.start_slc(address))
        return self.__post(self.__hfp_connection_state_waiter(f))

    def stop_slc(self, address):
        f = self.__post(self.__hfp.stop_slc(address))
        return self.__post(self.__hfp_connection_state_waiter(f))

    def wait_for_hfp_connection_state_change(self):
        f = self.__post(self.__hfp.wait_for_hfp_connection_state_change())
        return self.__post(self.__hfp_connection_state_waiter(f))

    async def __hfp_connection_state_waiter(self, f):
        data = await f
        data_list = data.split(", ")
        state, address = data_list[0].strip(), data_list[1].strip()
        return (state, address)
+2 −1
Original line number Original line Diff line number Diff line
@@ -15,6 +15,7 @@
#   limitations under the License.
#   limitations under the License.


from blueberry.tests.topshim.adapter.adapter_test import AdapterTest
from blueberry.tests.topshim.adapter.adapter_test import AdapterTest
from blueberry.tests.topshim.hfp.hfp_test import HfpTest
from blueberry.tests.topshim.power.suspend_test import SuspendTest
from blueberry.tests.topshim.power.suspend_test import SuspendTest
from blueberry.tests.topshim.security.classic_security_test import ClassicSecurityTest
from blueberry.tests.topshim.security.classic_security_test import ClassicSecurityTest
from blueberry.tests.topshim.security.le_security_test import LeSecurityTest
from blueberry.tests.topshim.security.le_security_test import LeSecurityTest
@@ -22,7 +23,7 @@ from blueberry.tests.topshim.security.le_security_test import LeSecurityTest
from mobly import suite_runner
from mobly import suite_runner
import argparse
import argparse


ALL_TESTS = [AdapterTest, ClassicSecurityTest, LeSecurityTest, SuspendTest]
ALL_TESTS = [AdapterTest, ClassicSecurityTest, HfpTest, LeSecurityTest, SuspendTest]




def main():
def main():