Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 83fa9c90 authored by Zach Johnson's avatar Zach Johnson
Browse files

Add function to send s_frames & clean up sending

Test: cert/run --host --test_filter=L2capTest
Change-Id: I3bc7d2f0489a8a99febbc7bf746f355a2c77223d
parent 5e19d822
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -23,6 +23,8 @@ from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode
from bluetooth_packets_python3.l2cap_packets import Final
from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
from bluetooth_packets_python3.l2cap_packets import Poll
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.matchers import L2capMatchers
@@ -57,6 +59,15 @@ class CertL2capChannel(IEventStream):
            self._dcid, tx_seq, f, req_seq, sar, payload)
        self._acl.send(frame.Serialize())

    def send_s_frame(self,
                     req_seq,
                     s=SupervisoryFunction.RECEIVER_READY,
                     p=Poll.NOT_SET,
                     f=Final.NOT_SET):
        frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            self._dcid, s, p, f, req_seq)
        self._acl.send(frame.Serialize())


class CertL2cap(Closable):

+39 −106
Original line number Diff line number Diff line
@@ -35,6 +35,8 @@ from bluetooth_packets_python3 import hci_packets, l2cap_packets
from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
from bluetooth_packets_python3.l2cap_packets import Final
from bluetooth_packets_python3.l2cap_packets import CommandCode
from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
from bluetooth_packets_python3.l2cap_packets import Poll
from cert_l2cap import CertL2cap

# Assemble a sample packet. TODO: Use RawBuilder
@@ -124,7 +126,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._open_channel(scid=0x41, psm=0x33)

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            0x99, 0, l2cap_packets.Final.NOT_SET, 1,
            0x99, 0, Final.NOT_SET, 1,
            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
        self.cert_l2cap.send_acl(i_frame)
        assertThat(self.cert_channel).emitsNone(
@@ -569,10 +571,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm(tx_window_size=1)

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')
        self.dut_channel.send(b'def')
@@ -581,10 +580,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
        assertThat(self.cert_channel).emitsNone(
            L2capMatchers.IFrame(tx_seq=1, payload=b'def'))
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(req_seq=1, f=Final.POLL_RESPONSE)
        assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=1))

    def test_resume_transmitting_when_acknowledge_previously_sent(self):
@@ -643,18 +640,11 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm()

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
            l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.cert_channel.send_s_frame(req_seq=0, p=Poll.POLL)
        assertThat(self.cert_channel).emits(
            L2capMatchers.SFrame(f=l2cap_packets.Final.POLL_RESPONSE))
            L2capMatchers.SFrame(f=Final.POLL_RESPONSE))

    def test_s_frame_transmissions_exceed_max_transmit(self):
        """
@@ -683,20 +673,12 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm()

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')

        assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE)
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.DisconnectionRequest())

@@ -709,9 +691,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_l2cap.turn_on_ertm(tx_window_size=2, max_transmit=2)

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')
        self.dut_channel.send(b'abc')
@@ -719,10 +699,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            assertThat(self.cert_channel).emits(
                L2capMatchers.IFrame(tx_seq=i), timeout=timedelta(seconds=0.5))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.REJECT,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)
        self.cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT)

        for i in range(2):
            assertThat(self.cert_channel).emits(
@@ -737,10 +714,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm()

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')

@@ -749,11 +723,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        assertThat(self.cert_channel).emits(
            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE)
        assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))

    def test_receive_i_frame_final_bit_set(self):
@@ -771,8 +741,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

        # TODO: Always use their retransmission timeout value
        time.sleep(2)
        assertThat(self.cert_channel).emits(
            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
        assertThat(self.cert_channel).emits(L2capMatchers.SFrame(p=Poll.POLL))

        self.cert_channel.send_i_frame(
            tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET)
@@ -788,10 +757,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm()

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')

@@ -800,11 +766,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        assertThat(self.cert_channel).emits(
            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(
            req_seq=0,
            s=SupervisoryFunction.RECEIVER_NOT_READY,
            f=Final.POLL_RESPONSE)
        assertThat(self.cert_channel).emitsNone(L2capMatchers.IFrame(tx_seq=0))

    def test_sent_rej_lost(self):
@@ -817,10 +782,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_l2cap.turn_on_ertm(tx_window_size=5)
        ertm_tx_window_size = 5

        scid = 0x41
        self._open_channel(scid=scid, psm=0x41, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x41, use_ertm=True)

        self.cert_channel.send_i_frame(
            tx_seq=0, req_seq=0, payload=SAMPLE_PACKET)
@@ -829,12 +791,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_channel.send_i_frame(
            tx_seq=ertm_tx_window_size - 1, req_seq=0, payload=SAMPLE_PACKET)
        assertThat(self.cert_channel).emits(
            L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.REJECT))
            L2capMatchers.SFrame(s=SupervisoryFunction.REJECT))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
            l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)
        self.cert_channel.send_s_frame(req_seq=0, p=Poll.POLL)

        assertThat(self.cert_channel).emits(
            L2capMatchers.SFrame(
@@ -853,10 +812,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm()

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')
        self.dut_channel.send(b'abc')
@@ -864,22 +820,16 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            L2capMatchers.IFrame(tx_seq=0), timeout=timedelta(0.5))
        assertThat(self.cert_channel).emits(
            L2capMatchers.IFrame(tx_seq=1), timeout=timedelta(0.5))
        assertThat(self.cert_channel).emits(
            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))

        # Send SREJ with F not set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)
        assertThat(self.cert_channel).emits(L2capMatchers.SFrame(p=Poll.POLL))

        self.cert_channel.send_s_frame(
            req_seq=0, s=SupervisoryFunction.SELECT_REJECT)
        assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
        # Send SREJ with F set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(
            req_seq=0,
            s=SupervisoryFunction.SELECT_REJECT,
            f=Final.POLL_RESPONSE)
        assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))

    def test_handle_receipt_rej_and_rr_with_f_set(self):
@@ -892,10 +842,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm()

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')
        self.dut_channel.send(b'abc')
@@ -907,20 +854,12 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL),
            timeout=timedelta(2))

        # Send REJ with F not set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.REJECT,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT)
        assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))

        # Send RR with F set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.REJECT,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(
            req_seq=0, s=SupervisoryFunction.REJECT, f=Final.POLL_RESPONSE)
        assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
        assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=1))

@@ -933,10 +872,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.turn_on_ertm()

        scid = 0x41
        self._open_channel(scid=scid, psm=0x33, use_ertm=True)

        dcid = self.cert_l2cap.get_dcid(scid)
        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)

        self.dut_channel.send(b'abc')
        self.dut_channel.send(b'abc')
@@ -949,11 +885,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            timeout=timedelta(2))

        # Send SREJ with F not set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)

        self.cert_channel.send_s_frame(
            req_seq=0, s=SupervisoryFunction.SELECT_REJECT)
        assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))

        self.cert_channel.send_i_frame(