Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2d98eb67 authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge changes from topic "gd_l2cap_init_security" am: 51b22919 am: ab7ed972

Original change: https://android-review.googlesource.com/c/platform/system/bt/+/1551780

MUST ONLY BE SUBMITTED BY AUTOMERGER

Change-Id: I096b15556da78eecc7544221ec1aec5208cb1642
parents 9c9f5e61 ab7ed972
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -165,6 +165,14 @@ class L2capCaptures(object):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
        return l2cap_packets.LeCreditBasedConnectionResponseView(frame)

    @staticmethod
    def LinkSecurityInterfaceCallbackEvent(type):
        return Capture(L2capMatchers.LinkSecurityInterfaceCallbackEvent(type), L2capCaptures._extract_address)

    @staticmethod
    def _extract_address(packet):
        return packet.address


class SecurityCaptures(object):

+5 −1
Original line number Diff line number Diff line
@@ -144,7 +144,7 @@ class HciMatchers(object):

    @staticmethod
    def LogEventCode():
        return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.event))).GetEventCode())
        return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode())

    @staticmethod
    def LinkKeyRequest():
@@ -698,6 +698,10 @@ class L2capMatchers(object):
        return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or
                                                   response.GetDestinationCid() != 0)

    @staticmethod
    def LinkSecurityInterfaceCallbackEvent(type):
        return lambda event: True if event.event_type == type else False


class SecurityMatchers(object):

+59 −2
Original line number Diff line number Diff line
@@ -17,14 +17,18 @@
from google.protobuf import empty_pb2 as empty_proto

from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType
from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from l2cap.le.facade_pb2 import SecurityLevel
from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3 import l2cap_packets
from cert.event_stream import FilteringEventStream
from cert.event_stream import EventStream, IEventStream
from cert.closable import Closable, safeClose
from cert.truth import assertThat
from cert.py_hci import PyHci
from cert.matchers import HciMatchers
from cert.matchers import L2capMatchers
from cert.truth import assertThat
from facade import common_pb2 as common


@@ -69,13 +73,20 @@ class _ClassicConnectionResponseFutureWrapper(object):

class PyL2cap(Closable):

    def __init__(self, device, cert_address):
    def __init__(self, device, cert_address, has_security=False):
        self._device = device
        self._cert_address = cert_address
        self._hci = PyHci(device)
        self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty()))
        self._security_connection_event_stream = EventStream(
            self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty()))
        if has_security == False:
            self._hci.register_for_events(hci_packets.EventCode.LINK_KEY_REQUEST)

    def close(self):
        safeClose(self._l2cap_stream)
        safeClose(self._security_connection_event_stream)
        safeClose(self._hci)

    def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        self._device.l2cap.SetDynamicChannel(
@@ -96,6 +107,52 @@ class PyL2cap(Closable):
    def get_channel_queue_buffer_size(self):
        return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size

    def initiate_connection_for_security(self):
        """
        Establish an ACL for the specific purpose of pairing devices
        """
        self._device.l2cap.InitiateConnectionForSecurity(self._cert_address)

    def get_security_connection_event_stream(self):
        """
        Stream of Link related events.  Events are returned with an address.
        Events map to the LinkSecurityInterfaceListener callbacks
        """
        return self._security_connection_event_stream

    def security_link_hold(self):
        """
        Holds open the ACL indefinitely allowing for the security handshake
        to take place
        """
        self._device.l2cap.SecurityLinkHold(self._cert_address)

    def security_link_ensure_authenticated(self):
        """
        Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED
        """
        self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address)

    def security_link_release(self):
        """
        Releases a Held open ACL allowing for the ACL to time out after the default time
        """
        self._device.l2cap.SecurityLinkRelease(self._cert_address)

    def security_link_disconnect(self):
        """
        Immediately release and disconnect ACL
        """
        self._device.l2cap.SecurityLinkDisconnect(self._cert_address)

    def verify_security_connection(self):
        """
        Verify that we get a connection and a link key request
        """
        assertThat(self.get_security_connection_event_stream()).emits(
            lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED)
        assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest())


class PyLeL2capFixedChannel(IEventStream):

+4 −0
Original line number Diff line number Diff line
@@ -195,6 +195,10 @@ class CertL2cap(Closable, IHasBehaviors):
            self._device, 1, 1, self._acl.acl_stream, self._acl, control_channel=None)
        self._acl.acl_stream.register_callback(self._handle_control_packet)

    def accept_incoming_connection(self):
        self._acl_manager.listen_for_an_incoming_connection()
        self._acl = self._acl_manager.complete_incoming_connection()

    def open_channel(self, signal_id, psm, scid, fcs=None):
        self.control_channel.send(l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid))

+12 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ from bluetooth_packets_python3.l2cap_packets import Poll
from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
from cert.behavior import when, anything, wait_until
from cert.event_stream import EventStream
from cert.gd_base_test import GdBaseTestClass
from cert.matchers import L2capMatchers
from cert.metadata import metadata
@@ -1274,3 +1275,14 @@ class L2capTest(L2capTestBase):
        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(mode=RetransmissionFlowControlMode.ERTM)
        cert_channel.send_configure_request(CertL2cap.config_option_basic_explicit())
        cert_channel.verify_disconnect_request()

    def test_initiate_connection_for_security(self):
        """
        This will test the PyL2cap API for initiating a connection for security
        via the security api
        """
        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
        self.cert.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
        self.dut_l2cap.initiate_connection_for_security()
        self.cert_l2cap.accept_incoming_connection()
        self.dut_l2cap.verify_security_connection()
Loading