Loading android/pandora/test/hfpclient_test.py +94 −40 Original line number Diff line number Diff line Loading @@ -46,6 +46,7 @@ from typing import Optional, Tuple, List, Dict SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311 HFP_AG_FEATURE_HF_INDICATORS = (1 << 10) HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS HFP_HF_FEATURE_HF_INDICATORS = (1 << 8) HFP_HF_FEATURE_DEFAULT = hex(0x01b5) Loading @@ -54,6 +55,8 @@ PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled' PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config' PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled' HFP_VERSION_1_7 = 0x0107 class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] devices: Optional[PandoraDevices] = None Loading Loading @@ -97,6 +100,7 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] async def setup_test(self) -> None: await asyncio.gather(self.dut.reset(), self.ref.reset()) # TODO(b/286338264): Moving connecting and bonding methods to a shared util scripts async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]: dut_ref, ref_dut = await asyncio.gather( self.dut.aio.host.WaitConnection(address=self.ref.address), Loading Loading @@ -128,7 +132,7 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] channel_number = rfcomm_server.listen(on_dlc) # Setup SDP records self.ref.device.sdp_service_records = make_sdp_records(channel_number) self.ref.device.sdp_service_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0) # Connect and pair dut_ref, ref_dut = await self.make_classic_connection() Loading @@ -150,51 +154,26 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] ref_dut_hfp_protocol = await self.make_hfp_connection() while True: line = await ref_dut_hfp_protocol.next_line() class TestAgServer(HfpAgServer): if line.startswith('AT+BRSF='): def on_brsf(self, hf_features: int) -> None: # HF indicators should be enabled hf_features = parse_hf_features(line) assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0) ag_features = HFP_AG_FEATURE_HF_INDICATORS ref_dut_hfp_protocol.send_response_line(f'+BRSF: {ag_features}') ref_dut_hfp_protocol.send_response_line('OK') elif line.startswith('AT+BIND='): indicators = line[len('AT+BIND='):].split(',') return super().on_brsf(hf_features) def on_bind_list(self, indicators: list[int]) -> None: if enhanced_driver_safety_enabled: assert_in('1', indicators) assert_in(1, indicators) else: assert_not_in('1', indicators) ref_dut_hfp_protocol.send_response_line('OK') break elif line.startswith('AT+CIND=?'): ref_dut_hfp_protocol.send_response_line('+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' '("callheld",(0-2))') ref_dut_hfp_protocol.send_response_line('OK') elif line.startswith('AT+CIND?'): ref_dut_hfp_protocol.send_response_line('+CIND: 0,0,1,4,1,5,0') ref_dut_hfp_protocol.send_response_line('OK') elif line.startswith(( 'AT+CLIP=', 'AT+VGS=', 'AT+BIA=', 'AT+CMER=', 'AT+XEVENT=', 'AT+XAPL=', )): ref_dut_hfp_protocol.send_response_line('OK') else: ref_dut_hfp_protocol.send_response_line('ERROR') assert_not_in(1, indicators) self.terminated = True def parse_hf_features(response_line: str) -> int: assert response_line.startswith('AT+BRSF=') return int(response_line[len('AT+BRSF='):]) server = TestAgServer(ref_dut_hfp_protocol, ag_features=HFP_AG_FEATURE_HF_INDICATORS) await server.serve() def make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]: def make_bumble_ag_sdp_records(hfp_version: int, rfcomm_channel: int, ag_sdp_features: int) -> Dict[int, List[ServiceAttribute]]: return { 0x00010001: [ ServiceAttribute( Loading Loading @@ -223,18 +202,93 @@ def make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]: DataElement.sequence([ DataElement.sequence([ DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), DataElement.unsigned_integer_16(0x0107), DataElement.unsigned_integer_16(hfp_version), ]) ]), ), ServiceAttribute( SDP_PROFILE_SUPPORTED_FEATURES_ID, DataElement.unsigned_integer_16(0), DataElement.unsigned_integer_16(ag_sdp_features), ), ] } class HfpAgServer: enabled_hf_indicators: list[int] hf_features: int def __init__(self, protocol: HfpProtocol, ag_features: int = HFP_AG_FEATURE_DEFAULT) -> None: self.protocol = protocol self.ag_features = ag_features self.terminated = False self.hf_features = 0 # Unknown async def serve(self) -> None: while not self.terminated: line = await self.protocol.next_line() if line.startswith('AT+BRSF='): hf_features = int(line[len('AT+BRSF='):]) self.on_brsf(hf_features) elif line.startswith('AT+BIND=?'): self.on_bind_read_capabilities() elif line.startswith('AT+BIND='): indicators = [int(i) for i in line[len('AT+BIND='):].split(',')] self.on_bind_list(indicators) elif line.startswith('AT+BIND?'): self.on_bind_read_configuration() elif line.startswith('AT+CIND=?'): self.on_cind_read() elif line.startswith('AT+CIND?'): self.on_cind_test() # TODO(b/286226902): Implement handlers for these commands elif line.startswith(( 'AT+CLIP=', 'AT+VGS=', 'AT+BIA=', 'AT+CMER=', 'AT+XEVENT=', 'AT+XAPL=', )): self.protocol.send_response_line('OK') else: self.protocol.send_response_line('ERROR') def on_brsf(self, hf_features: int) -> None: self.hf_features = hf_features self.protocol.send_response_line(f'+BRSF: {self.ag_features}') self.protocol.send_response_line('OK') # AT+CIND? def on_cind_read(self) -> None: self.protocol.send_response_line('+CIND: 0,0,1,4,1,5,0') self.protocol.send_response_line('OK') # AT+CIND=? def on_cind_test(self) -> None: self.protocol.send_response_line('+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' '("callheld",(0-2))') self.protocol.send_response_line('OK') # AT+BIND= def on_bind_list(self, indicators: list[int]) -> None: self.enabled_hf_indicators = indicators[:] self.protocol.send_response_line('OK') # AT+BIND=? def on_bind_read_capabilities(self) -> None: self.protocol.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators))) self.protocol.send_response_line('OK') # AT+BIND? def on_bind_read_configuration(self) -> None: for i in self.enabled_hf_indicators: self.protocol.send_response_line(f'+BIND: {i},1') self.protocol.send_response_line('OK') if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) test_runner.main() # type: ignore Loading
android/pandora/test/hfpclient_test.py +94 −40 Original line number Diff line number Diff line Loading @@ -46,6 +46,7 @@ from typing import Optional, Tuple, List, Dict SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311 HFP_AG_FEATURE_HF_INDICATORS = (1 << 10) HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS HFP_HF_FEATURE_HF_INDICATORS = (1 << 8) HFP_HF_FEATURE_DEFAULT = hex(0x01b5) Loading @@ -54,6 +55,8 @@ PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled' PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config' PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled' HFP_VERSION_1_7 = 0x0107 class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] devices: Optional[PandoraDevices] = None Loading Loading @@ -97,6 +100,7 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] async def setup_test(self) -> None: await asyncio.gather(self.dut.reset(), self.ref.reset()) # TODO(b/286338264): Moving connecting and bonding methods to a shared util scripts async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]: dut_ref, ref_dut = await asyncio.gather( self.dut.aio.host.WaitConnection(address=self.ref.address), Loading Loading @@ -128,7 +132,7 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] channel_number = rfcomm_server.listen(on_dlc) # Setup SDP records self.ref.device.sdp_service_records = make_sdp_records(channel_number) self.ref.device.sdp_service_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0) # Connect and pair dut_ref, ref_dut = await self.make_classic_connection() Loading @@ -150,51 +154,26 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] ref_dut_hfp_protocol = await self.make_hfp_connection() while True: line = await ref_dut_hfp_protocol.next_line() class TestAgServer(HfpAgServer): if line.startswith('AT+BRSF='): def on_brsf(self, hf_features: int) -> None: # HF indicators should be enabled hf_features = parse_hf_features(line) assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0) ag_features = HFP_AG_FEATURE_HF_INDICATORS ref_dut_hfp_protocol.send_response_line(f'+BRSF: {ag_features}') ref_dut_hfp_protocol.send_response_line('OK') elif line.startswith('AT+BIND='): indicators = line[len('AT+BIND='):].split(',') return super().on_brsf(hf_features) def on_bind_list(self, indicators: list[int]) -> None: if enhanced_driver_safety_enabled: assert_in('1', indicators) assert_in(1, indicators) else: assert_not_in('1', indicators) ref_dut_hfp_protocol.send_response_line('OK') break elif line.startswith('AT+CIND=?'): ref_dut_hfp_protocol.send_response_line('+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' '("callheld",(0-2))') ref_dut_hfp_protocol.send_response_line('OK') elif line.startswith('AT+CIND?'): ref_dut_hfp_protocol.send_response_line('+CIND: 0,0,1,4,1,5,0') ref_dut_hfp_protocol.send_response_line('OK') elif line.startswith(( 'AT+CLIP=', 'AT+VGS=', 'AT+BIA=', 'AT+CMER=', 'AT+XEVENT=', 'AT+XAPL=', )): ref_dut_hfp_protocol.send_response_line('OK') else: ref_dut_hfp_protocol.send_response_line('ERROR') assert_not_in(1, indicators) self.terminated = True def parse_hf_features(response_line: str) -> int: assert response_line.startswith('AT+BRSF=') return int(response_line[len('AT+BRSF='):]) server = TestAgServer(ref_dut_hfp_protocol, ag_features=HFP_AG_FEATURE_HF_INDICATORS) await server.serve() def make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]: def make_bumble_ag_sdp_records(hfp_version: int, rfcomm_channel: int, ag_sdp_features: int) -> Dict[int, List[ServiceAttribute]]: return { 0x00010001: [ ServiceAttribute( Loading Loading @@ -223,18 +202,93 @@ def make_sdp_records(rfcomm_channel: int) -> Dict[int, List[ServiceAttribute]]: DataElement.sequence([ DataElement.sequence([ DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), DataElement.unsigned_integer_16(0x0107), DataElement.unsigned_integer_16(hfp_version), ]) ]), ), ServiceAttribute( SDP_PROFILE_SUPPORTED_FEATURES_ID, DataElement.unsigned_integer_16(0), DataElement.unsigned_integer_16(ag_sdp_features), ), ] } class HfpAgServer: enabled_hf_indicators: list[int] hf_features: int def __init__(self, protocol: HfpProtocol, ag_features: int = HFP_AG_FEATURE_DEFAULT) -> None: self.protocol = protocol self.ag_features = ag_features self.terminated = False self.hf_features = 0 # Unknown async def serve(self) -> None: while not self.terminated: line = await self.protocol.next_line() if line.startswith('AT+BRSF='): hf_features = int(line[len('AT+BRSF='):]) self.on_brsf(hf_features) elif line.startswith('AT+BIND=?'): self.on_bind_read_capabilities() elif line.startswith('AT+BIND='): indicators = [int(i) for i in line[len('AT+BIND='):].split(',')] self.on_bind_list(indicators) elif line.startswith('AT+BIND?'): self.on_bind_read_configuration() elif line.startswith('AT+CIND=?'): self.on_cind_read() elif line.startswith('AT+CIND?'): self.on_cind_test() # TODO(b/286226902): Implement handlers for these commands elif line.startswith(( 'AT+CLIP=', 'AT+VGS=', 'AT+BIA=', 'AT+CMER=', 'AT+XEVENT=', 'AT+XAPL=', )): self.protocol.send_response_line('OK') else: self.protocol.send_response_line('ERROR') def on_brsf(self, hf_features: int) -> None: self.hf_features = hf_features self.protocol.send_response_line(f'+BRSF: {self.ag_features}') self.protocol.send_response_line('OK') # AT+CIND? def on_cind_read(self) -> None: self.protocol.send_response_line('+CIND: 0,0,1,4,1,5,0') self.protocol.send_response_line('OK') # AT+CIND=? def on_cind_test(self) -> None: self.protocol.send_response_line('+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' '("callheld",(0-2))') self.protocol.send_response_line('OK') # AT+BIND= def on_bind_list(self, indicators: list[int]) -> None: self.enabled_hf_indicators = indicators[:] self.protocol.send_response_line('OK') # AT+BIND=? def on_bind_read_capabilities(self) -> None: self.protocol.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators))) self.protocol.send_response_line('OK') # AT+BIND? def on_bind_read_configuration(self) -> None: for i in self.enabled_hf_indicators: self.protocol.send_response_line(f'+BIND: {i},1') self.protocol.send_response_line('OK') if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) test_runner.main() # type: ignore