Loading system/gd/cert/py_hal.py +49 −0 Original line number Diff line number Diff line Loading @@ -16,8 +16,12 @@ from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.closable import Closable from cert.closable import safeClose from cert.captures import HciCaptures from cert.truth import assertThat from hal import facade_pb2 as hal_facade from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder from bluetooth_packets_python3.hci_packets import ScanEnable Loading @@ -25,6 +29,25 @@ from bluetooth_packets_python3.hci_packets import AclPacketBuilder from bluetooth_packets_python3 import RawBuilder from bluetooth_packets_python3.hci_packets import BroadcastFlag from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag from bluetooth_packets_python3 import hci_packets class PyHalAclConnection(IEventStream): def __init__(self, handle, acl_stream, device): self.handle = int(handle) self.device = device self.our_acl_stream = FilteringEventStream(acl_stream, None) def send(self, pb_flag, b_flag, data): acl = AclPacketBuilder(self.handle, pb_flag, b_flag, RawBuilder(data)) self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl.Serialize()))) def send_first(self, data): self.send(PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(data)) def get_event_queue(self): return self.our_acl_stream.get_event_queue() class PyHal(Closable): Loading Loading @@ -59,3 +82,29 @@ class PyHal(Closable): def enable_inquiry_and_page_scan(self): self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) def initiate_connection(self, remote_addr): self.send_hci_command( hci_packets.CreateConnectionBuilder( remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, hci_packets.ClockOffsetValid.INVALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) def accept_connection(self): connection_request = HciCaptures.ConnectionRequestCapture() assertThat(self.hci_event_stream).emits(connection_request) self.send_hci_command( hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) return self.complete_connection() def complete_connection(self): connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(self.hci_event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyHalAclConnection(handle, self.acl_stream, self.device) system/gd/cert/py_hci.py +1 −1 Original line number Diff line number Diff line Loading @@ -115,7 +115,7 @@ class PyHci(Closable): def initiate_connection(self, remote_addr): self.send_command_with_status( hci_packets.CreateConnectionBuilder( remote_addr.decode('utf-8'), remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, Loading system/gd/hci/cert/direct_hci_test.py +13 −63 Original line number Diff line number Diff line Loading @@ -323,17 +323,6 @@ class DirectHciTest(GdBaseTestClass): # LeConnectionComplete self._verify_le_connection_complete() def _verify_connection_complete(self): cert_connection_complete_capture = HalCaptures.ConnectionCompleteCapture() assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_connection_complete_capture) cert_handle = cert_connection_complete_capture.get().GetConnectionHandle() dut_connection_complete_capture = HciCaptures.ConnectionCompleteCapture() assertThat(self.dut_hci.get_event_stream()).emits(dut_connection_complete_capture) dut_handle = dut_connection_complete_capture.get().GetConnectionHandle() return (dut_handle, cert_handle) def test_connection_dut_connects(self): self.dut_hci.send_command_with_complete(WritePageTimeoutBuilder(0x4000)) Loading @@ -344,32 +333,15 @@ class DirectHciTest(GdBaseTestClass): assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_read_bd_addr_capture) address = cert_read_bd_addr_capture.get().GetBdAddr() self.cert_hal.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) # DUT Connects self.dut_hci.send_command_with_status( CreateConnectionBuilder( address, 0xcc18, # Packet Type PageScanRepetitionMode.R0, 0, ClockOffsetValid.INVALID, CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) # Cert Accepts connect_request_capture = HalCaptures.ConnectionRequestCapture() assertThat(self.cert_hal.get_hci_event_stream()).emits(connect_request_capture, timeout=timedelta(seconds=20)) connection_request = connect_request_capture.get() self.cert_hal.send_hci_command( AcceptConnectionRequestBuilder(connection_request.GetBdAddr(), AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) self.cert_hal.enable_inquiry_and_page_scan() (dut_handle, cert_handle) = self._verify_connection_complete() self.dut_hci.initiate_connection(address) cert_acl = self.cert_hal.accept_connection() dut_acl = self.dut_hci.complete_connection() # Send ACL Data self.enqueue_acl_data(dut_handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData')) self.cert_hal.send_acl_first(cert_handle, bytes(b'Just SomeMoreAclData')) dut_acl.send_first(b'Just SomeAclData') cert_acl.send_first(b'Just SomeMoreAclData') assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data) Loading @@ -377,38 +349,16 @@ class DirectHciTest(GdBaseTestClass): def test_connection_cert_connects(self): self.cert_hal.send_hci_command(WritePageTimeoutBuilder(0x4000)) # DUT Enables scans and gets its address self.dut_hci.send_command_with_complete(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.dut_hci.send_command_with_complete(ReadBdAddrBuilder()) self.dut_hci.enable_inquiry_and_page_scan() address = self.dut_hci.read_own_address() read_bd_addr_capture = HciCaptures.ReadBdAddrCompleteCapture() assertThat(self.dut_hci.get_event_stream()).emits(read_bd_addr_capture) address = read_bd_addr_capture.get().GetBdAddr() # Cert Connects self.cert_hal.send_hci_command( CreateConnectionBuilder( address, 0xcc18, # Packet Type PageScanRepetitionMode.R0, 0, ClockOffsetValid.INVALID, CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) # DUT Accepts connection_request_capture = HciCaptures.ConnectionRequestCapture() assertThat(self.dut_hci.get_event_stream()).emits(connection_request_capture, timeout=timedelta(seconds=20)) connection_request = connection_request_capture.get() self.dut_hci.send_command_with_status( AcceptConnectionRequestBuilder(connection_request.GetBdAddr(), AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) (dut_handle, cert_handle) = self._verify_connection_complete() self.cert_hal.initiate_connection(address) dut_acl = self.dut_hci.accept_connection() cert_acl = self.cert_hal.complete_connection() # Send ACL Data self.enqueue_acl_data(dut_handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData')) self.cert_hal.send_acl_first(cert_handle, bytes(b'This is just SomeMoreAclData')) dut_acl.send_first(b'This is just SomeAclData') cert_acl.send_first(b'This is just SomeMoreAclData') assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data) Loading
system/gd/cert/py_hal.py +49 −0 Original line number Diff line number Diff line Loading @@ -16,8 +16,12 @@ from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.closable import Closable from cert.closable import safeClose from cert.captures import HciCaptures from cert.truth import assertThat from hal import facade_pb2 as hal_facade from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder from bluetooth_packets_python3.hci_packets import ScanEnable Loading @@ -25,6 +29,25 @@ from bluetooth_packets_python3.hci_packets import AclPacketBuilder from bluetooth_packets_python3 import RawBuilder from bluetooth_packets_python3.hci_packets import BroadcastFlag from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag from bluetooth_packets_python3 import hci_packets class PyHalAclConnection(IEventStream): def __init__(self, handle, acl_stream, device): self.handle = int(handle) self.device = device self.our_acl_stream = FilteringEventStream(acl_stream, None) def send(self, pb_flag, b_flag, data): acl = AclPacketBuilder(self.handle, pb_flag, b_flag, RawBuilder(data)) self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl.Serialize()))) def send_first(self, data): self.send(PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(data)) def get_event_queue(self): return self.our_acl_stream.get_event_queue() class PyHal(Closable): Loading Loading @@ -59,3 +82,29 @@ class PyHal(Closable): def enable_inquiry_and_page_scan(self): self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) def initiate_connection(self, remote_addr): self.send_hci_command( hci_packets.CreateConnectionBuilder( remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, hci_packets.ClockOffsetValid.INVALID, hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) def accept_connection(self): connection_request = HciCaptures.ConnectionRequestCapture() assertThat(self.hci_event_stream).emits(connection_request) self.send_hci_command( hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) return self.complete_connection() def complete_connection(self): connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(self.hci_event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyHalAclConnection(handle, self.acl_stream, self.device)
system/gd/cert/py_hci.py +1 −1 Original line number Diff line number Diff line Loading @@ -115,7 +115,7 @@ class PyHci(Closable): def initiate_connection(self, remote_addr): self.send_command_with_status( hci_packets.CreateConnectionBuilder( remote_addr.decode('utf-8'), remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'), 0xcc18, # Packet Type hci_packets.PageScanRepetitionMode.R1, 0x0, Loading
system/gd/hci/cert/direct_hci_test.py +13 −63 Original line number Diff line number Diff line Loading @@ -323,17 +323,6 @@ class DirectHciTest(GdBaseTestClass): # LeConnectionComplete self._verify_le_connection_complete() def _verify_connection_complete(self): cert_connection_complete_capture = HalCaptures.ConnectionCompleteCapture() assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_connection_complete_capture) cert_handle = cert_connection_complete_capture.get().GetConnectionHandle() dut_connection_complete_capture = HciCaptures.ConnectionCompleteCapture() assertThat(self.dut_hci.get_event_stream()).emits(dut_connection_complete_capture) dut_handle = dut_connection_complete_capture.get().GetConnectionHandle() return (dut_handle, cert_handle) def test_connection_dut_connects(self): self.dut_hci.send_command_with_complete(WritePageTimeoutBuilder(0x4000)) Loading @@ -344,32 +333,15 @@ class DirectHciTest(GdBaseTestClass): assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_read_bd_addr_capture) address = cert_read_bd_addr_capture.get().GetBdAddr() self.cert_hal.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) # DUT Connects self.dut_hci.send_command_with_status( CreateConnectionBuilder( address, 0xcc18, # Packet Type PageScanRepetitionMode.R0, 0, ClockOffsetValid.INVALID, CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) # Cert Accepts connect_request_capture = HalCaptures.ConnectionRequestCapture() assertThat(self.cert_hal.get_hci_event_stream()).emits(connect_request_capture, timeout=timedelta(seconds=20)) connection_request = connect_request_capture.get() self.cert_hal.send_hci_command( AcceptConnectionRequestBuilder(connection_request.GetBdAddr(), AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) self.cert_hal.enable_inquiry_and_page_scan() (dut_handle, cert_handle) = self._verify_connection_complete() self.dut_hci.initiate_connection(address) cert_acl = self.cert_hal.accept_connection() dut_acl = self.dut_hci.complete_connection() # Send ACL Data self.enqueue_acl_data(dut_handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData')) self.cert_hal.send_acl_first(cert_handle, bytes(b'Just SomeMoreAclData')) dut_acl.send_first(b'Just SomeAclData') cert_acl.send_first(b'Just SomeMoreAclData') assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data) Loading @@ -377,38 +349,16 @@ class DirectHciTest(GdBaseTestClass): def test_connection_cert_connects(self): self.cert_hal.send_hci_command(WritePageTimeoutBuilder(0x4000)) # DUT Enables scans and gets its address self.dut_hci.send_command_with_complete(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) self.dut_hci.send_command_with_complete(ReadBdAddrBuilder()) self.dut_hci.enable_inquiry_and_page_scan() address = self.dut_hci.read_own_address() read_bd_addr_capture = HciCaptures.ReadBdAddrCompleteCapture() assertThat(self.dut_hci.get_event_stream()).emits(read_bd_addr_capture) address = read_bd_addr_capture.get().GetBdAddr() # Cert Connects self.cert_hal.send_hci_command( CreateConnectionBuilder( address, 0xcc18, # Packet Type PageScanRepetitionMode.R0, 0, ClockOffsetValid.INVALID, CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) # DUT Accepts connection_request_capture = HciCaptures.ConnectionRequestCapture() assertThat(self.dut_hci.get_event_stream()).emits(connection_request_capture, timeout=timedelta(seconds=20)) connection_request = connection_request_capture.get() self.dut_hci.send_command_with_status( AcceptConnectionRequestBuilder(connection_request.GetBdAddr(), AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) (dut_handle, cert_handle) = self._verify_connection_complete() self.cert_hal.initiate_connection(address) dut_acl = self.dut_hci.accept_connection() cert_acl = self.cert_hal.complete_connection() # Send ACL Data self.enqueue_acl_data(dut_handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData')) self.cert_hal.send_acl_first(cert_handle, bytes(b'This is just SomeMoreAclData')) dut_acl.send_first(b'This is just SomeAclData') cert_acl.send_first(b'This is just SomeMoreAclData') assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data)