Loading system/Android.mk +12 −5 Original line number Diff line number Diff line Loading @@ -18,6 +18,9 @@ LOCAL_host_executables := \ $(HOST_OUT_EXECUTABLES)/bt_topshim_facade \ $(HOST_OUT_EXECUTABLES)/root-canal LOCAL_host_python_hci_packets_library := \ $(SOONG_OUT_DIR)/.intermediates/packages/modules/Bluetooth/system/gd/gd_hci_packets_python3_gen/gen/hci_packets.py LOCAL_host_python_extension_libraries := \ $(HOST_OUT_SHARED_LIBRARIES)/bluetooth_packets_python3.so Loading Loading @@ -88,15 +91,19 @@ bluetooth_cert_src_and_bin_zip := \ $(bluetooth_cert_src_and_bin_zip): PRIVATE_bluetooth_project_dir := $(LOCAL_bluetooth_project_dir) $(bluetooth_cert_src_and_bin_zip): PRIVATE_cert_test_sources := $(LOCAL_cert_test_sources) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_executables := $(LOCAL_host_executables) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_python_extension_libraries := $(LOCAL_host_python_extension_libraries) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_libraries := $(LOCAL_host_libraries) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_python_extension_libraries := $(LOCAL_host_python_extension_libraries) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_python_hci_packets_library := $(LOCAL_host_python_hci_packets_library) $(bluetooth_cert_src_and_bin_zip): PRIVATE_target_executables := $(LOCAL_target_executables) $(bluetooth_cert_src_and_bin_zip): PRIVATE_target_libraries := $(LOCAL_target_libraries) $(bluetooth_cert_src_and_bin_zip): $(SOONG_ZIP) $(LOCAL_cert_test_sources) \ $(LOCAL_host_executables) $(LOCAL_host_libraries) $(LOCAL_host_python_extension_libraries) \ $(LOCAL_host_executables) $(LOCAL_host_libraries) $(LOCAL_host_python_libraries) \ $(LOCAL_host_python_extension_libraries) \ $(LOCAL_host_python_hci_packets_library) \ $(LOCAL_target_executables) $(LOCAL_target_libraries) $(hide) $(SOONG_ZIP) -d -o $@ \ -C $(PRIVATE_bluetooth_project_dir) $(addprefix -f ,$(PRIVATE_cert_test_sources)) \ -C $(dir $(PRIVATE_host_python_hci_packets_library)) -f $(PRIVATE_host_python_hci_packets_library) \ -C $(HOST_OUT_EXECUTABLES) $(addprefix -f ,$(PRIVATE_host_executables)) \ -C $(HOST_OUT_SHARED_LIBRARIES) $(addprefix -f ,$(PRIVATE_host_python_extension_libraries)) \ -P lib64 \ Loading system/blueberry/tests/gd/cert/captures.py +26 −43 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ # limitations under the License. import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode from blueberry.tests.gd.cert.capture import Capture Loading @@ -23,42 +22,36 @@ from blueberry.tests.gd.cert.matchers import HciMatchers from blueberry.tests.gd.cert.matchers import L2capMatchers from blueberry.tests.gd.cert.matchers import SecurityMatchers from blueberry.facade.security.facade_pb2 import UiMsgType import hci_packets as hci class HalCaptures(object): @staticmethod def ReadBdAddrCompleteCapture(): return Capture( lambda packet: packet.payload[0:5] == b'\x0e\x0a\x01\x09\x10', lambda packet: hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload)))))) return Capture(lambda packet: packet.payload[0:5] == b'\x0e\x0a\x01\x09\x10', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionRequestCapture(): return Capture( lambda packet: packet.payload[0:2] == b'\x04\x0a', lambda packet: hci_packets.ConnectionRequestView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))) return Capture(lambda packet: packet.payload[0:2] == b'\x04\x0a', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionCompleteCapture(): return Capture( lambda packet: packet.payload[0:3] == b'\x03\x0b\x00', lambda packet: hci_packets.ConnectionCompleteView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))) return Capture(lambda packet: packet.payload[0:3] == b'\x03\x0b\x00', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def DisconnectionCompleteCapture(): return Capture( lambda packet: packet.payload[0:2] == b'\x05\x04', lambda packet: hci_packets.DisconnectionCompleteView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))) return Capture(lambda packet: packet.payload[0:2] == b'\x05\x04', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def LeConnectionCompleteCapture(): return Capture( lambda packet: packet.payload[0] == 0x3e and (packet.payload[2] == 0x01 or packet.payload[2] == 0x0a), lambda packet: hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload)))))) lambda packet: hci.Event.parse_all(packet.payload)) class HciCaptures(object): Loading @@ -66,43 +59,34 @@ class HciCaptures(object): @staticmethod def ReadLocalOobDataCompleteCapture(): return Capture( HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_DATA), lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_DATA) ) HciMatchers.CommandComplete(hci.OpCode.READ_LOCAL_OOB_DATA), lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci.OpCode.READ_LOCAL_OOB_DATA)) @staticmethod def ReadLocalOobExtendedDataCompleteCapture(): return Capture( HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA), lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA) ) HciMatchers.CommandComplete(hci.OpCode.READ_LOCAL_OOB_EXTENDED_DATA), lambda packet: HciMatchers. ExtractMatchingCommandComplete(packet.payload, hci.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)) @staticmethod def ReadBdAddrCompleteCapture(): return Capture( HciMatchers.CommandComplete(hci_packets.OpCode.READ_BD_ADDR), lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_BD_ADDR))) return Capture(HciMatchers.CommandComplete(hci.OpCode.READ_BD_ADDR), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionRequestCapture(): return Capture( HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_REQUEST), lambda packet: hci_packets.ConnectionRequestView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_REQUEST))) return Capture(HciMatchers.EventWithCode(hci.EventCode.CONNECTION_REQUEST), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionCompleteCapture(): return Capture( HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_COMPLETE), lambda packet: hci_packets.ConnectionCompleteView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_COMPLETE))) return Capture(HciMatchers.EventWithCode(hci.EventCode.CONNECTION_COMPLETE), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def DisconnectionCompleteCapture(): return Capture( HciMatchers.EventWithCode(hci_packets.EventCode.DISCONNECTION_COMPLETE), lambda packet: hci_packets.DisconnectionCompleteView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.DISCONNECTION_COMPLETE))) return Capture(HciMatchers.EventWithCode(hci.EventCode.DISCONNECTION_COMPLETE), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def LeConnectionCompleteCapture(): Loading @@ -111,9 +95,8 @@ class HciCaptures(object): @staticmethod def SimplePairingCompleteCapture(): return Capture(HciMatchers.EventWithCode(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE), lambda packet: hci_packets.SimplePairingCompleteView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE))) return Capture(HciMatchers.EventWithCode(hci.EventCode.SIMPLE_PAIRING_COMPLETE), lambda packet: hci.Event.parse_all(packet.payload)) class L2capCaptures(object): Loading Loading @@ -147,8 +130,8 @@ class L2capCaptures(object): @staticmethod def CreditBasedConnectionRequest(psm): return Capture( L2capMatchers.CreditBasedConnectionRequest(psm), L2capCaptures._extract_credit_based_connection_request) return Capture(L2capMatchers.CreditBasedConnectionRequest(psm), L2capCaptures._extract_credit_based_connection_request) @staticmethod def _extract_credit_based_connection_request(packet): Loading system/blueberry/tests/gd/cert/cert_self_test.py +28 −23 Original line number Diff line number Diff line Loading @@ -28,8 +28,8 @@ from blueberry.tests.gd.cert.behavior import ReplyStage from blueberry.tests.gd.cert.event_stream import EventStream, FilteringEventStream from blueberry.tests.gd.cert.metadata import metadata from blueberry.tests.gd.cert.truth import assertThat from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets import hci_packets as hci from mobly import asserts from mobly import signals Loading Loading @@ -142,8 +142,9 @@ class CertSelfTest(base_test.BaseTestClass): def test_assert_occurs_at_least_passes(self): with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream: event_stream.assert_event_occurs( lambda data: data.value_ == 1, timeout=timedelta(milliseconds=300), at_least_times=2) event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(milliseconds=300), at_least_times=2) def test_assert_occurs_passes(self): with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: Loading @@ -160,14 +161,16 @@ class CertSelfTest(base_test.BaseTestClass): def test_assert_occurs_at_most_passes(self): with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_stream.assert_event_occurs_at_most( lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3) event_stream.assert_event_occurs_at_most(lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3) def test_assert_occurs_at_most_fails(self): try: with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_stream.assert_event_occurs_at_most( lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2) event_stream.assert_event_occurs_at_most(lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2) except Exception as e: logging.debug(e) return True # Failed as expected Loading @@ -179,12 +182,14 @@ class CertSelfTest(base_test.BaseTestClass): def test_nested_packets(self): handle = 123 inside = hci_packets.ReadScanEnableBuilder() logging.debug(inside.Serialize()) inside = hci.ReadScanEnable() logging.debug(inside.serialize()) logging.debug("building outside") outside = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, inside) logging.debug(outside.Serialize()) outside = hci.Acl(handle=handle, packet_boundary_flag=hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, broadcast_flag=hci.BroadcastFlag.POINT_TO_POINT, payload=inside.serialize()) logging.debug(outside.serialize()) logging.debug("Done!") def test_l2cap_config_options(self): Loading @@ -199,10 +204,12 @@ class CertSelfTest(base_test.BaseTestClass): [mtu_opt, fcs_opt]) request_b_frame = l2cap_packets.BasicFrameBuilder(0x01, request) handle = 123 wrapped = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, request_b_frame) wrapped = hci.Acl(handle=handle, packet_boundary_flag=hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, broadcast_flag=hci.BroadcastFlag.POINT_TO_POINT, payload=bytes(request_b_frame.Serialize())) # Size is ACL (4) + L2CAP (4) + Configure (8) + MTU (4) + FCS (3) asserts.assert_true(len(wrapped.Serialize()) == 23, "Packet serialized incorrectly") asserts.assert_true(len(wrapped.serialize()) == 23, "Packet serialized incorrectly") def test_assertThat_boolean_success(self): assertThat(True).isTrue() Loading Loading @@ -313,8 +320,7 @@ class CertSelfTest(base_test.BaseTestClass): def test_assertThat_emitsNone_passes(self): with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: assertThat(event_stream).emitsNone( lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)).thenNone( assertThat(event_stream).emitsNone(lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)).thenNone( lambda data: data.value_ == 5, timeout=timedelta(seconds=0.15)) def test_assertThat_emitsNone_passes_after_1_second(self): Loading @@ -332,8 +338,8 @@ class CertSelfTest(base_test.BaseTestClass): def test_assertThat_emitsNone_zero_passes(self): with EventStream(FetchEvents(events=[], delay_ms=50)) as event_stream: assertThat(event_stream).emitsNone(timeout=timedelta(milliseconds=10)).thenNone( timeout=timedelta(milliseconds=10)) assertThat(event_stream).emitsNone(timeout=timedelta(milliseconds=10)).thenNone(timeout=timedelta( milliseconds=10)) def test_assertThat_emitsNone_zero_passes_after_one_second(self): with EventStream(FetchEvents([1], delay_ms=1500)) as event_stream: Loading Loading @@ -449,8 +455,7 @@ class CertSelfTest(base_test.BaseTestClass): asserts.assert_true("pts_test_name" in e.extras, msg=("pts_test_name not in extra: %s" % str(e.extras))) asserts.assert_equal(e.extras["pts_test_name"], "Hello world") trace_str = traceback.format_exc() asserts.assert_true( "raise ValueError(failure_argument)" in trace_str, asserts.assert_true("raise ValueError(failure_argument)" in trace_str, msg="Failed test method not in error stack trace: %s" % trace_str) else: asserts.fail("Must throw an exception using @metadata decorator") Loading system/blueberry/tests/gd/cert/matchers.py +70 −104 File changed.Preview size limit exceeded, changes collapsed. Show changes system/blueberry/tests/gd/cert/py_acl_manager.py +7 −6 Original line number Diff line number Diff line Loading @@ -20,9 +20,10 @@ from blueberry.tests.gd.cert.event_stream import IEventStream from blueberry.tests.gd.cert.captures import HciCaptures from blueberry.tests.gd.cert.closable import Closable from blueberry.tests.gd.cert.closable import safeClose from bluetooth_packets_python3 import hci_packets from blueberry.tests.gd.cert.truth import assertThat from blueberry.facade.hci import acl_manager_facade_pb2 as acl_manager_facade from blueberry.utils import bluetooth import hci_packets as hci class PyAclManagerAclConnection(IEventStream, Closable): Loading @@ -35,7 +36,7 @@ class PyAclManagerAclConnection(IEventStream, Closable): self.acl_stream = EventStream(self.acl_manager.FetchAclData(acl_manager_facade.HandleMsg(handle=self.handle))) def disconnect(self, reason): packet_bytes = bytes(hci_packets.DisconnectBuilder(self.handle, reason).Serialize()) packet_bytes = hci.Disconnect(connection_handle=self.handle, reason=reason).serialize() self.acl_manager.ConnectionCommand(acl_manager_facade.ConnectionCommandMsg(packet=packet_bytes)) def close(self): Loading @@ -45,7 +46,7 @@ class PyAclManagerAclConnection(IEventStream, Closable): def wait_for_disconnection_complete(self): disconnection_complete = HciCaptures.DisconnectionCompleteCapture() assertThat(self.connection_event_stream).emits(disconnection_complete) self.disconnect_reason = disconnection_complete.get().GetReason() self.disconnect_reason = disconnection_complete.get().reason def send(self, data): self.acl_manager.SendAclData(acl_manager_facade.AclData(handle=self.handle, payload=bytes(data))) Loading @@ -72,7 +73,7 @@ class PyAclManager: def initiate_connection(self, remote_addr): assertThat(self.outgoing_connection_event_stream).isNone() remote_addr_bytes = bytes(remote_addr, 'utf8') if type(remote_addr) is str else bytes(remote_addr) remote_addr_bytes = remote_addr if isinstance(remote_addr, bytes) else remote_addr.encode('utf-8') self.outgoing_connection_event_stream = EventStream( self.acl_manager.CreateConnection(acl_manager_facade.ConnectionMsg(address=remote_addr_bytes))) Loading @@ -80,8 +81,8 @@ class PyAclManager: connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(event_stream).emits(connection_complete) complete = connection_complete.get() handle = complete.GetConnectionHandle() address = complete.GetBdAddr() handle = complete.connection_handle address = repr(complete.bd_addr) return PyAclManagerAclConnection(self.acl_manager, address, handle, event_stream) def complete_incoming_connection(self): Loading Loading
system/Android.mk +12 −5 Original line number Diff line number Diff line Loading @@ -18,6 +18,9 @@ LOCAL_host_executables := \ $(HOST_OUT_EXECUTABLES)/bt_topshim_facade \ $(HOST_OUT_EXECUTABLES)/root-canal LOCAL_host_python_hci_packets_library := \ $(SOONG_OUT_DIR)/.intermediates/packages/modules/Bluetooth/system/gd/gd_hci_packets_python3_gen/gen/hci_packets.py LOCAL_host_python_extension_libraries := \ $(HOST_OUT_SHARED_LIBRARIES)/bluetooth_packets_python3.so Loading Loading @@ -88,15 +91,19 @@ bluetooth_cert_src_and_bin_zip := \ $(bluetooth_cert_src_and_bin_zip): PRIVATE_bluetooth_project_dir := $(LOCAL_bluetooth_project_dir) $(bluetooth_cert_src_and_bin_zip): PRIVATE_cert_test_sources := $(LOCAL_cert_test_sources) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_executables := $(LOCAL_host_executables) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_python_extension_libraries := $(LOCAL_host_python_extension_libraries) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_libraries := $(LOCAL_host_libraries) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_python_extension_libraries := $(LOCAL_host_python_extension_libraries) $(bluetooth_cert_src_and_bin_zip): PRIVATE_host_python_hci_packets_library := $(LOCAL_host_python_hci_packets_library) $(bluetooth_cert_src_and_bin_zip): PRIVATE_target_executables := $(LOCAL_target_executables) $(bluetooth_cert_src_and_bin_zip): PRIVATE_target_libraries := $(LOCAL_target_libraries) $(bluetooth_cert_src_and_bin_zip): $(SOONG_ZIP) $(LOCAL_cert_test_sources) \ $(LOCAL_host_executables) $(LOCAL_host_libraries) $(LOCAL_host_python_extension_libraries) \ $(LOCAL_host_executables) $(LOCAL_host_libraries) $(LOCAL_host_python_libraries) \ $(LOCAL_host_python_extension_libraries) \ $(LOCAL_host_python_hci_packets_library) \ $(LOCAL_target_executables) $(LOCAL_target_libraries) $(hide) $(SOONG_ZIP) -d -o $@ \ -C $(PRIVATE_bluetooth_project_dir) $(addprefix -f ,$(PRIVATE_cert_test_sources)) \ -C $(dir $(PRIVATE_host_python_hci_packets_library)) -f $(PRIVATE_host_python_hci_packets_library) \ -C $(HOST_OUT_EXECUTABLES) $(addprefix -f ,$(PRIVATE_host_executables)) \ -C $(HOST_OUT_SHARED_LIBRARIES) $(addprefix -f ,$(PRIVATE_host_python_extension_libraries)) \ -P lib64 \ Loading
system/blueberry/tests/gd/cert/captures.py +26 −43 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ # limitations under the License. import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode from blueberry.tests.gd.cert.capture import Capture Loading @@ -23,42 +22,36 @@ from blueberry.tests.gd.cert.matchers import HciMatchers from blueberry.tests.gd.cert.matchers import L2capMatchers from blueberry.tests.gd.cert.matchers import SecurityMatchers from blueberry.facade.security.facade_pb2 import UiMsgType import hci_packets as hci class HalCaptures(object): @staticmethod def ReadBdAddrCompleteCapture(): return Capture( lambda packet: packet.payload[0:5] == b'\x0e\x0a\x01\x09\x10', lambda packet: hci_packets.ReadBdAddrCompleteView( hci_packets.CommandCompleteView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload)))))) return Capture(lambda packet: packet.payload[0:5] == b'\x0e\x0a\x01\x09\x10', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionRequestCapture(): return Capture( lambda packet: packet.payload[0:2] == b'\x04\x0a', lambda packet: hci_packets.ConnectionRequestView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))) return Capture(lambda packet: packet.payload[0:2] == b'\x04\x0a', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionCompleteCapture(): return Capture( lambda packet: packet.payload[0:3] == b'\x03\x0b\x00', lambda packet: hci_packets.ConnectionCompleteView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))) return Capture(lambda packet: packet.payload[0:3] == b'\x03\x0b\x00', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def DisconnectionCompleteCapture(): return Capture( lambda packet: packet.payload[0:2] == b'\x05\x04', lambda packet: hci_packets.DisconnectionCompleteView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))) return Capture(lambda packet: packet.payload[0:2] == b'\x05\x04', lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def LeConnectionCompleteCapture(): return Capture( lambda packet: packet.payload[0] == 0x3e and (packet.payload[2] == 0x01 or packet.payload[2] == 0x0a), lambda packet: hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload)))))) lambda packet: hci.Event.parse_all(packet.payload)) class HciCaptures(object): Loading @@ -66,43 +59,34 @@ class HciCaptures(object): @staticmethod def ReadLocalOobDataCompleteCapture(): return Capture( HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_DATA), lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_DATA) ) HciMatchers.CommandComplete(hci.OpCode.READ_LOCAL_OOB_DATA), lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci.OpCode.READ_LOCAL_OOB_DATA)) @staticmethod def ReadLocalOobExtendedDataCompleteCapture(): return Capture( HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA), lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA) ) HciMatchers.CommandComplete(hci.OpCode.READ_LOCAL_OOB_EXTENDED_DATA), lambda packet: HciMatchers. ExtractMatchingCommandComplete(packet.payload, hci.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)) @staticmethod def ReadBdAddrCompleteCapture(): return Capture( HciMatchers.CommandComplete(hci_packets.OpCode.READ_BD_ADDR), lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_BD_ADDR))) return Capture(HciMatchers.CommandComplete(hci.OpCode.READ_BD_ADDR), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionRequestCapture(): return Capture( HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_REQUEST), lambda packet: hci_packets.ConnectionRequestView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_REQUEST))) return Capture(HciMatchers.EventWithCode(hci.EventCode.CONNECTION_REQUEST), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def ConnectionCompleteCapture(): return Capture( HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_COMPLETE), lambda packet: hci_packets.ConnectionCompleteView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_COMPLETE))) return Capture(HciMatchers.EventWithCode(hci.EventCode.CONNECTION_COMPLETE), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def DisconnectionCompleteCapture(): return Capture( HciMatchers.EventWithCode(hci_packets.EventCode.DISCONNECTION_COMPLETE), lambda packet: hci_packets.DisconnectionCompleteView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.DISCONNECTION_COMPLETE))) return Capture(HciMatchers.EventWithCode(hci.EventCode.DISCONNECTION_COMPLETE), lambda packet: hci.Event.parse_all(packet.payload)) @staticmethod def LeConnectionCompleteCapture(): Loading @@ -111,9 +95,8 @@ class HciCaptures(object): @staticmethod def SimplePairingCompleteCapture(): return Capture(HciMatchers.EventWithCode(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE), lambda packet: hci_packets.SimplePairingCompleteView( HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE))) return Capture(HciMatchers.EventWithCode(hci.EventCode.SIMPLE_PAIRING_COMPLETE), lambda packet: hci.Event.parse_all(packet.payload)) class L2capCaptures(object): Loading Loading @@ -147,8 +130,8 @@ class L2capCaptures(object): @staticmethod def CreditBasedConnectionRequest(psm): return Capture( L2capMatchers.CreditBasedConnectionRequest(psm), L2capCaptures._extract_credit_based_connection_request) return Capture(L2capMatchers.CreditBasedConnectionRequest(psm), L2capCaptures._extract_credit_based_connection_request) @staticmethod def _extract_credit_based_connection_request(packet): Loading
system/blueberry/tests/gd/cert/cert_self_test.py +28 −23 Original line number Diff line number Diff line Loading @@ -28,8 +28,8 @@ from blueberry.tests.gd.cert.behavior import ReplyStage from blueberry.tests.gd.cert.event_stream import EventStream, FilteringEventStream from blueberry.tests.gd.cert.metadata import metadata from blueberry.tests.gd.cert.truth import assertThat from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets import hci_packets as hci from mobly import asserts from mobly import signals Loading Loading @@ -142,8 +142,9 @@ class CertSelfTest(base_test.BaseTestClass): def test_assert_occurs_at_least_passes(self): with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream: event_stream.assert_event_occurs( lambda data: data.value_ == 1, timeout=timedelta(milliseconds=300), at_least_times=2) event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(milliseconds=300), at_least_times=2) def test_assert_occurs_passes(self): with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: Loading @@ -160,14 +161,16 @@ class CertSelfTest(base_test.BaseTestClass): def test_assert_occurs_at_most_passes(self): with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_stream.assert_event_occurs_at_most( lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3) event_stream.assert_event_occurs_at_most(lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3) def test_assert_occurs_at_most_fails(self): try: with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_stream.assert_event_occurs_at_most( lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2) event_stream.assert_event_occurs_at_most(lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2) except Exception as e: logging.debug(e) return True # Failed as expected Loading @@ -179,12 +182,14 @@ class CertSelfTest(base_test.BaseTestClass): def test_nested_packets(self): handle = 123 inside = hci_packets.ReadScanEnableBuilder() logging.debug(inside.Serialize()) inside = hci.ReadScanEnable() logging.debug(inside.serialize()) logging.debug("building outside") outside = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, inside) logging.debug(outside.Serialize()) outside = hci.Acl(handle=handle, packet_boundary_flag=hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, broadcast_flag=hci.BroadcastFlag.POINT_TO_POINT, payload=inside.serialize()) logging.debug(outside.serialize()) logging.debug("Done!") def test_l2cap_config_options(self): Loading @@ -199,10 +204,12 @@ class CertSelfTest(base_test.BaseTestClass): [mtu_opt, fcs_opt]) request_b_frame = l2cap_packets.BasicFrameBuilder(0x01, request) handle = 123 wrapped = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, request_b_frame) wrapped = hci.Acl(handle=handle, packet_boundary_flag=hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, broadcast_flag=hci.BroadcastFlag.POINT_TO_POINT, payload=bytes(request_b_frame.Serialize())) # Size is ACL (4) + L2CAP (4) + Configure (8) + MTU (4) + FCS (3) asserts.assert_true(len(wrapped.Serialize()) == 23, "Packet serialized incorrectly") asserts.assert_true(len(wrapped.serialize()) == 23, "Packet serialized incorrectly") def test_assertThat_boolean_success(self): assertThat(True).isTrue() Loading Loading @@ -313,8 +320,7 @@ class CertSelfTest(base_test.BaseTestClass): def test_assertThat_emitsNone_passes(self): with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: assertThat(event_stream).emitsNone( lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)).thenNone( assertThat(event_stream).emitsNone(lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)).thenNone( lambda data: data.value_ == 5, timeout=timedelta(seconds=0.15)) def test_assertThat_emitsNone_passes_after_1_second(self): Loading @@ -332,8 +338,8 @@ class CertSelfTest(base_test.BaseTestClass): def test_assertThat_emitsNone_zero_passes(self): with EventStream(FetchEvents(events=[], delay_ms=50)) as event_stream: assertThat(event_stream).emitsNone(timeout=timedelta(milliseconds=10)).thenNone( timeout=timedelta(milliseconds=10)) assertThat(event_stream).emitsNone(timeout=timedelta(milliseconds=10)).thenNone(timeout=timedelta( milliseconds=10)) def test_assertThat_emitsNone_zero_passes_after_one_second(self): with EventStream(FetchEvents([1], delay_ms=1500)) as event_stream: Loading Loading @@ -449,8 +455,7 @@ class CertSelfTest(base_test.BaseTestClass): asserts.assert_true("pts_test_name" in e.extras, msg=("pts_test_name not in extra: %s" % str(e.extras))) asserts.assert_equal(e.extras["pts_test_name"], "Hello world") trace_str = traceback.format_exc() asserts.assert_true( "raise ValueError(failure_argument)" in trace_str, asserts.assert_true("raise ValueError(failure_argument)" in trace_str, msg="Failed test method not in error stack trace: %s" % trace_str) else: asserts.fail("Must throw an exception using @metadata decorator") Loading
system/blueberry/tests/gd/cert/matchers.py +70 −104 File changed.Preview size limit exceeded, changes collapsed. Show changes
system/blueberry/tests/gd/cert/py_acl_manager.py +7 −6 Original line number Diff line number Diff line Loading @@ -20,9 +20,10 @@ from blueberry.tests.gd.cert.event_stream import IEventStream from blueberry.tests.gd.cert.captures import HciCaptures from blueberry.tests.gd.cert.closable import Closable from blueberry.tests.gd.cert.closable import safeClose from bluetooth_packets_python3 import hci_packets from blueberry.tests.gd.cert.truth import assertThat from blueberry.facade.hci import acl_manager_facade_pb2 as acl_manager_facade from blueberry.utils import bluetooth import hci_packets as hci class PyAclManagerAclConnection(IEventStream, Closable): Loading @@ -35,7 +36,7 @@ class PyAclManagerAclConnection(IEventStream, Closable): self.acl_stream = EventStream(self.acl_manager.FetchAclData(acl_manager_facade.HandleMsg(handle=self.handle))) def disconnect(self, reason): packet_bytes = bytes(hci_packets.DisconnectBuilder(self.handle, reason).Serialize()) packet_bytes = hci.Disconnect(connection_handle=self.handle, reason=reason).serialize() self.acl_manager.ConnectionCommand(acl_manager_facade.ConnectionCommandMsg(packet=packet_bytes)) def close(self): Loading @@ -45,7 +46,7 @@ class PyAclManagerAclConnection(IEventStream, Closable): def wait_for_disconnection_complete(self): disconnection_complete = HciCaptures.DisconnectionCompleteCapture() assertThat(self.connection_event_stream).emits(disconnection_complete) self.disconnect_reason = disconnection_complete.get().GetReason() self.disconnect_reason = disconnection_complete.get().reason def send(self, data): self.acl_manager.SendAclData(acl_manager_facade.AclData(handle=self.handle, payload=bytes(data))) Loading @@ -72,7 +73,7 @@ class PyAclManager: def initiate_connection(self, remote_addr): assertThat(self.outgoing_connection_event_stream).isNone() remote_addr_bytes = bytes(remote_addr, 'utf8') if type(remote_addr) is str else bytes(remote_addr) remote_addr_bytes = remote_addr if isinstance(remote_addr, bytes) else remote_addr.encode('utf-8') self.outgoing_connection_event_stream = EventStream( self.acl_manager.CreateConnection(acl_manager_facade.ConnectionMsg(address=remote_addr_bytes))) Loading @@ -80,8 +81,8 @@ class PyAclManager: connection_complete = HciCaptures.ConnectionCompleteCapture() assertThat(event_stream).emits(connection_complete) complete = connection_complete.get() handle = complete.GetConnectionHandle() address = complete.GetBdAddr() handle = complete.connection_handle address = repr(complete.bd_addr) return PyAclManagerAclConnection(self.acl_manager, address, handle, event_stream) def complete_incoming_connection(self): Loading