Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a528df65 authored by Zach Johnson's avatar Zach Johnson
Browse files

Migrate supervisory frame matching logic

By using matchers, we can use python named parameters
to express optionality, and improve simplicity & readability.

Test: cert/run --host --test_filter=L2capTest
Change-Id: I9f73cf1651172b7179a9963f9c07e865b126e426
parent 1bbe7090
Loading
Loading
Loading
Loading
+29 −0
Original line number Diff line number Diff line
@@ -50,6 +50,10 @@ class L2capMatchers(object):
    def CommandReject():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)

    @staticmethod
    def SupervisoryFrame(scid, req_seq=None, f=None, s=None, p=None):
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, scid, req_seq, f, s, p)

    @staticmethod
    def _basic_frame(packet):
        if packet is None:
@@ -57,6 +61,31 @@ class L2capMatchers(object):
        return l2cap_packets.BasicFrameView(
            bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _supervisory_frame(packet, scid):
        frame = L2capMatchers._basic_frame(packet)
        if frame.GetChannelId() != scid:
            return None
        standard_frame = l2cap_packets.StandardFrameView(frame)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
            return None
        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)

    @staticmethod
    def _is_matching_supervisory_frame(packet, scid, req_seq, f, s, p):
        frame = L2capMatchers._supervisory_frame(packet, scid)
        if frame is None:
            return False
        if req_seq is not None and frame.GetReqSeq() != req_seq:
            return False
        if f is not None and frame.GetF() != f:
            return False
        if s is not None and frame.GetS() != s:
            return False
        if p is not None and frame.GetP() != p:
            return False
        return True

    @staticmethod
    def _control_frame(packet):
        frame = L2capMatchers._basic_frame(packet)
+22 −66
Original line number Diff line number Diff line
@@ -287,15 +287,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
    def cert_send_b_frame(self, b_frame):
        self.cert_acl.send(b_frame.Serialize())

    def get_enhanced_supervisory_frame(self, scid, packet):
        l2cap_view = self.get_basic_frame(packet)
        if l2cap_view.GetChannelId() != scid:
            return None
        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
        if standard_view.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
            return None
        return l2cap_packets.EnhancedSupervisoryFrameView(standard_view)

    def get_enhanced_information_frame(self, scid, packet):
        l2cap_view = self.get_basic_frame(packet)
        if l2cap_view.GetChannelId() != scid:
@@ -305,30 +296,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            return None
        return l2cap_packets.EnhancedInformationFrameView(standard_view)

    def get_req_seq_from_ertm_s_frame(self, scid, packet):
        s_frame = self.get_enhanced_supervisory_frame(scid, packet)
        if s_frame is None:
            return False
        return s_frame.GetReqSeq()

    def get_s_from_ertm_s_frame(self, scid, packet):
        s_frame = self.get_enhanced_supervisory_frame(scid, packet)
        if s_frame is None:
            return False
        return s_frame.GetS()

    def get_p_from_ertm_s_frame(self, scid, packet):
        s_frame = self.get_enhanced_supervisory_frame(scid, packet)
        if s_frame is None:
            return False
        return s_frame.GetP()

    def get_f_from_ertm_s_frame(self, scid, packet):
        s_frame = self.get_enhanced_supervisory_frame(scid, packet)
        if s_frame is None:
            return False
        return s_frame.GetF()

    def get_tx_seq_from_ertm_i_frame(self, scid, packet):
        i_frame = self.get_enhanced_information_frame(scid, packet)
        if i_frame is None:
@@ -400,7 +367,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
        self.cert_send_b_frame(i_frame)
        cert_acl_data_stream.assert_none_matching(
            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
            L2capMatchers.SupervisoryFrame(scid, req_seq=4),
            timedelta(seconds=1))

    def test_open_two_channels(self):
@@ -819,32 +786,28 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
                SAMPLE_PACKET)
            self.cert_send_b_frame(i_frame)
            assertThat(self.cert_acl).emits(
                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
            )
                L2capMatchers.SupervisoryFrame(scid, req_seq=i + 1))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, 3, l2cap_packets.Final.NOT_SET, 0,
            l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET)
        self.cert_send_b_frame(i_frame)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4
        )
            L2capMatchers.SupervisoryFrame(scid, req_seq=4))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, 4, l2cap_packets.Final.NOT_SET, 0,
            l2cap_packets.SegmentationAndReassembly.CONTINUATION, SAMPLE_PACKET)
        self.cert_send_b_frame(i_frame)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 5
        )
            L2capMatchers.SupervisoryFrame(scid, req_seq=5))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, 5, l2cap_packets.Final.NOT_SET, 0,
            l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET)
        self.cert_send_b_frame(i_frame)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 6
        )
            L2capMatchers.SupervisoryFrame(scid, req_seq=6))

    def test_acknowledging_received_i_frames(self):
        """
@@ -879,11 +842,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
                SAMPLE_PACKET)
            self.cert_send_b_frame(i_frame)
            assertThat(self.cert_acl).emits(
                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
            )
                L2capMatchers.SupervisoryFrame(scid, req_seq=i + 1))

        cert_acl_data_stream.assert_none_matching(
            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
            L2capMatchers.SupervisoryFrame(scid, req_seq=4),
            timedelta(seconds=1))

    def test_resume_transmitting_when_received_rr(self):
@@ -1012,8 +974,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        # TODO: Always use their retransmission timeout value
        time.sleep(2)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
        )
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))

    def test_transmit_s_frame_rr_with_final_bit_set(self):
        """
@@ -1046,8 +1007,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(s_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE
        )
            L2capMatchers.SupervisoryFrame(
                scid, f=l2cap_packets.Final.POLL_RESPONSE))

    def test_s_frame_transmissions_exceed_max_transmit(self):
        """
@@ -1195,8 +1156,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        # TODO: Always use their retransmission timeout value
        time.sleep(2)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
        )
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
@@ -1237,8 +1197,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        # TODO: Always use their retransmission timeout value
        time.sleep(2)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
        )
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
@@ -1280,8 +1239,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        # TODO: Always use their retransmission timeout value
        time.sleep(2)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
        )
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
@@ -1322,16 +1280,15 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
        self.cert_send_b_frame(i_frame)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1
        )
            L2capMatchers.SupervisoryFrame(scid, req_seq=1))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0,
            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
        self.cert_send_b_frame(i_frame)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_s_from_ertm_s_frame(scid, packet) == l2cap_packets.SupervisoryFunction.REJECT
        )
            L2capMatchers.SupervisoryFrame(
                scid, s=l2cap_packets.SupervisoryFunction.REJECT))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
@@ -1339,7 +1296,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(s_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 and self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE)
            L2capMatchers.SupervisoryFrame(
                scid, req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE))
        for i in range(1, self.ertm_tx_window_size):
            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
                dcid, i, l2cap_packets.Final.NOT_SET, 0,
@@ -1347,8 +1305,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
                SAMPLE_PACKET)
            self.cert_send_b_frame(i_frame)
            assertThat(self.cert_acl).emits(
                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
            )
                L2capMatchers.SupervisoryFrame(scid, req_seq=i + 1))

    def test_handle_duplicate_srej(self):
        """
@@ -1386,8 +1343,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
        )
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))

        # Send SREJ with F not set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
@@ -1443,7 +1399,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL),
            timeout=timedelta(2))

        # Send REJ with F not set
@@ -1502,7 +1458,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL),
            timeout=timedelta(2))

        # Send SREJ with F not set