Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c137bc39 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

LeL2cap: Add PTS test cases and allow verify from DUT

Bug: 145707677
Test: cert/run --host
Change-Id: I028566a37491c06bdc89864751e83844a33a894e
parent b43531ea
Loading
Loading
Loading
Loading
+33 −0
Original line number Diff line number Diff line
@@ -53,6 +53,10 @@ class L2capMatchers(object):
    def CommandReject():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)

    @staticmethod
    def LeCommandReject():
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def CreditBasedConnectionRequest():
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
@@ -66,6 +70,10 @@ class L2capMatchers(object):
    def LeDisconnectionRequest(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)

    @staticmethod
    def LeDisconnectionResponse(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)

    @staticmethod
    def SFrame(req_seq=None, f=None, s=None, p=None):
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)
@@ -78,11 +86,20 @@ class L2capMatchers(object):
    def Data(payload):
        return lambda packet: packet.GetPayload().GetBytes() == payload

    @staticmethod
    def FirstLeIFrame(payload, sdu_size):
        return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)

    # this is a hack - should be removed
    @staticmethod
    def PartialData(payload):
        return lambda packet: payload in packet.GetPayload().GetBytes()

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadRawData(payload):
        return lambda packet: payload in packet.payload

    @staticmethod
    def ExtractBasicFrame(scid):
        return lambda packet: L2capMatchers._basic_frame_for(packet, scid)
@@ -150,6 +167,12 @@ class L2capMatchers(object):
            return False
        return True

    @staticmethod
    def _is_matching_first_le_i_frame(packet, payload, sdu_size):
        first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
        return first_le_i_frame.GetPayload().GetBytes(
        ) == payload and first_le_i_frame.GetL2capSduLength() == sdu_size

    @staticmethod
    def _control_frame(packet):
        if packet.GetChannelId() != 1:
@@ -206,6 +229,16 @@ class L2capMatchers(object):
        return response.GetSourceCid() == scid and response.GetDestinationCid(
        ) == dcid

    @staticmethod
    def _is_matching_le_disconnection_response(packet, scid, dcid):
        frame = L2capMatchers.le_control_frame_with_code(
            packet, LeCommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.LeDisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid(
        ) == dcid

    @staticmethod
    def _is_matching_le_disconnection_request(packet, scid, dcid):
        frame = L2capMatchers.le_control_frame_with_code(
+33 −6
Original line number Diff line number Diff line
@@ -17,6 +17,9 @@
from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from bluetooth_packets_python3 import l2cap_packets
from cert.event_stream import EventStream
from cert.closable import Closable, safeClose
from google.protobuf import empty_pb2 as empty_proto


class PyL2capChannel(object):
@@ -29,16 +32,15 @@ class PyL2capChannel(object):
        self._device.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))

    def send_le(self, payload):
        self._device.l2cap_le.SendDynamicChannelPacket(
            l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))


class PyL2cap(object):
class PyL2cap(Closable):

    def __init__(self, device):
        self._device = device

    def close(self):
        pass

    def open_channel(self,
                     psm=0x33,
                     mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
@@ -49,8 +51,33 @@ class PyL2cap(object):
                psm=psm, retransmission_mode=mode))
        return PyL2capChannel(self._device, psm)


class PyLeL2capChannel(object):

    def __init__(self, device, psm):
        self._device = device
        self._psm = psm

    def send(self, payload):
        self._device.l2cap_le.SendDynamicChannelPacket(
            l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))


class PyLeL2cap(Closable):

    def __init__(self, device):
        self._device = device
        self.le_l2cap_stream = EventStream(
            self._device.l2cap_le.FetchL2capData(empty_proto.Empty()))

    def close(self):
        safeClose(self.le_l2cap_stream)

    def get_le_l2cap_stream(self):
        return self.le_l2cap_stream

    def open_credit_based_flow_control_channel(self, psm=0x33):
        # todo, I don't understand what SetDynamicChannel means?
        self._device.l2cap_le.SetDynamicChannel(
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
        return PyL2capChannel(self._device, psm)
        return PyLeL2capChannel(self._device, psm)
+1 −0
Original line number Diff line number Diff line
@@ -63,6 +63,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

    def teardown_test(self):
        self.cert_l2cap.close()
        self.dut_l2cap.close()
        super().teardown_test()

    def cert_send_b_frame(self, b_frame):
+21 −92
Original line number Diff line number Diff line
@@ -46,6 +46,11 @@ class CertLeL2capChannel(IEventStream):
        frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet)
        self._acl.send(frame.Serialize())

    def send_first_le_i_frame(self, sdu_size, packet):
        frame = l2cap_packets.FirstLeInformationFrameBuilder(
            self._dcid, sdu_size, packet)
        self._acl.send(frame.Serialize())

    def disconnect_and_verify(self):
        assertThat(self._scid).isNotEqualTo(1)
        self._control_channel.send(
@@ -53,13 +58,16 @@ class CertLeL2capChannel(IEventStream):
                                                      self._scid))

        assertThat(self._control_channel).emits(
            L2capMatchers.DisconnectionResponse(self._scid, self._dcid))
            L2capMatchers.LeDisconnectionResponse(self._scid, self._dcid))

    def get_scid(self):
        return self._scid
    def verify_disconnect_request(self):
        assertThat(self._control_channel).emits(
            L2capMatchers.LeDisconnectionRequest(self._dcid, self._scid))

    def get_dcid(self):
        return self._dcid
    def send_credits(self, num_credits):
        self._control_channel.send(
            l2cap_packets.LeFlowControlCreditBuilder(2, self._scid,
                                                     num_credits))


class CertLeL2cap(Closable):
@@ -94,10 +102,16 @@ class CertLeL2cap(Closable):
            control_channel=None)
        self._get_acl_stream().register_callback(self._handle_control_packet)

    def open_channel(self, signal_id, psm, scid, initial_credit=6):
    def open_channel(self,
                     signal_id,
                     psm,
                     scid,
                     mtu=1000,
                     mps=100,
                     initial_credit=6):
        self.control_channel.send(
            l2cap_packets.LeCreditBasedConnectionRequestBuilder(
                signal_id, psm, scid, 2000, 1000, initial_credit))
                signal_id, psm, scid, mtu, mps, initial_credit))

        response = L2capCaptures.CreditBasedConnectionResponse(scid)
        assertThat(self.control_channel).emits(response)
@@ -116,91 +130,6 @@ class CertLeL2cap(Closable):
    def _get_acl_stream(self):
        return self._le_acl_manager.get_le_acl_stream()

    def _on_connection_request_default(self, l2cap_control_view):
        connection_request_view = l2cap_packets.ConnectionRequestView(
            l2cap_control_view)
        sid = connection_request_view.GetIdentifier()
        cid = connection_request_view.GetSourceCid()

        self.scid_to_dcid[cid] = cid

        connection_response = l2cap_packets.ConnectionResponseBuilder(
            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
            l2cap_packets.ConnectionResponseStatus.
            NO_FURTHER_INFORMATION_AVAILABLE)
        self.control_channel.send(connection_response)
        return True

    def _on_connection_response_default(self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

        options = []
        if self.ertm_option is not None:
            options.append(self.ertm_option)
        if self.fcs_option is not None:
            options.append(self.fcs_option)

        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, dcid, l2cap_packets.Continuation.END, options)
        self.control_channel.send(config_request)
        return True

    def _on_connection_response_configuration_request_with_unknown_options_and_hint(
            self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 0x1234
        mtu_opt.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT

        options = [mtu_opt]
        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, dcid, l2cap_packets.Continuation.END, options)
        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
            1, config_request)

        byte_array = bytearray(config_request_l2cap.Serialize())
        ## Modify configuration option type to be a unknown
        byte_array[12] |= 0x7f
        self._acl.send(bytes(byte_array))
        return True

    def _on_connection_response_configuration_request_with_continuation_flag(
            self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 0x1234

        options = [mtu_opt]
        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, dcid, l2cap_packets.Continuation.CONTINUE, options)

        self.control_channel.send(config_request)

        flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption()
        flush_timeout_option.flush_timeout = 65535
        option = [flush_timeout_option]
        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 2, dcid, l2cap_packets.Continuation.END, option)
        self.get_control_channel().send(config_request)
        return True

    def _on_disconnection_request_default(self, l2cap_control_view):
        disconnection_request = l2cap_packets.DisconnectionRequestView(
            l2cap_control_view)
+93 −28
Original line number Diff line number Diff line
@@ -21,7 +21,7 @@ from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_stream import EventStream
from cert.truth import assertThat
from cert.closable import safeClose
from cert.py_l2cap import PyL2cap
from cert.py_l2cap import PyLeL2cap
from cert.py_acl_manager import PyAclManager
from cert.matchers import L2capMatchers
from facade import common_pb2 as common
@@ -53,11 +53,12 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
            empty_proto.Empty()).address
        self.cert_address = common.BluetoothAddress(address=self.cert.address)

        self.dut_l2cap = PyL2cap(self.dut)
        self.dut_l2cap = PyLeL2cap(self.dut)
        self.cert_l2cap = CertLeL2cap(self.cert)

    def teardown_test(self):
        self.cert_l2cap.close()
        self.dut_l2cap.close()
        super().teardown_test()

    def _setup_link_from_cert(self):
@@ -90,22 +91,86 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
                                  signal_id=1,
                                  scid=0x0101,
                                  psm=0x33,
                                  mtu=1000,
                                  mps=100,
                                  initial_credit=6):

        dut_channel = self.dut_l2cap.open_credit_based_flow_control_channel(psm)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid,
                                                    initial_credit)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu,
                                                    mps, initial_credit)

        return (dut_channel, cert_channel)

    def test_segmentation(self):
        """
        L2CAP/COS/CFC/BV-01-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
            mtu=1000, mps=102)
        dut_channel.send(b'hello' * 20 + b'world')
        # The first LeInformation packet contains 2 bytes of SDU size.
        # The packet is divided into first 100 bytes from 'hellohello....'
        # and remaining 5 bytes 'world'
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello' * 20, sdu_size=105),
            L2capMatchers.Data(b'world')).inOrder()

    def test_no_segmentation(self):
        """
        L2CAP/COS/CFC/BV-02-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
            mtu=1000, mps=202)
        dut_channel.send(b'hello' * 40)
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello' * 40, sdu_size=200))

    def test_reassembling(self):
        """
        L2CAP/COS/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        sdu_size_for_two_sample_packet = 12
        cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet,
                                           SAMPLE_PACKET)
        cert_channel.send(SAMPLE_PACKET)
        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2))

    def test_data_receiving(self):
        """
        L2CAP/COS/CFC/BV-04-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))

    def test_reject_unknown_command_in_le_sigling_channel(self):
        """
        L2CAP/LE/REJ/BI-01-C
        """
        self._setup_link_from_cert()
        self.cert_l2cap.get_control_channel().send(
            l2cap_packets.InformationRequestBuilder(
                2, l2cap_packets.InformationRequestInfoType.
                EXTENDED_FEATURES_SUPPORTED))
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.LeCommandReject())

    def test_credit_based_connection_response_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        dut_channel.send_le(b'hello')
        assertThat(cert_channel).emits(L2capMatchers.PartialData(b'hello'))
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_credit_based_connection_request_unsupported_le_psm(self):
        """
@@ -129,31 +194,31 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        (dut_channel,
         cert_channel) = self._open_unvalidated_channel(initial_credit=0)
        for _ in range(4):
            dut_channel.send_le(b'hello')
        self.cert_l2cap.get_control_channel().send(
            l2cap_packets.LeFlowControlCreditBuilder(2, cert_channel.get_scid(),
                                                     1))
        assertThat(cert_channel).emits(L2capMatchers.PartialData(b'hello'))
        self.cert_l2cap.get_control_channel().send(
            l2cap_packets.LeFlowControlCreditBuilder(3, cert_channel.get_scid(),
                                                     1))
        assertThat(cert_channel).emits(L2capMatchers.PartialData(b'hello'))
        self.cert_l2cap.get_control_channel().send(
            l2cap_packets.LeFlowControlCreditBuilder(4, cert_channel.get_scid(),
                                                     2))
            dut_channel.send(b'hello')
        cert_channel.send_credits(1)
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
        cert_channel.send_credits(1)
        assertThat(cert_channel).emits(
            L2capMatchers.PartialData(b'hello'), at_least_times=2)
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
        cert_channel.send_credits(2)
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5),
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_acredit_exchange_exceed_initial_credits(self):
    def test_credit_exchange_exceed_initial_credits(self):
        """
        L2CAP/LE/CFC/BI-01-C
        """
        self._setup_link_from_cert()
        (dut_channel,
         cert_channel) = self._open_unvalidated_channel(initial_credit=100)
        self.cert_l2cap.get_control_channel().send(
            l2cap_packets.LeFlowControlCreditBuilder(2, cert_channel.get_scid(),
                                                     65500))
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.LeDisconnectionRequest(cert_channel.get_dcid(),
                                                 cert_channel.get_scid()))
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        cert_channel.send_credits(65535)
        cert_channel.verify_disconnect_request()

    def test_disconnection_response(self):
        """
        L2CAP/LE/CFC/BV-09-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        cert_channel.disconnect_and_verify()
Loading