Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2c501312 authored by Zach Johnson's avatar Zach Johnson
Browse files

Migrate remaining event stream asserts in L2capTest to truth

Test: cert/run --host
Change-Id: Ib9f7e8004aadc61ed9686240382443299357da1e
parent 964616d1
Loading
Loading
Loading
Loading
+46 −0
Original line number Diff line number Diff line
@@ -340,3 +340,49 @@ class CertSelfTest(BaseTestClass):
            logging.debug(e)
            return True  # Failed as expected
        return False

    def test_assertThat_emitsNone_passes(self):
        with EventStream(FetchEvents(events=[1, 2, 3],
                                     delay_ms=50)) as event_stream:
            assertThat(event_stream).emitsNone(
                lambda data: data.value_ == 4,
                timeout=timedelta(seconds=0.15)).thenNone(
                    lambda data: data.value_ == 5,
                    timeout=timedelta(seconds=0.15))

    def test_assertThat_emitsNone_passes_after_1_second(self):
        with EventStream(FetchEvents(events=[1, 2, 3, 4],
                                     delay_ms=400)) as event_stream:
            assertThat(event_stream).emitsNone(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=1))

    def test_assertThat_emitsNone_fails(self):
        try:
            with EventStream(FetchEvents(events=[1, 2, 3],
                                         delay_ms=50)) as event_stream:
                assertThat(event_stream).emitsNone(
                    lambda data: data.value_ == 2, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False

    def test_assertThat_emitsNone_zero_passes(self):
        with EventStream(FetchEvents(events=[], delay_ms=50)) as event_stream:
            assertThat(event_stream).emitsNone(
                timeout=timedelta(milliseconds=10)).thenNone(
                    timeout=timedelta(milliseconds=10))

    def test_assertThat_emitsNone_zero_passes_after_one_second(self):
        with EventStream(FetchEvents([1], delay_ms=1500)) as event_stream:
            assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1.0))

    def test_assertThat_emitsNone_zero_fails(self):
        try:
            with EventStream(FetchEvents(events=[17],
                                         delay_ms=50)) as event_stream:
                assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False
+40 −30
Original line number Diff line number Diff line
@@ -165,15 +165,7 @@ class EventStream(IEventStream, Closable):
        :param timeout: a timedelta object
        :return:
        """
        logging.debug("assert_none %fs" % (timeout.total_seconds()))
        try:
            event = self.event_queue.get(timeout=timeout.total_seconds())
            asserts.assert_true(
                event is None,
                msg=("Expected None, but got %s" % text_format.MessageToString(
                    event, as_one_line=True)))
        except Empty:
            return
        NOT_FOR_YOU_assert_none(self, timeout)

    def assert_none_matching(
            self, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
@@ -185,27 +177,7 @@ class EventStream(IEventStream, Closable):
        :param timeout: a timedelta object
        :return:
        """
        logging.debug("assert_none_matching %fs" % (timeout.total_seconds()))
        event = None
        end_time = datetime.now() + timeout
        while event is None and datetime.now() < end_time:
            remaining = static_remaining_time_delta(end_time)
            logging.debug("Waiting for event (%fs remaining)" %
                          (remaining.total_seconds()))
            try:
                current_event = self.event_queue.get(
                    timeout=remaining.total_seconds())
                if match_fn(current_event):
                    event = current_event
            except Empty:
                continue
        logging.debug("Done waiting for an event")
        if event is None:
            return  # Avoid an assert in MessageToString(None, ...)
        asserts.assert_true(
            event is None,
            msg=("Expected None matching, but got %s" %
                 text_format.MessageToString(event, as_one_line=True)))
        NOT_FOR_YOU_assert_none_matching(self, match_fn, timeout)

    def assert_event_occurs(self,
                            match_fn,
@@ -339,3 +311,41 @@ def NOT_FOR_YOU_assert_all_events_occur(
        asserts.assert_true(
            correct_order, "Events not received in correct order %s %s" %
            (match_fns, matched_order))


def NOT_FOR_YOU_assert_none_matching(
        istream, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
    logging.debug("assert_none_matching %fs" % (timeout.total_seconds()))
    event = None
    end_time = datetime.now() + timeout
    while event is None and datetime.now() < end_time:
        remaining = static_remaining_time_delta(end_time)
        logging.debug(
            "Waiting for event (%fs remaining)" % (remaining.total_seconds()))
        try:
            current_event = istream.get_event_queue().get(
                timeout=remaining.total_seconds())
            if match_fn(current_event):
                event = current_event
        except Empty:
            continue
    logging.debug("Done waiting for an event")
    if event is None:
        return  # Avoid an assert in MessageToString(None, ...)
    asserts.assert_true(
        event is None,
        msg=("Expected None matching, but got %s" % text_format.MessageToString(
            event, as_one_line=True)))


def NOT_FOR_YOU_assert_none(istream,
                            timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
    logging.debug("assert_none %fs" % (timeout.total_seconds()))
    try:
        event = istream.get_event_queue().get(timeout=timeout.total_seconds())
        asserts.assert_true(
            event is None,
            msg=("Expected None, but got %s" % text_format.MessageToString(
                event, as_one_line=True)))
    except Empty:
        return
+24 −0
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ from mobly import signals
from cert.event_stream import IEventStream
from cert.event_stream import NOT_FOR_YOU_assert_event_occurs
from cert.event_stream import NOT_FOR_YOU_assert_all_events_occur
from cert.event_stream import NOT_FOR_YOU_assert_none_matching
from cert.event_stream import NOT_FOR_YOU_assert_none

import sys, traceback

@@ -78,6 +80,17 @@ class EventStreamSubject(ObjectSubject):
        else:
            return MultiMatchStreamSubject(self._value, match_fns, timeout)

    def emitsNone(self, *match_fns, timeout=DEFAULT_TIMEOUT):
        if len(match_fns) == 0:
            NOT_FOR_YOU_assert_none(self._value, timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        elif len(match_fns) == 1:
            NOT_FOR_YOU_assert_none_matching(
                self._value, match_fns[0], timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        else:
            raise signals.TestFailure("Cannot specify multiple match functions")


class MultiMatchStreamSubject(object):

@@ -121,6 +134,17 @@ class EventStreamContinuationSubject(ObjectSubject):
        else:
            return MultiMatchStreamSubject(self._value, match_fns, timeout)

    def thenNone(self, *match_fns, timeout=DEFAULT_TIMEOUT):
        if len(match_fns) == 0:
            NOT_FOR_YOU_assert_none(self._value, timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        elif len(match_fns) == 1:
            NOT_FOR_YOU_assert_none_matching(
                self._value, match_fns[0], timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        else:
            raise signals.TestFailure("Cannot specify multiple match functions")


class BooleanSubject(ObjectSubject):

+12 −21
Original line number Diff line number Diff line
@@ -333,7 +333,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

    def test_receive_packet_from_unknown_channel(self):
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        psm = 0x33
        scid = 0x41
        self._open_channel(1, scid, psm)
@@ -341,9 +340,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            0x99, 0, l2cap_packets.Final.NOT_SET, 1,
            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
        self.cert_send_b_frame(i_frame)
        cert_acl_data_stream.assert_none_matching(
        assertThat(self.cert_acl).emitsNone(
            L2capMatchers.SupervisoryFrame(scid, req_seq=4),
            timedelta(seconds=1))
            timeout=timedelta(seconds=1))

    def test_open_two_channels(self):
        self._setup_link_from_cert()
@@ -420,7 +419,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        L2CAP/COS/CED/BV-08-C
        """
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()

        scid = 0x41
        psm = 0x33
@@ -431,7 +429,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

        self._open_channel(1, scid, psm)

        cert_acl_data_stream.assert_none_matching(
        assertThat(self.cert_acl).emitsNone(
            L2capMatchers.ConfigurationResponse())

    def test_retry_config_after_rejection(self):
@@ -791,7 +789,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        Lower Tester
        """
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
@@ -819,9 +816,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            assertThat(self.cert_acl).emits(
                L2capMatchers.SupervisoryFrame(scid, req_seq=i + 1))

        cert_acl_data_stream.assert_none_matching(
        assertThat(self.cert_acl).emitsNone(
            L2capMatchers.SupervisoryFrame(scid, req_seq=4),
            timedelta(seconds=1))
            timeout=timedelta(seconds=1))

    def test_resume_transmitting_when_received_rr(self):
        """
@@ -832,7 +829,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        """
        self.ertm_tx_window_size = 1
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
@@ -859,7 +855,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
        assertThat(self.cert_acl).emits(
            L2capMatchers.InformationFrame(scid, tx_seq=0))
        cert_acl_data_stream.assert_none_matching(
        assertThat(self.cert_acl).emitsNone(
            L2capMatchers.InformationFrame(scid, tx_seq=1))
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
@@ -877,7 +873,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        """
        self.ertm_tx_window_size = 1
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
@@ -904,9 +899,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        assertThat(self.cert_acl).emits(
            L2capMatchers.InformationFrame(scid, tx_seq=0))
        # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
        cert_acl_data_stream.assert_none_matching(
        assertThat(self.cert_acl).emitsNone(
            L2capMatchers.InformationFrame(scid, tx_seq=1),
            timedelta(seconds=1))
            timeout=timedelta(seconds=1))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, 0, l2cap_packets.Final.NOT_SET, 1,
@@ -1188,7 +1183,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        Lower Tester (S-frame [RNR]).
        """
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
@@ -1220,7 +1214,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
        self.cert_send_b_frame(s_frame)

        cert_acl_data_stream.assert_none_matching(
        assertThat(self.cert_acl).emitsNone(
            L2capMatchers.InformationFrame(scid, tx_seq=0))

    def test_sent_rej_lost(self):
@@ -1287,7 +1281,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        Verify the IUT will only retransmit the requested I-frame once after receiving a duplicate SREJ.
        """
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
@@ -1325,7 +1318,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)

        cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
        assertThat(self.cert_acl).emitsNone(timeout=timedelta(seconds=0.5))
        # Send SREJ with F set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
@@ -1343,7 +1336,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        retransmitted.
        """
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
@@ -1382,7 +1374,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)

        cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
        assertThat(self.cert_acl).emitsNone(timeout=timedelta(seconds=0.5))

        # Send RR with F set
        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
@@ -1402,7 +1394,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        followed by an I-frame with the Final bit set that indicates the same I-frames should be retransmitted.
        """
        self._setup_link_from_cert()
        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
@@ -1441,7 +1432,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
        self.cert_send_b_frame(s_frame)

        cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
        assertThat(self.cert_acl).emitsNone(timeout=timedelta(seconds=0.5))

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,