Loading system/gd/cert/matchers.py +13 −9 Original line number Diff line number Diff line Loading @@ -87,10 +87,10 @@ class HciMatchers(object): @staticmethod def _extract_matching_le_event(packet_bytes, subevent_code): event = hci_packets.LeMetaEventView( HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)) if event is None: inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT) if inner_event is None: return None event = hci_packets.LeMetaEventView(inner_event) if event.GetSubeventCode() != subevent_code: return None return event Loading @@ -105,12 +105,16 @@ class HciMatchers(object): @staticmethod def _extract_le_connection_complete(packet_bytes): event = hci_packets.LeConnectionCompleteView( HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)) if event is not None: return event return hci_packets.LeEnhancedConnectionCompleteView( HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)) inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE) if inner_event is not None: return hci_packets.LeConnectionCompleteView(inner_event) inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) if inner_event is not None: return hci_packets.LeEnhancedConnectionCompleteView(inner_event) return None @staticmethod def LogEventCode(): Loading system/gd/cert/py_hal.py +7 −0 Original line number Diff line number Diff line Loading @@ -180,6 +180,13 @@ class PyHal(Closable): handle = connection_complete.get().GetConnectionHandle() return PyHalAclConnection(handle, self.acl_stream, self.device) def complete_le_connection(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.hci_event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyHalAclConnection(handle, self.acl_stream, self.device) def create_advertisement(self, handle, own_address, Loading system/gd/hal/cert/simple_hal_test.py +8 −27 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ from cert.event_stream import EventStream from cert.truth import assertThat from cert.py_hal import PyHal from cert.matchers import HciMatchers from cert.captures import HciCaptures from google.protobuf import empty_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from hal import facade_pb2 as hal_facade_pb2 Loading Loading @@ -136,28 +137,11 @@ class SimpleHalTest(GdBaseTestClass): advertisement.set_scan_response(b'Im_The_D') advertisement.start() conn_handle = 0xfff def payload_handle(packet): packet_bytes = packet.payload if b'\x3e\x13\x01\x00' in packet_bytes: nonlocal conn_handle cc_view = hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) conn_handle = cc_view.GetConnectionHandle() return True return False assertThat(self.cert_hal.get_hci_event_stream()).emits(payload_handle) cert_handle = conn_handle conn_handle = 0xfff assertThat(self.dut_hal.get_hci_event_stream()).emits(payload_handle) dut_handle = conn_handle # Send ACL Data self.dut_hal.send_acl_first(dut_handle, bytes(b'Just SomeAclData')) self.cert_hal.send_acl_first(cert_handle, bytes(b'Just SomeMoreAclData')) cert_acl = self.cert_hal.complete_le_connection() dut_acl = self.dut_hal.complete_le_connection() dut_acl.send_first(b'Just SomeAclData') cert_acl.send_first(b'Just SomeMoreAclData') assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload) Loading Loading @@ -192,8 +176,5 @@ class SimpleHalTest(GdBaseTestClass): advertisement.set_data(b'Im_A_Cert') advertisement.start() # LeConnectionComplete assertThat(self.cert_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20)) assertThat(self.dut_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20)) assertThat(self.cert_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete()) assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete()) Loading
system/gd/cert/matchers.py +13 −9 Original line number Diff line number Diff line Loading @@ -87,10 +87,10 @@ class HciMatchers(object): @staticmethod def _extract_matching_le_event(packet_bytes, subevent_code): event = hci_packets.LeMetaEventView( HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)) if event is None: inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT) if inner_event is None: return None event = hci_packets.LeMetaEventView(inner_event) if event.GetSubeventCode() != subevent_code: return None return event Loading @@ -105,12 +105,16 @@ class HciMatchers(object): @staticmethod def _extract_le_connection_complete(packet_bytes): event = hci_packets.LeConnectionCompleteView( HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)) if event is not None: return event return hci_packets.LeEnhancedConnectionCompleteView( HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)) inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE) if inner_event is not None: return hci_packets.LeConnectionCompleteView(inner_event) inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) if inner_event is not None: return hci_packets.LeEnhancedConnectionCompleteView(inner_event) return None @staticmethod def LogEventCode(): Loading
system/gd/cert/py_hal.py +7 −0 Original line number Diff line number Diff line Loading @@ -180,6 +180,13 @@ class PyHal(Closable): handle = connection_complete.get().GetConnectionHandle() return PyHalAclConnection(handle, self.acl_stream, self.device) def complete_le_connection(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.hci_event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyHalAclConnection(handle, self.acl_stream, self.device) def create_advertisement(self, handle, own_address, Loading
system/gd/hal/cert/simple_hal_test.py +8 −27 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ from cert.event_stream import EventStream from cert.truth import assertThat from cert.py_hal import PyHal from cert.matchers import HciMatchers from cert.captures import HciCaptures from google.protobuf import empty_pb2 from facade import rootservice_pb2 as facade_rootservice_pb2 from hal import facade_pb2 as hal_facade_pb2 Loading Loading @@ -136,28 +137,11 @@ class SimpleHalTest(GdBaseTestClass): advertisement.set_scan_response(b'Im_The_D') advertisement.start() conn_handle = 0xfff def payload_handle(packet): packet_bytes = packet.payload if b'\x3e\x13\x01\x00' in packet_bytes: nonlocal conn_handle cc_view = hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) conn_handle = cc_view.GetConnectionHandle() return True return False assertThat(self.cert_hal.get_hci_event_stream()).emits(payload_handle) cert_handle = conn_handle conn_handle = 0xfff assertThat(self.dut_hal.get_hci_event_stream()).emits(payload_handle) dut_handle = conn_handle # Send ACL Data self.dut_hal.send_acl_first(dut_handle, bytes(b'Just SomeAclData')) self.cert_hal.send_acl_first(cert_handle, bytes(b'Just SomeMoreAclData')) cert_acl = self.cert_hal.complete_le_connection() dut_acl = self.dut_hal.complete_le_connection() dut_acl.send_first(b'Just SomeAclData') cert_acl.send_first(b'Just SomeMoreAclData') assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload) assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload) Loading Loading @@ -192,8 +176,5 @@ class SimpleHalTest(GdBaseTestClass): advertisement.set_data(b'Im_A_Cert') advertisement.start() # LeConnectionComplete assertThat(self.cert_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20)) assertThat(self.dut_hal.get_hci_event_stream()).emits( lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20)) assertThat(self.cert_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete()) assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())