Loading system/gd/cert/py_hal.py +1 −1 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ class PyHal(Closable): return self.acl_stream def send_hci_command(self, command): self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command))) self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command.Serialize()))) def send_acl(self, acl): self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl))) system/gd/hal/cert/simple_hal_test.py +238 −257 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ from datetime import timedelta from cert.gd_base_test import GdBaseTestClass from cert.event_stream import EventStream from cert.truth import assertThat from cert.py_hal import PyHal from google.protobuf import empty_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from hal import facade_pb2 as hal_facade_pb2 Loading @@ -36,11 +37,16 @@ class SimpleHalTest(GdBaseTestClass): def setup_test(self): super().setup_test() self.send_dut_hci_command(hci_packets.ResetBuilder()) self.send_cert_hci_command(hci_packets.ResetBuilder()) self.dut_hal = PyHal(self.dut) self.cert_hal = PyHal(self.cert) def send_cert_hci_command(self, command): self.cert.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT) self.dut_hal.send_hci_command(hci_packets.ResetBuilder()) self.cert_hal.send_hci_command(hci_packets.ResetBuilder()) def teardown_test(self): self.dut_hal.close() self.cert_hal.close() super().teardown_test() def send_cert_acl_data(self, handle, pb_flag, b_flag, acl): lower = handle & 0xff Loading @@ -50,10 +56,7 @@ class SimpleHalTest(GdBaseTestClass): lower_length = len(acl) & 0xff upper_length = (len(acl) & 0xff00) >> 8 concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl)) self.cert.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated)) def send_dut_hci_command(self, command): self.dut.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT) self.cert_hal.send_acl(concatenated) def send_dut_acl_data(self, handle, pb_flag, b_flag, acl): lower = handle & 0xff Loading @@ -63,66 +66,55 @@ class SimpleHalTest(GdBaseTestClass): lower_length = len(acl) & 0xff upper_length = (len(acl) & 0xff00) >> 8 concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl)) self.dut.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated), timeout=_GRPC_TIMEOUT) def test_none_event(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: assertThat(hci_event_stream).emitsNone(timeout=timedelta(seconds=1)) self.dut_hal.send_acl(concatenated) def test_fetch_hci_event(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: self.send_dut_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) self.dut_hal.send_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) event = hci_packets.LeAddDeviceToConnectListCompleteBuilder(1, hci_packets.ErrorCode.SUCCESS) assertThat(hci_event_stream).emits(lambda packet: bytes(event.Serialize()) in packet.payload) assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(event.Serialize()) in packet.payload) def test_loopback_hci_command(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: self.send_dut_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL)) self.dut_hal.send_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL)) command = hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01') self.send_dut_hci_command(command) self.dut_hal.send_hci_command(command) assertThat(hci_event_stream).emits(lambda packet: bytes(command.Serialize()) in packet.payload) assertThat( self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(command.Serialize()) in packet.payload) def test_inquiry_from_dut(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: self.cert_hal.send_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.send_cert_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) lap = hci_packets.Lap() lap.lap = 0x33 self.send_dut_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff)) self.dut_hal.send_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff)) assertThat(hci_event_stream).emits(lambda packet: b'\x02\x0f' in packet.payload assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload # Expecting an HCI Event (code 0x02, length 0x0f) ) def test_le_ad_scan_cert_advertises(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: # DUT scans self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) phy_scan_params = hci_packets.PhyScanParameters() phy_scan_params.le_scan_interval = 6553 phy_scan_params.le_scan_window = 6553 phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) # CERT Advertises advertising_handle = 0 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, Loading @@ -138,14 +130,14 @@ class SimpleHalTest(GdBaseTestClass): hci_packets.Enable.DISABLED # Scan request notification )) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_A_Cert')) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) Loading @@ -153,28 +145,23 @@ class SimpleHalTest(GdBaseTestClass): enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) assertThat(hci_event_stream).emits(lambda packet: b'Im_A_Cert' in packet.payload) assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload) # Disable Advertising self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set])) # Disable Scanning self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) def test_le_connection_dut_advertises(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \ EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream, \ EventStream(self.dut.hal.StreamAcl(empty_pb2.Empty())) as acl_data_stream, \ EventStream(self.cert.hal.StreamAcl(empty_pb2.Empty())) as cert_acl_data_stream: # Cert Connects self.send_cert_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) self.cert_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 Loading @@ -184,15 +171,14 @@ class SimpleHalTest(GdBaseTestClass): phy_scan_params.supervision_timeout = 0x1f4 phy_scan_params.min_ce_length = 0 phy_scan_params.max_ce_length = 0 self.send_cert_hci_command( hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params])) self.cert_hal.send_hci_command( hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params])) # DUT Advertises advertising_handle = 0 self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, Loading @@ -208,14 +194,14 @@ class SimpleHalTest(GdBaseTestClass): hci_packets.Enable.DISABLED # Scan request notification )) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT')) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) Loading @@ -224,7 +210,7 @@ class SimpleHalTest(GdBaseTestClass): gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME gap_short_name.data = list(bytes(b'Im_The_D')) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name])) Loading @@ -233,7 +219,7 @@ class SimpleHalTest(GdBaseTestClass): enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) conn_handle = 0xfff Loading @@ -249,10 +235,10 @@ class SimpleHalTest(GdBaseTestClass): return True return False assertThat(cert_hci_event_stream).emits(payload_handle) assertThat(self.cert_hal.get_hci_event_stream()).emits(payload_handle) cert_handle = conn_handle conn_handle = 0xfff assertThat(hci_event_stream).emits(payload_handle) assertThat(self.dut_hal.get_hci_event_stream()).emits(payload_handle) dut_handle = conn_handle # Send ACL Data Loading @@ -261,18 +247,14 @@ class SimpleHalTest(GdBaseTestClass): self.send_cert_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData')) assertThat(cert_acl_data_stream).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload) assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload) def test_le_connect_list_connection_cert_advertises(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \ EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream: # DUT Connects self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.send_dut_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hal.send_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 Loading @@ -282,15 +264,14 @@ class SimpleHalTest(GdBaseTestClass): phy_scan_params.supervision_timeout = 0x1f4 phy_scan_params.min_ce_length = 0 phy_scan_params.max_ce_length = 0 self.send_dut_hci_command( hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) self.dut_hal.send_hci_command( hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) # CERT Advertises advertising_handle = 1 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, Loading @@ -306,14 +287,14 @@ class SimpleHalTest(GdBaseTestClass): hci_packets.Enable.DISABLED # Scan request notification )) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_A_Cert')) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) Loading @@ -321,11 +302,11 @@ class SimpleHalTest(GdBaseTestClass): enabled_set.advertising_handle = 1 enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) # LeConnectionComplete cert_hci_event_stream.assert_event_occurs( assertThat(self.cert_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20)) hci_event_stream.assert_event_occurs( assertThat(self.dut_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20)) system/gd/hci/cert/direct_hci_test.py +2 −2 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class DirectHciTest(GdBaseTestClass): super().setup_test() self.dut_hci = PyHci(self.dut, acl_streaming=True) self.cert_hal = PyHal(self.cert) self.cert_hal.send_hci_command(hci_packets.ResetBuilder().Serialize()) self.cert_hal.send_hci_command(hci_packets.ResetBuilder()) def teardown_test(self): self.dut_hci.close() Loading @@ -44,7 +44,7 @@ class DirectHciTest(GdBaseTestClass): super().teardown_test() def send_hal_hci_command(self, command): self.cert_hal.send_hci_command(bytes(command.Serialize())) self.cert_hal.send_hci_command(command) def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): acl_msg = hci_facade.AclMsg( Loading Loading
system/gd/cert/py_hal.py +1 −1 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ class PyHal(Closable): return self.acl_stream def send_hci_command(self, command): self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command))) self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command.Serialize()))) def send_acl(self, acl): self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl)))
system/gd/hal/cert/simple_hal_test.py +238 −257 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ from datetime import timedelta from cert.gd_base_test import GdBaseTestClass from cert.event_stream import EventStream from cert.truth import assertThat from cert.py_hal import PyHal from google.protobuf import empty_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from hal import facade_pb2 as hal_facade_pb2 Loading @@ -36,11 +37,16 @@ class SimpleHalTest(GdBaseTestClass): def setup_test(self): super().setup_test() self.send_dut_hci_command(hci_packets.ResetBuilder()) self.send_cert_hci_command(hci_packets.ResetBuilder()) self.dut_hal = PyHal(self.dut) self.cert_hal = PyHal(self.cert) def send_cert_hci_command(self, command): self.cert.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT) self.dut_hal.send_hci_command(hci_packets.ResetBuilder()) self.cert_hal.send_hci_command(hci_packets.ResetBuilder()) def teardown_test(self): self.dut_hal.close() self.cert_hal.close() super().teardown_test() def send_cert_acl_data(self, handle, pb_flag, b_flag, acl): lower = handle & 0xff Loading @@ -50,10 +56,7 @@ class SimpleHalTest(GdBaseTestClass): lower_length = len(acl) & 0xff upper_length = (len(acl) & 0xff00) >> 8 concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl)) self.cert.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated)) def send_dut_hci_command(self, command): self.dut.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT) self.cert_hal.send_acl(concatenated) def send_dut_acl_data(self, handle, pb_flag, b_flag, acl): lower = handle & 0xff Loading @@ -63,66 +66,55 @@ class SimpleHalTest(GdBaseTestClass): lower_length = len(acl) & 0xff upper_length = (len(acl) & 0xff00) >> 8 concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl)) self.dut.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated), timeout=_GRPC_TIMEOUT) def test_none_event(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: assertThat(hci_event_stream).emitsNone(timeout=timedelta(seconds=1)) self.dut_hal.send_acl(concatenated) def test_fetch_hci_event(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: self.send_dut_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) self.dut_hal.send_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) event = hci_packets.LeAddDeviceToConnectListCompleteBuilder(1, hci_packets.ErrorCode.SUCCESS) assertThat(hci_event_stream).emits(lambda packet: bytes(event.Serialize()) in packet.payload) assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(event.Serialize()) in packet.payload) def test_loopback_hci_command(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: self.send_dut_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL)) self.dut_hal.send_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL)) command = hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01') self.send_dut_hci_command(command) self.dut_hal.send_hci_command(command) assertThat(hci_event_stream).emits(lambda packet: bytes(command.Serialize()) in packet.payload) assertThat( self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(command.Serialize()) in packet.payload) def test_inquiry_from_dut(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: self.cert_hal.send_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.send_cert_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) lap = hci_packets.Lap() lap.lap = 0x33 self.send_dut_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff)) self.dut_hal.send_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff)) assertThat(hci_event_stream).emits(lambda packet: b'\x02\x0f' in packet.payload assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload # Expecting an HCI Event (code 0x02, length 0x0f) ) def test_le_ad_scan_cert_advertises(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream: # DUT scans self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) phy_scan_params = hci_packets.PhyScanParameters() phy_scan_params.le_scan_interval = 6553 phy_scan_params.le_scan_window = 6553 phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) # CERT Advertises advertising_handle = 0 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, Loading @@ -138,14 +130,14 @@ class SimpleHalTest(GdBaseTestClass): hci_packets.Enable.DISABLED # Scan request notification )) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_A_Cert')) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) Loading @@ -153,28 +145,23 @@ class SimpleHalTest(GdBaseTestClass): enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) assertThat(hci_event_stream).emits(lambda packet: b'Im_A_Cert' in packet.payload) assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload) # Disable Advertising self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set])) # Disable Scanning self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) def test_le_connection_dut_advertises(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \ EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream, \ EventStream(self.dut.hal.StreamAcl(empty_pb2.Empty())) as acl_data_stream, \ EventStream(self.cert.hal.StreamAcl(empty_pb2.Empty())) as cert_acl_data_stream: # Cert Connects self.send_cert_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) self.cert_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 Loading @@ -184,15 +171,14 @@ class SimpleHalTest(GdBaseTestClass): phy_scan_params.supervision_timeout = 0x1f4 phy_scan_params.min_ce_length = 0 phy_scan_params.max_ce_length = 0 self.send_cert_hci_command( hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params])) self.cert_hal.send_hci_command( hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params])) # DUT Advertises advertising_handle = 0 self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, Loading @@ -208,14 +194,14 @@ class SimpleHalTest(GdBaseTestClass): hci_packets.Enable.DISABLED # Scan request notification )) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_The_DUT')) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) Loading @@ -224,7 +210,7 @@ class SimpleHalTest(GdBaseTestClass): gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME gap_short_name.data = list(bytes(b'Im_The_D')) self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name])) Loading @@ -233,7 +219,7 @@ class SimpleHalTest(GdBaseTestClass): enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.send_dut_hci_command( self.dut_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) conn_handle = 0xfff Loading @@ -249,10 +235,10 @@ class SimpleHalTest(GdBaseTestClass): return True return False assertThat(cert_hci_event_stream).emits(payload_handle) assertThat(self.cert_hal.get_hci_event_stream()).emits(payload_handle) cert_handle = conn_handle conn_handle = 0xfff assertThat(hci_event_stream).emits(payload_handle) assertThat(self.dut_hal.get_hci_event_stream()).emits(payload_handle) dut_handle = conn_handle # Send ACL Data Loading @@ -261,18 +247,14 @@ class SimpleHalTest(GdBaseTestClass): self.send_cert_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData')) assertThat(cert_acl_data_stream).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload) assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload) def test_le_connect_list_connection_cert_advertises(self): with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \ EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream: # DUT Connects self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.send_dut_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hal.send_hci_command( hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01')) phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 Loading @@ -282,15 +264,14 @@ class SimpleHalTest(GdBaseTestClass): phy_scan_params.supervision_timeout = 0x1f4 phy_scan_params.min_ce_length = 0 phy_scan_params.max_ce_length = 0 self.send_dut_hci_command( hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) self.dut_hal.send_hci_command( hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) # CERT Advertises advertising_handle = 1 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, Loading @@ -306,14 +287,14 @@ class SimpleHalTest(GdBaseTestClass): hci_packets.Enable.DISABLED # Scan request notification )) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01')) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_A_Cert')) self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name])) Loading @@ -321,11 +302,11 @@ class SimpleHalTest(GdBaseTestClass): enabled_set.advertising_handle = 1 enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.send_cert_hci_command( self.cert_hal.send_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set])) # LeConnectionComplete cert_hci_event_stream.assert_event_occurs( assertThat(self.cert_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20)) hci_event_stream.assert_event_occurs( assertThat(self.dut_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20))
system/gd/hci/cert/direct_hci_test.py +2 −2 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class DirectHciTest(GdBaseTestClass): super().setup_test() self.dut_hci = PyHci(self.dut, acl_streaming=True) self.cert_hal = PyHal(self.cert) self.cert_hal.send_hci_command(hci_packets.ResetBuilder().Serialize()) self.cert_hal.send_hci_command(hci_packets.ResetBuilder()) def teardown_test(self): self.dut_hci.close() Loading @@ -44,7 +44,7 @@ class DirectHciTest(GdBaseTestClass): super().teardown_test() def send_hal_hci_command(self, command): self.cert_hal.send_hci_command(bytes(command.Serialize())) self.cert_hal.send_hci_command(command) def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): acl_msg = hci_facade.AclMsg( Loading