Loading system/gd/hci/cert/direct_hci_test.py +228 −0 Original line number Diff line number Diff line Loading @@ -349,3 +349,231 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): lambda packet: b'\x3e\x13\x01\x00' in packet.payload) le_event_asserts.assert_event_occurs( lambda packet: b'\x3e\x13\x01\x00' in packet.event) def test_connection_dut_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \ EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: address = hci_packets.Address() def get_address_from_complete(packet): packet_bytes = packet.payload if b'\x0e\x0a\x01\x09\x10' in packet_bytes: nonlocal address addr_view = hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes))))) address = addr_view.GetBdAddr() return True return False # CERT Enables scans and gets its address self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder()) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) self.send_hal_hci_command( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) # DUT Connects self.enqueue_hci_command( hci_packets.CreateConnectionBuilder( address, 0x11, hci_packets.PageScanRepetitionMode.R0, 0x22, hci_packets.ClockOffsetValid.VALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH), False) # Cert Accepts connection_request = None def get_connect_request(packet): if b'\x04\x0a' in packet.payload: nonlocal connection_request connection_request = hci_packets.ConnectionRequestView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload)))) return True return False # Cert Accepts cert_hci_event_asserts.assert_event_occurs(get_connect_request) self.send_hal_hci_command( hci_packets.AcceptConnectionRequestBuilder( connection_request.GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) conn_handle = 0xfff def get_handle(packet_bytes): if b'\x03\x0b\x00' in packet_bytes: nonlocal conn_handle cc_view = hci_packets.ConnectionCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes)))) conn_handle = cc_view.GetConnectionHandle() return True return False def event_handle(packet): packet_bytes = packet.event return get_handle(packet_bytes) def payload_handle(packet): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff hci_event_asserts.assert_event_occurs(event_handle) dut_handle = conn_handle if dut_handle == 0xfff: logging.warning("Failed to get the DUT handle") return False if cert_handle == 0xfff: logging.warning("Failed to get the CERT handle") return False # Send ACL Data self.enqueue_acl_data( dut_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData')) self.send_hal_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( lambda packet: b'SomeMoreAclData' in packet.data) def test_connection_cert_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \ EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: address = hci_packets.Address() def get_address_from_complete(packet): packet_bytes = packet.event if b'\x0e\x0a\x01\x09\x10' in packet_bytes: nonlocal address addr_view = hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes))))) address = addr_view.GetBdAddr() return True return False # DUT Enables scans and gets its address self.enqueue_hci_command( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) dut_hci_event_asserts = EventAsserts(hci_event_stream) dut_hci_event_asserts.assert_event_occurs(get_address_from_complete) # Cert Connects self.send_hal_hci_command( hci_packets.CreateConnectionBuilder( address, 0x11, hci_packets.PageScanRepetitionMode.R0, 0x22, hci_packets.ClockOffsetValid.VALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) # DUT Accepts connection_request = None def get_connect_request(packet): if b'\x04\x0a' in packet.event: nonlocal connection_request connection_request = hci_packets.ConnectionRequestView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event)))) return True return False dut_hci_event_asserts.assert_event_occurs(get_connect_request) self.enqueue_hci_command( hci_packets.AcceptConnectionRequestBuilder( connection_request.GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE), False) conn_handle = 0xfff def get_handle(packet_bytes): if b'\x03\x0b\x00' in packet_bytes: nonlocal conn_handle cc_view = hci_packets.ConnectionCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes)))) conn_handle = cc_view.GetConnectionHandle() return True return False def event_handle(packet): packet_bytes = packet.event return get_handle(packet_bytes) def payload_handle(packet): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff hci_event_asserts.assert_event_occurs(event_handle) dut_handle = conn_handle if dut_handle == 0xfff: logging.warning("Failed to get the DUT handle") return False if cert_handle == 0xfff: logging.warning("Failed to get the CERT handle") return False # Send ACL Data self.enqueue_acl_data( dut_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData')) self.send_hal_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( lambda packet: b'SomeMoreAclData' in packet.data) Loading
system/gd/hci/cert/direct_hci_test.py +228 −0 Original line number Diff line number Diff line Loading @@ -349,3 +349,231 @@ class DirectHciTest(GdFacadeOnlyBaseTestClass): lambda packet: b'\x3e\x13\x01\x00' in packet.payload) le_event_asserts.assert_event_occurs( lambda packet: b'\x3e\x13\x01\x00' in packet.event) def test_connection_dut_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \ EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: address = hci_packets.Address() def get_address_from_complete(packet): packet_bytes = packet.payload if b'\x0e\x0a\x01\x09\x10' in packet_bytes: nonlocal address addr_view = hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes))))) address = addr_view.GetBdAddr() return True return False # CERT Enables scans and gets its address self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder()) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs( get_address_from_complete) self.send_hal_hci_command( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) # DUT Connects self.enqueue_hci_command( hci_packets.CreateConnectionBuilder( address, 0x11, hci_packets.PageScanRepetitionMode.R0, 0x22, hci_packets.ClockOffsetValid.VALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH), False) # Cert Accepts connection_request = None def get_connect_request(packet): if b'\x04\x0a' in packet.payload: nonlocal connection_request connection_request = hci_packets.ConnectionRequestView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.payload)))) return True return False # Cert Accepts cert_hci_event_asserts.assert_event_occurs(get_connect_request) self.send_hal_hci_command( hci_packets.AcceptConnectionRequestBuilder( connection_request.GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) conn_handle = 0xfff def get_handle(packet_bytes): if b'\x03\x0b\x00' in packet_bytes: nonlocal conn_handle cc_view = hci_packets.ConnectionCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes)))) conn_handle = cc_view.GetConnectionHandle() return True return False def event_handle(packet): packet_bytes = packet.event return get_handle(packet_bytes) def payload_handle(packet): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff hci_event_asserts.assert_event_occurs(event_handle) dut_handle = conn_handle if dut_handle == 0xfff: logging.warning("Failed to get the DUT handle") return False if cert_handle == 0xfff: logging.warning("Failed to get the CERT handle") return False # Send ACL Data self.enqueue_acl_data( dut_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData')) self.send_hal_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( lambda packet: b'SomeMoreAclData' in packet.data) def test_connection_cert_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \ EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: address = hci_packets.Address() def get_address_from_complete(packet): packet_bytes = packet.event if b'\x0e\x0a\x01\x09\x10' in packet_bytes: nonlocal address addr_view = hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes))))) address = addr_view.GetBdAddr() return True return False # DUT Enables scans and gets its address self.enqueue_hci_command( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) dut_hci_event_asserts = EventAsserts(hci_event_stream) dut_hci_event_asserts.assert_event_occurs(get_address_from_complete) # Cert Connects self.send_hal_hci_command( hci_packets.CreateConnectionBuilder( address, 0x11, hci_packets.PageScanRepetitionMode.R0, 0x22, hci_packets.ClockOffsetValid.VALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) # DUT Accepts connection_request = None def get_connect_request(packet): if b'\x04\x0a' in packet.event: nonlocal connection_request connection_request = hci_packets.ConnectionRequestView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event)))) return True return False dut_hci_event_asserts.assert_event_occurs(get_connect_request) self.enqueue_hci_command( hci_packets.AcceptConnectionRequestBuilder( connection_request.GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE), False) conn_handle = 0xfff def get_handle(packet_bytes): if b'\x03\x0b\x00' in packet_bytes: nonlocal conn_handle cc_view = hci_packets.ConnectionCompleteView( hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet_bytes)))) conn_handle = cc_view.GetConnectionHandle() return True return False def event_handle(packet): packet_bytes = packet.event return get_handle(packet_bytes) def payload_handle(packet): packet_bytes = packet.payload return get_handle(packet_bytes) hci_event_asserts = EventAsserts(hci_event_stream) cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) cert_hci_event_asserts.assert_event_occurs(payload_handle) cert_handle = conn_handle conn_handle = 0xfff hci_event_asserts.assert_event_occurs(event_handle) dut_handle = conn_handle if dut_handle == 0xfff: logging.warning("Failed to get the DUT handle") return False if cert_handle == 0xfff: logging.warning("Failed to get the CERT handle") return False # Send ACL Data self.enqueue_acl_data( dut_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData')) self.send_hal_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData')) acl_data_asserts = EventAsserts(acl_data_stream) cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) cert_acl_data_asserts.assert_event_occurs( lambda packet: b'SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( lambda packet: b'SomeMoreAclData' in packet.data)