Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 611c9dfc authored by Hansong Zhang's avatar Hansong Zhang
Browse files

LeL2cap Cert: Allow DUT initiate connection request

Add test cases for testing DUT to initiate COC connection request. Fix
some bugs with LE signalling manager and link.

Test: cert/run --host
Bug: 145707677
Change-Id: I27dcb047cecee219793ccbba3d735edc5fab4551
parent 58d3ee2e
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -73,6 +73,18 @@ class L2capCaptures(object):
            packet, CommandCode.CONNECTION_RESPONSE)
        return l2cap_packets.ConnectionResponseView(frame)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        return Capture(
            L2capMatchers.CreditBasedConnectionRequest(psm),
            L2capCaptures._extract_credit_based_connection_request)

    @staticmethod
    def _extract_credit_based_connection_request(packet):
        frame = L2capMatchers.le_control_frame_with_code(
            packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        return l2cap_packets.LeCreditBasedConnectionRequestView(frame)

    @staticmethod
    def CreditBasedConnectionResponse(scid):
        return Capture(
+11 −2
Original line number Diff line number Diff line
@@ -58,8 +58,8 @@ class L2capMatchers(object):
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def CreditBasedConnectionRequest():
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
    def CreditBasedConnectionRequest(psm):
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)

    @staticmethod
    def CreditBasedConnectionResponse(
@@ -281,6 +281,15 @@ class L2capMatchers(object):
            return False
        return True

    @staticmethod
    def _is_matching_credit_based_connection_request(packet, psm):
        frame = L2capMatchers.le_control_frame_with_code(
            packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
        return request.GetLePsm() == psm

    @staticmethod
    def _is_matching_credit_based_connection_response(packet, scid, result):
        frame = L2capMatchers.le_control_frame_with_code(
+43 −3
Original line number Diff line number Diff line
@@ -17,8 +17,11 @@
from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
from cert.event_stream import EventStream
from cert.closable import Closable, safeClose
from cert.truth import assertThat
from facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto


@@ -63,6 +66,27 @@ class PyLeL2capChannel(object):
            l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))


class CreditBasedConnectionResponseFutureWrapper(object):
    """
    The future object returned when we send a connection request from DUT. Can be used to get connection status and
    create the corresponding PyLeL2capChannel object later
    """

    def __init__(self, grpc_response_future, device, psm):
        self._grpc_response_future = grpc_response_future
        self._device = device
        self._psm = psm

    def get_status(self):
        return l2cap_packets.LeCreditBasedConnectionResponseResult(
            self._grpc_response_future.result().status)

    def get_channel(self):
        assertThat(self.get_status()).isEqualTo(
            l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS)
        return PyLeL2capChannel(self._device, self._psm)


class PyLeL2cap(Closable):

    def __init__(self, device):
@@ -76,8 +100,24 @@ class PyLeL2cap(Closable):
    def get_le_l2cap_stream(self):
        return self.le_l2cap_stream

    def open_credit_based_flow_control_channel(self, psm=0x33):
        # todo, I don't understand what SetDynamicChannel means?
    def register_coc(self, psm=0x33):
        self._device.l2cap_le.SetDynamicChannel(
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(
                psm=psm, enable=True))
        return PyLeL2capChannel(self._device, psm)

    def connect_coc_to_cert(self, psm=0x33):
        """
        Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request
        """
        self.register_coc(psm)
        # TODO: Update CERT device random address in ACL manager
        response_future = self._device.l2cap_le.OpenDynamicChannel.future(
            l2cap_le_facade_pb2.OpenDynamicChannelRequest(
                psm=psm,
                remote=common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(
                        address=b"22:33:ff:ff:11:00"))))

        return CreditBasedConnectionResponseFutureWrapper(
            response_future, self._device, psm)
+49 −15
Original line number Diff line number Diff line
@@ -20,7 +20,8 @@ from cert.py_le_acl_manager import PyLeAclManager
from cert.truth import assertThat
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode
from bluetooth_packets_python3.l2cap_packets import LeCommandCode
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.matchers import L2capMatchers
@@ -78,9 +79,9 @@ class CertLeL2cap(Closable):
        self._le_acl = None

        self.control_table = {
            CommandCode.DISCONNECTION_REQUEST:
            LeCommandCode.DISCONNECTION_REQUEST:
            self._on_disconnection_request_default,
            CommandCode.DISCONNECTION_RESPONSE:
            LeCommandCode.DISCONNECTION_RESPONSE:
            self._on_disconnection_response_default,
        }

@@ -120,6 +121,40 @@ class CertLeL2cap(Closable):
                                  self._get_acl_stream(), self._le_acl,
                                  self.control_channel)

    def verify_and_respond_open_channel_from_remote(
            self, psm=0x33,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        (scid, dcid) = self._respond_connection_request_default(
            request.get(), result)
        return CertLeL2capChannel(self._device, scid, dcid,
                                  self._get_acl_stream(), self._le_acl,
                                  self.control_channel)

    def verify_and_reject_open_channel_from_remote(self, psm=0x33):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        sid = request.get().GetIdentifier()
        reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid)
        self.control_channel.send(reject)

    def _respond_connection_request_default(
            self, request,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
        sid = request.GetIdentifier()
        their_scid = request.GetSourceCid()
        mtu = request.GetMtu()
        mps = request.GetMps()
        initial_credits = request.GetInitialCredits()
        # Here we use the same value - their scid as their scid
        our_scid = their_scid
        our_dcid = their_scid
        response = l2cap_packets.LeCreditBasedConnectionResponseBuilder(
            sid, our_scid, mtu, mps, initial_credits, result)
        self.control_channel.send(response)
        return (our_scid, our_dcid)

    # prefer to use channel abstraction instead, if at all possible
    def send_acl(self, packet):
        self._acl.send(packet.Serialize())
@@ -130,19 +165,18 @@ class CertLeL2cap(Closable):
    def _get_acl_stream(self):
        return self._le_acl_manager.get_le_acl_stream()

    def _on_disconnection_request_default(self, l2cap_control_view):
        disconnection_request = l2cap_packets.DisconnectionRequestView(
            l2cap_control_view)
    def _on_disconnection_request_default(self, request):
        disconnection_request = l2cap_packets.LeDisconnectionRequestView(
            request)
        sid = disconnection_request.GetIdentifier()
        scid = disconnection_request.GetSourceCid()
        dcid = disconnection_request.GetDestinationCid()
        disconnection_response = l2cap_packets.DisconnectionResponseBuilder(
            sid, dcid, scid)
        self.control_channel.send(disconnection_response)
        response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid)
        self.control_channel.send(response)

    def _on_disconnection_response_default(self, l2cap_control_view):
        disconnection_response = l2cap_packets.DisconnectionResponseView(
            l2cap_control_view)
    def _on_disconnection_response_default(self, request):
        disconnection_response = l2cap_packets.LeDisconnectionResponseView(
            request)

    def _handle_control_packet(self, l2cap_packet):
        packet_bytes = l2cap_packet.payload
@@ -150,8 +184,8 @@ class CertLeL2cap(Closable):
            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if l2cap_view.GetChannelId() != 5:
            return
        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
        fn = self.control_table.get(l2cap_control_view.GetCode())
        request = l2cap_packets.LeControlView(l2cap_view.GetPayload())
        fn = self.control_table.get(request.GetCode())
        if fn is not None:
            fn(l2cap_control_view)
            fn(request)
        return
+65 −26
Original line number Diff line number Diff line
@@ -27,7 +27,7 @@ from cert.matchers import L2capMatchers
from facade import common_pb2 as common
from facade import rootservice_pb2 as facade_rootservice
from google.protobuf import empty_pb2 as empty_proto
from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.le import facade_pb2 as l2cap_facade_pb2
from neighbor.facade import facade_pb2 as neighbor_facade
from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
@@ -87,7 +87,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
            request)
        self.cert_l2cap.connect_le_acl(bytes(b'0D:05:04:03:02:01'))

    def _open_unvalidated_channel(self,
    def _open_channel_from_cert(self,
                                signal_id=1,
                                scid=0x0101,
                                psm=0x33,
@@ -95,18 +95,25 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
                                mps=100,
                                initial_credit=6):

        dut_channel = self.dut_l2cap.open_credit_based_flow_control_channel(psm)
        dut_channel = self.dut_l2cap.register_coc(psm)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu,
                                                    mps, initial_credit)

        return (dut_channel, cert_channel)

    def _open_channel_from_dut(self, psm=0x33):
        response_future = self.dut_l2cap.connect_coc_to_cert(psm)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm)
        dut_channel = response_future.get_channel()
        return (dut_channel, cert_channel)

    def test_segmentation(self):
        """
        L2CAP/COS/CFC/BV-01-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
        (dut_channel, cert_channel) = self._open_channel_from_cert(
            mtu=1000, mps=102)
        dut_channel.send(b'hello' * 20 + b'world')
        # The first LeInformation packet contains 2 bytes of SDU size.
@@ -121,7 +128,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/COS/CFC/BV-02-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
        (dut_channel, cert_channel) = self._open_channel_from_cert(
            mtu=1000, mps=202)
        dut_channel.send(b'hello' * 40)
        assertThat(cert_channel).emits(
@@ -132,7 +139,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/COS/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        sdu_size_for_two_sample_packet = 12
        cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet,
                                           SAMPLE_PACKET)
@@ -145,7 +152,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/COS/CFC/BV-04-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
@@ -162,16 +169,6 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.LeCommandReject())

    def test_credit_based_connection_response_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_credit_based_connection_request_unsupported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-05-C
@@ -192,7 +189,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        """
        self._setup_link_from_cert()
        (dut_channel,
         cert_channel) = self._open_unvalidated_channel(initial_credit=0)
         cert_channel) = self._open_channel_from_cert(initial_credit=0)
        for _ in range(4):
            dut_channel.send(b'hello')
        cert_channel.send_credits(1)
@@ -206,12 +203,54 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5),
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_le_credit_based_connection_request_legacy_peer(self):
        """
        L2CAP/LE/CFC/BV-01-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33)
        assertThat(response_future.get_status()).isNotEqualTo(
            LeCreditBasedConnectionResponseResult.SUCCESS)

    def test_le_credit_based_connection_request_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-02-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))

    def test_credit_based_connection_response_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_credit_based_connection_request_on_an_unsupported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-04-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)

    def test_credit_exchange_exceed_initial_credits(self):
        """
        L2CAP/LE/CFC/BI-01-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.send_credits(65535)
        cert_channel.verify_disconnect_request()

@@ -220,5 +259,5 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/LE/CFC/BV-09-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.disconnect_and_verify()
Loading