Loading system/gd/hci/cert/acl_manager_test.py +11 −8 Original line number Diff line number Diff line Loading @@ -82,6 +82,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) # CERT Enables scans and gets its address self.enqueue_hci_command( hci_packets.WriteScanEnableBuilder( Loading @@ -104,7 +108,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) Loading @@ -115,6 +118,8 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), address=bytes(cert_address, 'utf8')))) as connection_event_stream: connection_event_asserts = EventAsserts(connection_event_stream) connection_request = None def get_connect_request(packet): Loading Loading @@ -162,7 +167,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): )) # DUT gets a connection complete event and sends and receives connection_event_asserts = EventAsserts(connection_event_stream) handle = 0xfff connection_event_asserts.assert_event_occurs(get_handle) Loading @@ -173,8 +177,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeMoreAclData' in packet.data) acl_data_asserts.assert_event_occurs( Loading @@ -187,6 +189,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) # CERT Enables scans and gets its address self.enqueue_hci_command( hci_packets.WriteScanEnableBuilder( Loading @@ -209,7 +214,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) Loading @@ -220,6 +224,8 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), address=bytes(cert_address, 'utf8')))) as connection_event_stream: connection_event_asserts = EventAsserts(connection_event_stream) connection_request = None def get_connect_request(packet): Loading Loading @@ -258,8 +264,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): cert_hci_event_asserts.assert_event_occurs(get_handle) cert_handle = handle acl_data_asserts = EventAsserts(acl_data_stream) self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, Loading @@ -276,7 +280,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): bytes(b'\x88\x13\x07\x00' + b'Hello' * 1000)) # DUT gets a connection complete event and sends and receives connection_event_asserts = EventAsserts(connection_event_stream) connection_event_asserts.assert_event_occurs(get_handle) acl_data_asserts.assert_event_occurs( Loading system/gd/hci/cert/direct_hci_test.py +23 −21 Original line number Diff line number Diff line Loading @@ -102,6 +102,7 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): with EventCallbackStream( self.device_under_test.hci.FetchEvents( empty_proto.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) self.enqueue_hci_command( hci_packets.WriteLoopbackModeBuilder( Loading @@ -111,7 +112,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): self.enqueue_hci_command(cmd2loop, True) looped_bytes = bytes(cmd2loop.Serialize()) hci_event_asserts = EventAsserts(hci_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: looped_bytes in packet.event) Loading @@ -120,6 +120,9 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): with EventCallbackStream( self.device_under_test.hci.FetchEvents( empty_proto.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) self.send_hal_hci_command( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) Loading @@ -127,7 +130,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): lap.lap = 0x33 self.enqueue_hci_command( hci_packets.InquiryBuilder(lap, 0x30, 0xff), False) hci_event_asserts = EventAsserts(hci_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'\x02\x0f' in packet.event # Expecting an HCI Event (code 0x02, length 0x0f) Loading @@ -139,6 +141,8 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): self.device_under_test.hci.FetchLeSubevents( empty_proto.Empty())) as hci_le_event_stream: hci_event_asserts = EventAsserts(hci_le_event_stream) # DUT Scans self.enqueue_hci_command( hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'), Loading Loading @@ -176,7 +180,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.LeSetAdvertisingEnableBuilder( hci_packets.Enable.ENABLED)) hci_event_asserts = EventAsserts(hci_le_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'Im_A_Cert' in packet.event) Loading @@ -195,6 +198,11 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: le_event_asserts = EventAsserts(le_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) self.send_hal_hci_command( hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) Loading Loading @@ -259,9 +267,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): return True return False le_event_asserts = EventAsserts(le_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff Loading @@ -286,8 +291,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload ) Loading Loading @@ -353,6 +356,11 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) hci_event_asserts = EventAsserts(hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) address = hci_packets.Address() def get_address_from_complete(packet): Loading @@ -371,7 +379,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): # CERT Enables scans and gets its address self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder()) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) self.send_hal_hci_command( Loading Loading @@ -427,8 +434,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff Loading @@ -453,8 +458,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( Loading @@ -468,6 +471,11 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) address = hci_packets.Address() def get_address_from_complete(packet): Loading @@ -489,8 +497,7 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) dut_hci_event_asserts = EventAsserts(hci_event_stream) dut_hci_event_asserts.assert_event_occurs(get_address_from_complete) hci_event_asserts.assert_event_occurs(get_address_from_complete) # Cert Connects self.send_hal_hci_command( Loading @@ -512,7 +519,7 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): return True return False dut_hci_event_asserts.assert_event_occurs(get_connect_request) hci_event_asserts.assert_event_occurs(get_connect_request) self.enqueue_hci_command( hci_packets.AcceptConnectionRequestBuilder( connection_request.GetBdAddr(), Loading Loading @@ -540,9 +547,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff Loading @@ -567,8 +571,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( Loading system/gd/hci/cert/le_advertising_manager_test.py +3 −1 Original line number Diff line number Diff line Loading @@ -75,6 +75,9 @@ class LeAdvertisingManagerTest(GdFacadeOnlyBaseTestClass): with EventCallbackStream( self.cert_device.hci.FetchLeSubevents( empty_proto.Empty())) as hci_le_event_stream: hci_event_asserts = EventAsserts(hci_le_event_stream) # CERT Scans self.enqueue_hci_command( hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'), Loading Loading @@ -116,7 +119,6 @@ class LeAdvertisingManagerTest(GdFacadeOnlyBaseTestClass): create_response = self.device_under_test.hci_le_advertising_manager.CreateAdvertiser( request) hci_event_asserts = EventAsserts(hci_le_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'Im_The_DUT' in packet.event) Loading system/gd/hci/cert/le_scanning_manager_test.py +2 −1 Original line number Diff line number Diff line Loading @@ -76,6 +76,8 @@ class LeScanningManagerTest(GdFacadeOnlyBaseTestClass): self.device_under_test.hci_le_scanning_manager.StartScan( empty_proto.Empty())) as advertising_event_stream: hci_event_asserts = EventAsserts(advertising_event_stream) # CERT Advertises gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME Loading @@ -102,7 +104,6 @@ class LeScanningManagerTest(GdFacadeOnlyBaseTestClass): create_response = self.cert_device.hci_le_advertising_manager.CreateAdvertiser( request) hci_event_asserts = EventAsserts(advertising_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'Im_The_CERT' in packet.event) Loading Loading
system/gd/hci/cert/acl_manager_test.py +11 −8 Original line number Diff line number Diff line Loading @@ -82,6 +82,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) # CERT Enables scans and gets its address self.enqueue_hci_command( hci_packets.WriteScanEnableBuilder( Loading @@ -104,7 +108,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) Loading @@ -115,6 +118,8 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), address=bytes(cert_address, 'utf8')))) as connection_event_stream: connection_event_asserts = EventAsserts(connection_event_stream) connection_request = None def get_connect_request(packet): Loading Loading @@ -162,7 +167,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): )) # DUT gets a connection complete event and sends and receives connection_event_asserts = EventAsserts(connection_event_stream) handle = 0xfff connection_event_asserts.assert_event_occurs(get_handle) Loading @@ -173,8 +177,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeMoreAclData' in packet.data) acl_data_asserts.assert_event_occurs( Loading @@ -187,6 +189,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) # CERT Enables scans and gets its address self.enqueue_hci_command( hci_packets.WriteScanEnableBuilder( Loading @@ -209,7 +214,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) Loading @@ -220,6 +224,8 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), address=bytes(cert_address, 'utf8')))) as connection_event_stream: connection_event_asserts = EventAsserts(connection_event_stream) connection_request = None def get_connect_request(packet): Loading Loading @@ -258,8 +264,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): cert_hci_event_asserts.assert_event_occurs(get_handle) cert_handle = handle acl_data_asserts = EventAsserts(acl_data_stream) self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, Loading @@ -276,7 +280,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): bytes(b'\x88\x13\x07\x00' + b'Hello' * 1000)) # DUT gets a connection complete event and sends and receives connection_event_asserts = EventAsserts(connection_event_stream) connection_event_asserts.assert_event_occurs(get_handle) acl_data_asserts.assert_event_occurs( Loading
system/gd/hci/cert/direct_hci_test.py +23 −21 Original line number Diff line number Diff line Loading @@ -102,6 +102,7 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): with EventCallbackStream( self.device_under_test.hci.FetchEvents( empty_proto.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) self.enqueue_hci_command( hci_packets.WriteLoopbackModeBuilder( Loading @@ -111,7 +112,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): self.enqueue_hci_command(cmd2loop, True) looped_bytes = bytes(cmd2loop.Serialize()) hci_event_asserts = EventAsserts(hci_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: looped_bytes in packet.event) Loading @@ -120,6 +120,9 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): with EventCallbackStream( self.device_under_test.hci.FetchEvents( empty_proto.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) self.send_hal_hci_command( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) Loading @@ -127,7 +130,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): lap.lap = 0x33 self.enqueue_hci_command( hci_packets.InquiryBuilder(lap, 0x30, 0xff), False) hci_event_asserts = EventAsserts(hci_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'\x02\x0f' in packet.event # Expecting an HCI Event (code 0x02, length 0x0f) Loading @@ -139,6 +141,8 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): self.device_under_test.hci.FetchLeSubevents( empty_proto.Empty())) as hci_le_event_stream: hci_event_asserts = EventAsserts(hci_le_event_stream) # DUT Scans self.enqueue_hci_command( hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'), Loading Loading @@ -176,7 +180,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.LeSetAdvertisingEnableBuilder( hci_packets.Enable.ENABLED)) hci_event_asserts = EventAsserts(hci_le_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'Im_A_Cert' in packet.event) Loading @@ -195,6 +198,11 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: le_event_asserts = EventAsserts(le_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) self.send_hal_hci_command( hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) Loading Loading @@ -259,9 +267,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): return True return False le_event_asserts = EventAsserts(le_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff Loading @@ -286,8 +291,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload ) Loading Loading @@ -353,6 +356,11 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) hci_event_asserts = EventAsserts(hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) address = hci_packets.Address() def get_address_from_complete(packet): Loading @@ -371,7 +379,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): # CERT Enables scans and gets its address self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder()) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) self.send_hal_hci_command( Loading Loading @@ -427,8 +434,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff Loading @@ -453,8 +458,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( Loading @@ -468,6 +471,11 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) address = hci_packets.Address() def get_address_from_complete(packet): Loading @@ -489,8 +497,7 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) dut_hci_event_asserts = EventAsserts(hci_event_stream) dut_hci_event_asserts.assert_event_occurs(get_address_from_complete) hci_event_asserts.assert_event_occurs(get_address_from_complete) # Cert Connects self.send_hal_hci_command( Loading @@ -512,7 +519,7 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): return True return False dut_hci_event_asserts.assert_event_occurs(get_connect_request) hci_event_asserts.assert_event_occurs(get_connect_request) self.enqueue_hci_command( hci_packets.AcceptConnectionRequestBuilder( connection_request.GetBdAddr(), Loading Loading @@ -540,9 +547,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff Loading @@ -567,8 +571,6 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( Loading
system/gd/hci/cert/le_advertising_manager_test.py +3 −1 Original line number Diff line number Diff line Loading @@ -75,6 +75,9 @@ class LeAdvertisingManagerTest(GdFacadeOnlyBaseTestClass): with EventCallbackStream( self.cert_device.hci.FetchLeSubevents( empty_proto.Empty())) as hci_le_event_stream: hci_event_asserts = EventAsserts(hci_le_event_stream) # CERT Scans self.enqueue_hci_command( hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'), Loading Loading @@ -116,7 +119,6 @@ class LeAdvertisingManagerTest(GdFacadeOnlyBaseTestClass): create_response = self.device_under_test.hci_le_advertising_manager.CreateAdvertiser( request) hci_event_asserts = EventAsserts(hci_le_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'Im_The_DUT' in packet.event) Loading
system/gd/hci/cert/le_scanning_manager_test.py +2 −1 Original line number Diff line number Diff line Loading @@ -76,6 +76,8 @@ class LeScanningManagerTest(GdFacadeOnlyBaseTestClass): self.device_under_test.hci_le_scanning_manager.StartScan( empty_proto.Empty())) as advertising_event_stream: hci_event_asserts = EventAsserts(advertising_event_stream) # CERT Advertises gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME Loading @@ -102,7 +104,6 @@ class LeScanningManagerTest(GdFacadeOnlyBaseTestClass): create_response = self.cert_device.hci_le_advertising_manager.CreateAdvertiser( request) hci_event_asserts = EventAsserts(advertising_event_stream) hci_event_asserts.assert_event_occurs( lambda packet: b'Im_The_CERT' in packet.event) Loading