Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 964616d1 authored by Zach Johnson's avatar Zach Johnson
Browse files

Migrate information frames to matchers

Test: cert/run --host --test_filter=L2capTest
Change-Id: Ife714ad8eedfdb960ecd515cfcee26e2f7873db9
parent a528df65
Loading
Loading
Loading
Loading
+26 −0
Original line number Diff line number Diff line
@@ -54,6 +54,10 @@ class L2capMatchers(object):
    def SupervisoryFrame(scid, req_seq=None, f=None, s=None, p=None):
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, scid, req_seq, f, s, p)

    @staticmethod
    def InformationFrame(scid, tx_seq=None, payload=None):
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, scid, tx_seq, payload)

    @staticmethod
    def _basic_frame(packet):
        if packet is None:
@@ -61,6 +65,16 @@ class L2capMatchers(object):
        return l2cap_packets.BasicFrameView(
            bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _information_frame(packet, scid):
        frame = L2capMatchers._basic_frame(packet)
        if frame.GetChannelId() != scid:
            return None
        standard_frame = l2cap_packets.StandardFrameView(frame)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameView(standard_frame)

    @staticmethod
    def _supervisory_frame(packet, scid):
        frame = L2capMatchers._basic_frame(packet)
@@ -71,6 +85,18 @@ class L2capMatchers(object):
            return None
        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)

    @staticmethod
    def _is_matching_information_frame(packet, scid, tx_seq, payload):
        frame = L2capMatchers._information_frame(packet, scid)
        if frame is None:
            return False
        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
            return False
        if payload is not None and frame.GetPayload(
        ) != payload:  # TODO(mylesgw) this doesn't work
            return False
        return True

    @staticmethod
    def _is_matching_supervisory_frame(packet, scid, req_seq, f, s, p):
        frame = L2capMatchers._supervisory_frame(packet, scid)
+23 −49
Original line number Diff line number Diff line
@@ -280,34 +280,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            fn(l2cap_control_view)
        return

    def get_basic_frame(self, l2cap_packet):
        return l2cap_packets.BasicFrameView(
            bt_packets.PacketViewLittleEndian(list(l2cap_packet.payload)))

    def cert_send_b_frame(self, b_frame):
        self.cert_acl.send(b_frame.Serialize())

    def get_enhanced_information_frame(self, scid, packet):
        l2cap_view = self.get_basic_frame(packet)
        if l2cap_view.GetChannelId() != scid:
            return None
        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
        if standard_view.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameView(standard_view)

    def get_tx_seq_from_ertm_i_frame(self, scid, packet):
        i_frame = self.get_enhanced_information_frame(scid, packet)
        if i_frame is None:
            return False
        return i_frame.GetTxSeq()

    def get_payload_from_ertm_i_frame(self, scid, packet):
        i_frame = self.get_enhanced_information_frame(scid, packet)
        if i_frame is None:
            return False
        return i_frame.GetPayload()  # TODO(mylesgw): This doesn't work!

    def _setup_link_from_cert(self):

        self.dut.neighbor.EnablePageScan(
@@ -883,16 +858,15 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

        # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))
        cert_acl_data_stream.assert_none_matching(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
        )
            L2capMatchers.InformationFrame(scid, tx_seq=1))
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1)
        self.cert_send_b_frame(s_frame)
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
            L2capMatchers.InformationFrame(scid, tx_seq=1))

    def test_resume_transmitting_when_acknowledge_previously_sent(self):
        """
@@ -928,10 +902,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))
        # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
        cert_acl_data_stream.assert_none_matching(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
            L2capMatchers.InformationFrame(scid, tx_seq=1),
            timedelta(seconds=1))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
@@ -940,7 +914,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(i_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
            L2capMatchers.InformationFrame(scid, tx_seq=1))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, 1, l2cap_packets.Final.NOT_SET, 2,
@@ -1071,7 +1045,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
@@ -1112,7 +1086,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
        for i in range(2):
            assertThat(self.cert_acl).emits(
                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
                L2capMatchers.InformationFrame(scid, tx_seq=i),
                timeout=timedelta(seconds=0.5))

        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
@@ -1122,7 +1096,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

        for i in range(2):
            assertThat(self.cert_acl).emits(
                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
                L2capMatchers.InformationFrame(scid, tx_seq=i),
                timeout=timedelta(seconds=0.5))

    def test_receive_s_frame_rr_final_bit_set(self):
@@ -1164,7 +1138,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(s_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))

    def test_receive_i_frame_final_bit_set(self):
        """
@@ -1205,7 +1179,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(i_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))

    def test_recieve_rnr(self):
        """
@@ -1247,7 +1221,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(s_frame)

        cert_acl_data_stream.assert_none_matching(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))

    def test_sent_rej_lost(self):
        """
@@ -1337,10 +1311,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
            L2capMatchers.InformationFrame(scid, tx_seq=0),
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
            L2capMatchers.InformationFrame(scid, tx_seq=1),
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))
@@ -1359,7 +1333,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(s_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))

    def test_handle_receipt_rej_and_rr_with_f_set(self):
        """
@@ -1393,10 +1367,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
            L2capMatchers.InformationFrame(scid, tx_seq=0),
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
            L2capMatchers.InformationFrame(scid, tx_seq=1),
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL),
@@ -1417,9 +1391,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(s_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
            L2capMatchers.InformationFrame(scid, tx_seq=1))

    def test_handle_rej_and_i_frame_with_f_set(self):
        """
@@ -1452,10 +1426,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
            L2capMatchers.InformationFrame(scid, tx_seq=0),
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
            L2capMatchers.InformationFrame(scid, tx_seq=1),
            timeout=timedelta(0.5))
        assertThat(self.cert_acl).emits(
            L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL),
@@ -1475,9 +1449,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(i_frame)

        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
            L2capMatchers.InformationFrame(scid, tx_seq=0))
        assertThat(self.cert_acl).emits(
            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
            L2capMatchers.InformationFrame(scid, tx_seq=1))

    def test_initiated_configuration_request_ertm(self):
        """