Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9fd948f5 authored by Josh Wu's avatar Josh Wu
Browse files

Avatar: Provide AG default behavior

Bug: 286226902
Test: avatar run
Change-Id: I774da5ae1f53471047a84ba86dbfd826f292d645
parent 349be0e7
Loading
Loading
Loading
Loading
+94 −40
Original line number Original line Diff line number Diff line
@@ -46,6 +46,7 @@ from typing import Optional, Tuple, List, Dict
SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311
SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311


HFP_AG_FEATURE_HF_INDICATORS = (1 << 10)
HFP_AG_FEATURE_HF_INDICATORS = (1 << 10)
HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS


HFP_HF_FEATURE_HF_INDICATORS = (1 << 8)
HFP_HF_FEATURE_HF_INDICATORS = (1 << 8)
HFP_HF_FEATURE_DEFAULT = hex(0x01b5)
HFP_HF_FEATURE_DEFAULT = hex(0x01b5)
@@ -54,6 +55,8 @@ PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled'
PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config'
PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config'
PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled'
PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled'


HFP_VERSION_1_7 = 0x0107



class HfpClientTest(base_test.BaseTestClass):  # type: ignore[misc]
class HfpClientTest(base_test.BaseTestClass):  # type: ignore[misc]
    devices: Optional[PandoraDevices] = None
    devices: Optional[PandoraDevices] = None
@@ -97,6 +100,7 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]
    async def setup_test(self) -> None:
    async def setup_test(self) -> None:
        await asyncio.gather(self.dut.reset(), self.ref.reset())
        await asyncio.gather(self.dut.reset(), self.ref.reset())


    # TODO(b/286338264): Moving connecting and bonding methods to a shared util scripts
    async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]:
    async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]:
        dut_ref, ref_dut = await asyncio.gather(
        dut_ref, ref_dut = await asyncio.gather(
            self.dut.aio.host.WaitConnection(address=self.ref.address),
            self.dut.aio.host.WaitConnection(address=self.ref.address),
@@ -128,7 +132,7 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]
        channel_number = rfcomm_server.listen(on_dlc)
        channel_number = rfcomm_server.listen(on_dlc)


        # Setup SDP records
        # Setup SDP records
        self.ref.device.sdp_service_records = make_sdp_records(channel_number)
        self.ref.device.sdp_service_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0)


        # Connect and pair
        # Connect and pair
        dut_ref, ref_dut = await self.make_classic_connection()
        dut_ref, ref_dut = await self.make_classic_connection()
@@ -150,51 +154,26 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]


        ref_dut_hfp_protocol = await self.make_hfp_connection()
        ref_dut_hfp_protocol = await self.make_hfp_connection()


        while True:
        class TestAgServer(HfpAgServer):
            line = await ref_dut_hfp_protocol.next_line()


            if line.startswith('AT+BRSF='):
            def on_brsf(self, hf_features: int) -> None:
                # HF indicators should be enabled
                # HF indicators should be enabled
                hf_features = parse_hf_features(line)
                assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0)
                assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0)
                ag_features = HFP_AG_FEATURE_HF_INDICATORS
                return super().on_brsf(hf_features)
                ref_dut_hfp_protocol.send_response_line(f'+BRSF: {ag_features}')

                ref_dut_hfp_protocol.send_response_line('OK')
            def on_bind_list(self, indicators: list[int]) -> None:
            elif line.startswith('AT+BIND='):
                indicators = line[len('AT+BIND='):].split(',')
                if enhanced_driver_safety_enabled:
                if enhanced_driver_safety_enabled:
                    assert_in('1', indicators)
                    assert_in(1, indicators)
                else:
                else:
                    assert_not_in('1', indicators)
                    assert_not_in(1, indicators)
                ref_dut_hfp_protocol.send_response_line('OK')
                self.terminated = True
                break
            elif line.startswith('AT+CIND=?'):
                ref_dut_hfp_protocol.send_response_line('+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
                                                        '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
                                                        '("callheld",(0-2))')
                ref_dut_hfp_protocol.send_response_line('OK')
            elif line.startswith('AT+CIND?'):
                ref_dut_hfp_protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
                ref_dut_hfp_protocol.send_response_line('OK')
            elif line.startswith((
                    'AT+CLIP=',
                    'AT+VGS=',
                    'AT+BIA=',
                    'AT+CMER=',
                    'AT+XEVENT=',
                    'AT+XAPL=',
            )):
                ref_dut_hfp_protocol.send_response_line('OK')
            else:
                ref_dut_hfp_protocol.send_response_line('ERROR')



def parse_hf_features(response_line: str) -> int:
        server = TestAgServer(ref_dut_hfp_protocol, ag_features=HFP_AG_FEATURE_HF_INDICATORS)
    assert response_line.startswith('AT+BRSF=')
        await server.serve()
    return int(response_line[len('AT+BRSF='):])




def make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]:
def make_bumble_ag_sdp_records(hfp_version: int, rfcomm_channel: int,
                               ag_sdp_features: int) -> Dict[int, List[ServiceAttribute]]:
    return {
    return {
        0x00010001: [
        0x00010001: [
            ServiceAttribute(
            ServiceAttribute(
@@ -223,18 +202,93 @@ def make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]:
                DataElement.sequence([
                DataElement.sequence([
                    DataElement.sequence([
                    DataElement.sequence([
                        DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
                        DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
                        DataElement.unsigned_integer_16(0x0107),
                        DataElement.unsigned_integer_16(hfp_version),
                    ])
                    ])
                ]),
                ]),
            ),
            ),
            ServiceAttribute(
            ServiceAttribute(
                SDP_PROFILE_SUPPORTED_FEATURES_ID,
                SDP_PROFILE_SUPPORTED_FEATURES_ID,
                DataElement.unsigned_integer_16(0),
                DataElement.unsigned_integer_16(ag_sdp_features),
            ),
            ),
        ]
        ]
    }
    }




class HfpAgServer:
    enabled_hf_indicators: list[int]
    hf_features: int

    def __init__(self, protocol: HfpProtocol, ag_features: int = HFP_AG_FEATURE_DEFAULT) -> None:
        self.protocol = protocol
        self.ag_features = ag_features
        self.terminated = False
        self.hf_features = 0  # Unknown

    async def serve(self) -> None:
        while not self.terminated:
            line = await self.protocol.next_line()

            if line.startswith('AT+BRSF='):
                hf_features = int(line[len('AT+BRSF='):])
                self.on_brsf(hf_features)
            elif line.startswith('AT+BIND=?'):
                self.on_bind_read_capabilities()
            elif line.startswith('AT+BIND='):
                indicators = [int(i) for i in line[len('AT+BIND='):].split(',')]
                self.on_bind_list(indicators)
            elif line.startswith('AT+BIND?'):
                self.on_bind_read_configuration()
            elif line.startswith('AT+CIND=?'):
                self.on_cind_read()
            elif line.startswith('AT+CIND?'):
                self.on_cind_test()
            # TODO(b/286226902): Implement handlers for these commands
            elif line.startswith((
                    'AT+CLIP=',
                    'AT+VGS=',
                    'AT+BIA=',
                    'AT+CMER=',
                    'AT+XEVENT=',
                    'AT+XAPL=',
            )):
                self.protocol.send_response_line('OK')
            else:
                self.protocol.send_response_line('ERROR')

    def on_brsf(self, hf_features: int) -> None:
        self.hf_features = hf_features
        self.protocol.send_response_line(f'+BRSF: {self.ag_features}')
        self.protocol.send_response_line('OK')

    # AT+CIND?
    def on_cind_read(self) -> None:
        self.protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
        self.protocol.send_response_line('OK')

    # AT+CIND=?
    def on_cind_test(self) -> None:
        self.protocol.send_response_line('+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
                                         '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
                                         '("callheld",(0-2))')
        self.protocol.send_response_line('OK')

    # AT+BIND=
    def on_bind_list(self, indicators: list[int]) -> None:
        self.enabled_hf_indicators = indicators[:]
        self.protocol.send_response_line('OK')

    # AT+BIND=?
    def on_bind_read_capabilities(self) -> None:
        self.protocol.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators)))
        self.protocol.send_response_line('OK')

    # AT+BIND?
    def on_bind_read_configuration(self) -> None:
        for i in self.enabled_hf_indicators:
            self.protocol.send_response_line(f'+BIND: {i},1')
        self.protocol.send_response_line('OK')


if __name__ == '__main__':
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
    test_runner.main()  # type: ignore