Loading system/gd/hci/cert/acl_manager_test.py +11 −94 Original line number Diff line number Diff line Loading @@ -39,20 +39,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): def setup_class(self): super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): acl_msg = hci_facade.AclMsg( handle=int(handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=acl) self.cert.hci.SendAclData(acl_msg) def test_dut_connects(self): self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -67,29 +54,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): address=bytes(cert_address, 'utf8')))) as connection_event_stream: # Cert Accepts connection_request = ConnectionRequestCapture() assertThat( cert_hci.get_event_stream()).emits(connection_request) cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes( b'\x26\x00\x07\x00This is just SomeAclData from the Cert' )) cert_acl = cert_hci.accept_connection() cert_acl.send_first( b'\x26\x00\x07\x00This is just SomeAclData from the Cert') # DUT gets a connection complete event and sends and receives connection_complete = ConnectionCompleteCapture() Loading @@ -109,11 +76,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects(self): self.cert.hci.register_for_events( hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -124,15 +86,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.dut.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) # Cert connects cert_hci.send_command_with_status( hci_packets.CreateConnectionBuilder( dut_address.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, hci_packets.ClockOffsetValid.INVALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) cert_hci.initiate_connection(dut_address) # DUT gets a connection request connection_complete = ConnectionCompleteCapture() Loading @@ -146,16 +100,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) connection_complete = ConnectionCompleteCapture() assertThat(cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes( b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) cert_acl = cert_hci.complete_connection() cert_acl.send_first( b'\x26\x00\x07\x00This is just SomeAclData from the Cert') assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) Loading @@ -163,11 +110,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_recombination_l2cap_packet(self): self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -183,35 +125,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): address=bytes(cert_address, 'utf8')))) as connection_event_stream: # Cert Accepts connection_request = ConnectionRequestCapture() assertThat( cert_hci.get_event_stream()).emits(connection_request) cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello')) self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200)) cert_acl = cert_hci.accept_connection() cert_acl.send_first(b'\x06\x00\x07\x00Hello') cert_acl.send_continuing(b'!') cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200) # DUT gets a connection complete event and sends and receives connection_complete = ConnectionCompleteCapture() Loading system/gd/hci/cert/py_hci.py +61 −0 Original line number Diff line number Diff line Loading @@ -17,14 +17,48 @@ from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from captures import ReadBdAddrCompleteCapture from captures import ConnectionCompleteCapture from captures import ConnectionRequestCapture from bluetooth_packets_python3 import hci_packets from cert.truth import assertThat from hci.facade import facade_pb2 as hci_facade class PyHciAclConnection(object): def __init__(self, handle, acl_stream, device): self.handle = handle self.acl_stream = acl_stream self.device = device def send(self, pb_flag, b_flag, data): acl_msg = hci_facade.AclMsg( handle=int(self.handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=data) self.device.hci.SendAclData(acl_msg) def send_first(self, data): self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) def send_continuing(self, data): self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) class PyHci(object): def __init__(self, device): self.device = device self.device.hci.register_for_events( hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) self.event_stream = self.device.hci.new_event_stream() self.acl_stream = EventStream( self.device.hci.FetchAclPackets(empty_proto.Empty())) Loading Loading @@ -65,3 +99,30 @@ class PyHci(object): read_bd_addr = ReadBdAddrCompleteCapture() assertThat(self.event_stream).emits(read_bd_addr) return read_bd_addr.get().GetBdAddr() def initiate_connection(self, remote_addr): self.send_command_with_status( hci_packets.CreateConnectionBuilder( remote_addr.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, hci_packets.ClockOffsetValid.INVALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) def accept_connection(self): connection_request = ConnectionRequestCapture() assertThat(self.event_stream).emits(connection_request) self.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) return self.complete_connection() def complete_connection(self): connection_complete = ConnectionCompleteCapture() assertThat(self.event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyHciAclConnection(handle, self.acl_stream, self.device) Loading
system/gd/hci/cert/acl_manager_test.py +11 −94 Original line number Diff line number Diff line Loading @@ -39,20 +39,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): def setup_class(self): super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): acl_msg = hci_facade.AclMsg( handle=int(handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=acl) self.cert.hci.SendAclData(acl_msg) def test_dut_connects(self): self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -67,29 +54,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): address=bytes(cert_address, 'utf8')))) as connection_event_stream: # Cert Accepts connection_request = ConnectionRequestCapture() assertThat( cert_hci.get_event_stream()).emits(connection_request) cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes( b'\x26\x00\x07\x00This is just SomeAclData from the Cert' )) cert_acl = cert_hci.accept_connection() cert_acl.send_first( b'\x26\x00\x07\x00This is just SomeAclData from the Cert') # DUT gets a connection complete event and sends and receives connection_complete = ConnectionCompleteCapture() Loading @@ -109,11 +76,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects(self): self.cert.hci.register_for_events( hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -124,15 +86,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.dut.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) # Cert connects cert_hci.send_command_with_status( hci_packets.CreateConnectionBuilder( dut_address.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, hci_packets.ClockOffsetValid.INVALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) cert_hci.initiate_connection(dut_address) # DUT gets a connection request connection_complete = ConnectionCompleteCapture() Loading @@ -146,16 +100,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) connection_complete = ConnectionCompleteCapture() assertThat(cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes( b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) cert_acl = cert_hci.complete_connection() cert_acl.send_first( b'\x26\x00\x07\x00This is just SomeAclData from the Cert') assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) Loading @@ -163,11 +110,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_recombination_l2cap_packet(self): self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading @@ -183,35 +125,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): address=bytes(cert_address, 'utf8')))) as connection_event_stream: # Cert Accepts connection_request = ConnectionRequestCapture() assertThat( cert_hci.get_event_stream()).emits(connection_request) cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() assertThat( cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello')) self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) self.enqueue_acl_data( cert_handle, hci_packets.PacketBoundaryFlag. FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200)) cert_acl = cert_hci.accept_connection() cert_acl.send_first(b'\x06\x00\x07\x00Hello') cert_acl.send_continuing(b'!') cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200) # DUT gets a connection complete event and sends and receives connection_complete = ConnectionCompleteCapture() Loading
system/gd/hci/cert/py_hci.py +61 −0 Original line number Diff line number Diff line Loading @@ -17,14 +17,48 @@ from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from captures import ReadBdAddrCompleteCapture from captures import ConnectionCompleteCapture from captures import ConnectionRequestCapture from bluetooth_packets_python3 import hci_packets from cert.truth import assertThat from hci.facade import facade_pb2 as hci_facade class PyHciAclConnection(object): def __init__(self, handle, acl_stream, device): self.handle = handle self.acl_stream = acl_stream self.device = device def send(self, pb_flag, b_flag, data): acl_msg = hci_facade.AclMsg( handle=int(self.handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=data) self.device.hci.SendAclData(acl_msg) def send_first(self, data): self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) def send_continuing(self, data): self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) class PyHci(object): def __init__(self, device): self.device = device self.device.hci.register_for_events( hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) self.event_stream = self.device.hci.new_event_stream() self.acl_stream = EventStream( self.device.hci.FetchAclPackets(empty_proto.Empty())) Loading Loading @@ -65,3 +99,30 @@ class PyHci(object): read_bd_addr = ReadBdAddrCompleteCapture() assertThat(self.event_stream).emits(read_bd_addr) return read_bd_addr.get().GetBdAddr() def initiate_connection(self, remote_addr): self.send_command_with_status( hci_packets.CreateConnectionBuilder( remote_addr.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, hci_packets.ClockOffsetValid.INVALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) def accept_connection(self): connection_request = ConnectionRequestCapture() assertThat(self.event_stream).emits(connection_request) self.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) return self.complete_connection() def complete_connection(self): connection_complete = ConnectionCompleteCapture() assertThat(self.event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyHciAclConnection(handle, self.acl_stream, self.device)