Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d3424eff authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge changes I5c0eea0d,I27dcb047

* changes:
  LeL2capTest: Get incoming packet stream for each channel
  LeL2cap Cert: Allow DUT initiate connection request
parents ac3a6efb d4a89d77
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -73,6 +73,18 @@ class L2capCaptures(object):
            packet, CommandCode.CONNECTION_RESPONSE)
        return l2cap_packets.ConnectionResponseView(frame)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        return Capture(
            L2capMatchers.CreditBasedConnectionRequest(psm),
            L2capCaptures._extract_credit_based_connection_request)

    @staticmethod
    def _extract_credit_based_connection_request(packet):
        frame = L2capMatchers.le_control_frame_with_code(
            packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        return l2cap_packets.LeCreditBasedConnectionRequestView(frame)

    @staticmethod
    def CreditBasedConnectionResponse(scid):
        return Capture(
+16 −2
Original line number Diff line number Diff line
@@ -58,8 +58,8 @@ class L2capMatchers(object):
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def CreditBasedConnectionRequest():
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
    def CreditBasedConnectionRequest(psm):
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)

    @staticmethod
    def CreditBasedConnectionResponse(
@@ -100,6 +100,11 @@ class L2capMatchers(object):
    def PacketPayloadRawData(payload):
        return lambda packet: payload in packet.payload

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadWithMatchingPsm(psm):
        return lambda packet: None if psm != packet.psm else packet

    @staticmethod
    def ExtractBasicFrame(scid):
        return lambda packet: L2capMatchers._basic_frame_for(packet, scid)
@@ -281,6 +286,15 @@ class L2capMatchers(object):
            return False
        return True

    @staticmethod
    def _is_matching_credit_based_connection_request(packet, psm):
        frame = L2capMatchers.le_control_frame_with_code(
            packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
        return request.GetLePsm() == psm

    @staticmethod
    def _is_matching_credit_based_connection_response(packet, scid, result):
        frame = L2capMatchers.le_control_frame_with_code(
+59 −12
Original line number Diff line number Diff line
@@ -17,8 +17,13 @@
from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from bluetooth_packets_python3 import l2cap_packets
from cert.event_stream import EventStream
from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
from cert.event_stream import FilteringEventStream
from cert.event_stream import EventStream, IEventStream
from cert.closable import Closable, safeClose
from cert.truth import assertThat
from cert.matchers import L2capMatchers
from facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto


@@ -52,32 +57,74 @@ class PyL2cap(Closable):
        return PyL2capChannel(self._device, psm)


class PyLeL2capChannel(object):
class PyLeL2capChannel(IEventStream):

    def __init__(self, device, psm):
    def __init__(self, device, psm, l2cap_stream):
        self._device = device
        self._psm = psm
        self._le_l2cap_stream = l2cap_stream
        self._our_le_l2cap_view = FilteringEventStream(
            self._le_l2cap_stream,
            L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))

    def get_event_queue(self):
        return self._our_le_l2cap_view.get_event_queue()

    def send(self, payload):
        self._device.l2cap_le.SendDynamicChannelPacket(
            l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))


class CreditBasedConnectionResponseFutureWrapper(object):
    """
    The future object returned when we send a connection request from DUT. Can be used to get connection status and
    create the corresponding PyLeL2capChannel object later
    """

    def __init__(self, grpc_response_future, device, psm, le_l2cap_stream):
        self._grpc_response_future = grpc_response_future
        self._device = device
        self._psm = psm
        self._le_l2cap_stream = le_l2cap_stream

    def get_status(self):
        return l2cap_packets.LeCreditBasedConnectionResponseResult(
            self._grpc_response_future.result().status)

    def get_channel(self):
        assertThat(self.get_status()).isEqualTo(
            l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS)
        return PyLeL2capChannel(self._device, self._psm, self._le_l2cap_stream)


class PyLeL2cap(Closable):

    def __init__(self, device):
        self._device = device
        self.le_l2cap_stream = EventStream(
        self._le_l2cap_stream = EventStream(
            self._device.l2cap_le.FetchL2capData(empty_proto.Empty()))

    def close(self):
        safeClose(self.le_l2cap_stream)
        safeClose(self._le_l2cap_stream)

    def get_le_l2cap_stream(self):
        return self.le_l2cap_stream

    def open_credit_based_flow_control_channel(self, psm=0x33):
        # todo, I don't understand what SetDynamicChannel means?
    def register_coc(self, psm=0x33):
        self._device.l2cap_le.SetDynamicChannel(
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
        return PyLeL2capChannel(self._device, psm)
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(
                psm=psm, enable=True))
        return PyLeL2capChannel(self._device, psm, self._le_l2cap_stream)

    def connect_coc_to_cert(self, psm=0x33):
        """
        Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request
        """
        self.register_coc(psm)
        # TODO: Update CERT device random address in ACL manager
        response_future = self._device.l2cap_le.OpenDynamicChannel.future(
            l2cap_le_facade_pb2.OpenDynamicChannelRequest(
                psm=psm,
                remote=common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(
                        address=b"22:33:ff:ff:11:00"))))

        return CreditBasedConnectionResponseFutureWrapper(
            response_future, self._device, psm, self._le_l2cap_stream)
+49 −15
Original line number Diff line number Diff line
@@ -20,7 +20,8 @@ from cert.py_le_acl_manager import PyLeAclManager
from cert.truth import assertThat
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode
from bluetooth_packets_python3.l2cap_packets import LeCommandCode
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.matchers import L2capMatchers
@@ -78,9 +79,9 @@ class CertLeL2cap(Closable):
        self._le_acl = None

        self.control_table = {
            CommandCode.DISCONNECTION_REQUEST:
            LeCommandCode.DISCONNECTION_REQUEST:
            self._on_disconnection_request_default,
            CommandCode.DISCONNECTION_RESPONSE:
            LeCommandCode.DISCONNECTION_RESPONSE:
            self._on_disconnection_response_default,
        }

@@ -120,6 +121,40 @@ class CertLeL2cap(Closable):
                                  self._get_acl_stream(), self._le_acl,
                                  self.control_channel)

    def verify_and_respond_open_channel_from_remote(
            self, psm=0x33,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        (scid, dcid) = self._respond_connection_request_default(
            request.get(), result)
        return CertLeL2capChannel(self._device, scid, dcid,
                                  self._get_acl_stream(), self._le_acl,
                                  self.control_channel)

    def verify_and_reject_open_channel_from_remote(self, psm=0x33):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        sid = request.get().GetIdentifier()
        reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid)
        self.control_channel.send(reject)

    def _respond_connection_request_default(
            self, request,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
        sid = request.GetIdentifier()
        their_scid = request.GetSourceCid()
        mtu = request.GetMtu()
        mps = request.GetMps()
        initial_credits = request.GetInitialCredits()
        # Here we use the same value - their scid as their scid
        our_scid = their_scid
        our_dcid = their_scid
        response = l2cap_packets.LeCreditBasedConnectionResponseBuilder(
            sid, our_scid, mtu, mps, initial_credits, result)
        self.control_channel.send(response)
        return (our_scid, our_dcid)

    # prefer to use channel abstraction instead, if at all possible
    def send_acl(self, packet):
        self._acl.send(packet.Serialize())
@@ -130,19 +165,18 @@ class CertLeL2cap(Closable):
    def _get_acl_stream(self):
        return self._le_acl_manager.get_le_acl_stream()

    def _on_disconnection_request_default(self, l2cap_control_view):
        disconnection_request = l2cap_packets.DisconnectionRequestView(
            l2cap_control_view)
    def _on_disconnection_request_default(self, request):
        disconnection_request = l2cap_packets.LeDisconnectionRequestView(
            request)
        sid = disconnection_request.GetIdentifier()
        scid = disconnection_request.GetSourceCid()
        dcid = disconnection_request.GetDestinationCid()
        disconnection_response = l2cap_packets.DisconnectionResponseBuilder(
            sid, dcid, scid)
        self.control_channel.send(disconnection_response)
        response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid)
        self.control_channel.send(response)

    def _on_disconnection_response_default(self, l2cap_control_view):
        disconnection_response = l2cap_packets.DisconnectionResponseView(
            l2cap_control_view)
    def _on_disconnection_response_default(self, request):
        disconnection_response = l2cap_packets.LeDisconnectionResponseView(
            request)

    def _handle_control_packet(self, l2cap_packet):
        packet_bytes = l2cap_packet.payload
@@ -150,8 +184,8 @@ class CertLeL2cap(Closable):
            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if l2cap_view.GetChannelId() != 5:
            return
        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
        fn = self.control_table.get(l2cap_control_view.GetCode())
        request = l2cap_packets.LeControlView(l2cap_view.GetPayload())
        fn = self.control_table.get(request.GetCode())
        if fn is not None:
            fn(l2cap_control_view)
            fn(request)
        return
+95 −28
Original line number Diff line number Diff line
@@ -27,7 +27,7 @@ from cert.matchers import L2capMatchers
from facade import common_pb2 as common
from facade import rootservice_pb2 as facade_rootservice
from google.protobuf import empty_pb2 as empty_proto
from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.le import facade_pb2 as l2cap_facade_pb2
from neighbor.facade import facade_pb2 as neighbor_facade
from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
@@ -87,7 +87,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
            request)
        self.cert_l2cap.connect_le_acl(bytes(b'0D:05:04:03:02:01'))

    def _open_unvalidated_channel(self,
    def _open_channel_from_cert(self,
                                signal_id=1,
                                scid=0x0101,
                                psm=0x33,
@@ -95,18 +95,25 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
                                mps=100,
                                initial_credit=6):

        dut_channel = self.dut_l2cap.open_credit_based_flow_control_channel(psm)
        dut_channel = self.dut_l2cap.register_coc(psm)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu,
                                                    mps, initial_credit)

        return (dut_channel, cert_channel)

    def _open_channel_from_dut(self, psm=0x33):
        response_future = self.dut_l2cap.connect_coc_to_cert(psm)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm)
        dut_channel = response_future.get_channel()
        return (dut_channel, cert_channel)

    def test_segmentation(self):
        """
        L2CAP/COS/CFC/BV-01-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
        (dut_channel, cert_channel) = self._open_channel_from_cert(
            mtu=1000, mps=102)
        dut_channel.send(b'hello' * 20 + b'world')
        # The first LeInformation packet contains 2 bytes of SDU size.
@@ -121,7 +128,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/COS/CFC/BV-02-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
        (dut_channel, cert_channel) = self._open_channel_from_cert(
            mtu=1000, mps=202)
        dut_channel.send(b'hello' * 40)
        assertThat(cert_channel).emits(
@@ -132,12 +139,12 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/COS/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        sdu_size_for_two_sample_packet = 12
        cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet,
                                           SAMPLE_PACKET)
        cert_channel.send(SAMPLE_PACKET)
        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2))

    def test_data_receiving(self):
@@ -145,9 +152,37 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/COS/CFC/BV-04-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))

    def test_multiple_channels_with_interleaved_data_streams(self):
        """
        L2CAP/COS/CFC/BV-05-C
        """
        self._setup_link_from_cert()
        (dut_channel_x, cert_channel_x) = self._open_channel_from_cert(
            signal_id=1, scid=0x0103, psm=0x33)
        (dut_channel_y, cert_channel_y) = self._open_channel_from_cert(
            signal_id=2, scid=0x0105, psm=0x35)
        (dut_channel_z, cert_channel_z) = self._open_channel_from_cert(
            signal_id=3, scid=0x0107, psm=0x37)
        cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET)
        # TODO: We should assert two events in order, but it got stuck
        assertThat(dut_channel_y).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'),
            at_least_times=3)
        assertThat(dut_channel_z).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'),
            L2capMatchers.PacketPayloadRawData(
                b'\x01\x01\x02\x00\x00\x00')).inOrder()
        cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(dut_channel_z).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))

    def test_reject_unknown_command_in_le_sigling_channel(self):
@@ -162,16 +197,6 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.LeCommandReject())

    def test_credit_based_connection_response_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_credit_based_connection_request_unsupported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-05-C
@@ -192,7 +217,7 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        """
        self._setup_link_from_cert()
        (dut_channel,
         cert_channel) = self._open_unvalidated_channel(initial_credit=0)
         cert_channel) = self._open_channel_from_cert(initial_credit=0)
        for _ in range(4):
            dut_channel.send(b'hello')
        cert_channel.send_credits(1)
@@ -206,12 +231,54 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5),
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_le_credit_based_connection_request_legacy_peer(self):
        """
        L2CAP/LE/CFC/BV-01-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33)
        assertThat(response_future.get_status()).isNotEqualTo(
            LeCreditBasedConnectionResponseResult.SUCCESS)

    def test_le_credit_based_connection_request_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-02-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))

    def test_credit_based_connection_response_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_credit_based_connection_request_on_an_unsupported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-04-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)

    def test_credit_exchange_exceed_initial_credits(self):
        """
        L2CAP/LE/CFC/BI-01-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.send_credits(65535)
        cert_channel.verify_disconnect_request()

@@ -220,5 +287,5 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/LE/CFC/BV-09-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unvalidated_channel()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.disconnect_and_verify()
Loading