Loading system/gd/cert/captures.py +8 −0 Original line number Diff line number Diff line Loading @@ -165,6 +165,14 @@ class L2capCaptures(object): frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE) return l2cap_packets.LeCreditBasedConnectionResponseView(frame) @staticmethod def LinkSecurityInterfaceCallbackEvent(type): return Capture(L2capMatchers.LinkSecurityInterfaceCallbackEvent(type), L2capCaptures._extract_address) @staticmethod def _extract_address(packet): return packet.address class SecurityCaptures(object): Loading system/gd/cert/matchers.py +5 −1 Original line number Diff line number Diff line Loading @@ -144,7 +144,7 @@ class HciMatchers(object): @staticmethod def LogEventCode(): return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.event))).GetEventCode()) return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode()) @staticmethod def LinkKeyRequest(): Loading Loading @@ -698,6 +698,10 @@ class L2capMatchers(object): return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or response.GetDestinationCid() != 0) @staticmethod def LinkSecurityInterfaceCallbackEvent(type): return lambda event: True if event.event_type == type else False class SecurityMatchers(object): Loading system/gd/cert/py_l2cap.py +59 −2 Original line number Diff line number Diff line Loading @@ -17,14 +17,18 @@ from google.protobuf import empty_pb2 as empty_proto from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType from l2cap.le import facade_pb2 as l2cap_le_facade_pb2 from l2cap.le.facade_pb2 import SecurityLevel from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from cert.event_stream import FilteringEventStream from cert.event_stream import EventStream, IEventStream from cert.closable import Closable, safeClose from cert.truth import assertThat from cert.py_hci import PyHci from cert.matchers import HciMatchers from cert.matchers import L2capMatchers from cert.truth import assertThat from facade import common_pb2 as common Loading Loading @@ -69,13 +73,20 @@ class _ClassicConnectionResponseFutureWrapper(object): class PyL2cap(Closable): def __init__(self, device, cert_address): def __init__(self, device, cert_address, has_security=False): self._device = device self._cert_address = cert_address self._hci = PyHci(device) self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty())) self._security_connection_event_stream = EventStream( self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty())) if has_security == False: self._hci.register_for_events(hci_packets.EventCode.LINK_KEY_REQUEST) def close(self): safeClose(self._l2cap_stream) safeClose(self._security_connection_event_stream) safeClose(self._hci) def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC): self._device.l2cap.SetDynamicChannel( Loading @@ -96,6 +107,52 @@ class PyL2cap(Closable): def get_channel_queue_buffer_size(self): return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size def initiate_connection_for_security(self): """ Establish an ACL for the specific purpose of pairing devices """ self._device.l2cap.InitiateConnectionForSecurity(self._cert_address) def get_security_connection_event_stream(self): """ Stream of Link related events. Events are returned with an address. Events map to the LinkSecurityInterfaceListener callbacks """ return self._security_connection_event_stream def security_link_hold(self): """ Holds open the ACL indefinitely allowing for the security handshake to take place """ self._device.l2cap.SecurityLinkHold(self._cert_address) def security_link_ensure_authenticated(self): """ Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED """ self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address) def security_link_release(self): """ Releases a Held open ACL allowing for the ACL to time out after the default time """ self._device.l2cap.SecurityLinkRelease(self._cert_address) def security_link_disconnect(self): """ Immediately release and disconnect ACL """ self._device.l2cap.SecurityLinkDisconnect(self._cert_address) def verify_security_connection(self): """ Verify that we get a connection and a link key request """ assertThat(self.get_security_connection_event_stream()).emits( lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED) assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest()) class PyLeL2capFixedChannel(IEventStream): Loading system/gd/l2cap/classic/cert/cert_l2cap.py +4 −0 Original line number Diff line number Diff line Loading @@ -195,6 +195,10 @@ class CertL2cap(Closable, IHasBehaviors): self._device, 1, 1, self._acl.acl_stream, self._acl, control_channel=None) self._acl.acl_stream.register_callback(self._handle_control_packet) def accept_incoming_connection(self): self._acl_manager.listen_for_an_incoming_connection() self._acl = self._acl_manager.complete_incoming_connection() def open_channel(self, signal_id, psm, scid, fcs=None): self.control_channel.send(l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) Loading system/gd/l2cap/classic/cert/l2cap_test.py +12 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ from bluetooth_packets_python3.l2cap_packets import Poll from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction from cert.behavior import when, anything, wait_until from cert.event_stream import EventStream from cert.gd_base_test import GdBaseTestClass from cert.matchers import L2capMatchers from cert.metadata import metadata Loading Loading @@ -1274,3 +1275,14 @@ class L2capTest(L2capTestBase): (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(mode=RetransmissionFlowControlMode.ERTM) cert_channel.send_configure_request(CertL2cap.config_option_basic_explicit()) cert_channel.verify_disconnect_request() def test_initiate_connection_for_security(self): """ This will test the PyL2cap API for initiating a connection for security via the security api """ self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) self.cert.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) self.dut_l2cap.initiate_connection_for_security() self.cert_l2cap.accept_incoming_connection() self.dut_l2cap.verify_security_connection() Loading
system/gd/cert/captures.py +8 −0 Original line number Diff line number Diff line Loading @@ -165,6 +165,14 @@ class L2capCaptures(object): frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE) return l2cap_packets.LeCreditBasedConnectionResponseView(frame) @staticmethod def LinkSecurityInterfaceCallbackEvent(type): return Capture(L2capMatchers.LinkSecurityInterfaceCallbackEvent(type), L2capCaptures._extract_address) @staticmethod def _extract_address(packet): return packet.address class SecurityCaptures(object): Loading
system/gd/cert/matchers.py +5 −1 Original line number Diff line number Diff line Loading @@ -144,7 +144,7 @@ class HciMatchers(object): @staticmethod def LogEventCode(): return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.event))).GetEventCode()) return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode()) @staticmethod def LinkKeyRequest(): Loading Loading @@ -698,6 +698,10 @@ class L2capMatchers(object): return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or response.GetDestinationCid() != 0) @staticmethod def LinkSecurityInterfaceCallbackEvent(type): return lambda event: True if event.event_type == type else False class SecurityMatchers(object): Loading
system/gd/cert/py_l2cap.py +59 −2 Original line number Diff line number Diff line Loading @@ -17,14 +17,18 @@ from google.protobuf import empty_pb2 as empty_proto from l2cap.classic import facade_pb2 as l2cap_facade_pb2 from l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType from l2cap.le import facade_pb2 as l2cap_le_facade_pb2 from l2cap.le.facade_pb2 import SecurityLevel from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from cert.event_stream import FilteringEventStream from cert.event_stream import EventStream, IEventStream from cert.closable import Closable, safeClose from cert.truth import assertThat from cert.py_hci import PyHci from cert.matchers import HciMatchers from cert.matchers import L2capMatchers from cert.truth import assertThat from facade import common_pb2 as common Loading Loading @@ -69,13 +73,20 @@ class _ClassicConnectionResponseFutureWrapper(object): class PyL2cap(Closable): def __init__(self, device, cert_address): def __init__(self, device, cert_address, has_security=False): self._device = device self._cert_address = cert_address self._hci = PyHci(device) self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty())) self._security_connection_event_stream = EventStream( self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty())) if has_security == False: self._hci.register_for_events(hci_packets.EventCode.LINK_KEY_REQUEST) def close(self): safeClose(self._l2cap_stream) safeClose(self._security_connection_event_stream) safeClose(self._hci) def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC): self._device.l2cap.SetDynamicChannel( Loading @@ -96,6 +107,52 @@ class PyL2cap(Closable): def get_channel_queue_buffer_size(self): return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size def initiate_connection_for_security(self): """ Establish an ACL for the specific purpose of pairing devices """ self._device.l2cap.InitiateConnectionForSecurity(self._cert_address) def get_security_connection_event_stream(self): """ Stream of Link related events. Events are returned with an address. Events map to the LinkSecurityInterfaceListener callbacks """ return self._security_connection_event_stream def security_link_hold(self): """ Holds open the ACL indefinitely allowing for the security handshake to take place """ self._device.l2cap.SecurityLinkHold(self._cert_address) def security_link_ensure_authenticated(self): """ Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED """ self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address) def security_link_release(self): """ Releases a Held open ACL allowing for the ACL to time out after the default time """ self._device.l2cap.SecurityLinkRelease(self._cert_address) def security_link_disconnect(self): """ Immediately release and disconnect ACL """ self._device.l2cap.SecurityLinkDisconnect(self._cert_address) def verify_security_connection(self): """ Verify that we get a connection and a link key request """ assertThat(self.get_security_connection_event_stream()).emits( lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED) assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest()) class PyLeL2capFixedChannel(IEventStream): Loading
system/gd/l2cap/classic/cert/cert_l2cap.py +4 −0 Original line number Diff line number Diff line Loading @@ -195,6 +195,10 @@ class CertL2cap(Closable, IHasBehaviors): self._device, 1, 1, self._acl.acl_stream, self._acl, control_channel=None) self._acl.acl_stream.register_callback(self._handle_control_packet) def accept_incoming_connection(self): self._acl_manager.listen_for_an_incoming_connection() self._acl = self._acl_manager.complete_incoming_connection() def open_channel(self, signal_id, psm, scid, fcs=None): self.control_channel.send(l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) Loading
system/gd/l2cap/classic/cert/l2cap_test.py +12 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ from bluetooth_packets_python3.l2cap_packets import Poll from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction from cert.behavior import when, anything, wait_until from cert.event_stream import EventStream from cert.gd_base_test import GdBaseTestClass from cert.matchers import L2capMatchers from cert.metadata import metadata Loading Loading @@ -1274,3 +1275,14 @@ class L2capTest(L2capTestBase): (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(mode=RetransmissionFlowControlMode.ERTM) cert_channel.send_configure_request(CertL2cap.config_option_basic_explicit()) cert_channel.verify_disconnect_request() def test_initiate_connection_for_security(self): """ This will test the PyL2cap API for initiating a connection for security via the security api """ self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) self.cert.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) self.dut_l2cap.initiate_connection_for_security() self.cert_l2cap.accept_incoming_connection() self.dut_l2cap.verify_security_connection()