Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit caf06134 authored by Zach Johnson's avatar Zach Johnson Committed by Gerrit Code Review
Browse files

Merge changes Ib9f7e800,Ife714ad8,I9f73cf16,I6879d056,Ia180ec59, ...

* changes:
  Migrate remaining event stream asserts in L2capTest to truth
  Migrate information frames to matchers
  Migrate supervisory frame matching logic
  Replace a few more matchers
  Migrate connection response to L2capMatchers
  Start migrating l2cap packet matchers to their own file
  Fix config request/response order comments
  Replace hardcoded control packet handling with lookup table
parents 3238fc58 2c501312
Loading
Loading
Loading
Loading
+46 −0
Original line number Diff line number Diff line
@@ -340,3 +340,49 @@ class CertSelfTest(BaseTestClass):
            logging.debug(e)
            return True  # Failed as expected
        return False

    def test_assertThat_emitsNone_passes(self):
        with EventStream(FetchEvents(events=[1, 2, 3],
                                     delay_ms=50)) as event_stream:
            assertThat(event_stream).emitsNone(
                lambda data: data.value_ == 4,
                timeout=timedelta(seconds=0.15)).thenNone(
                    lambda data: data.value_ == 5,
                    timeout=timedelta(seconds=0.15))

    def test_assertThat_emitsNone_passes_after_1_second(self):
        with EventStream(FetchEvents(events=[1, 2, 3, 4],
                                     delay_ms=400)) as event_stream:
            assertThat(event_stream).emitsNone(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=1))

    def test_assertThat_emitsNone_fails(self):
        try:
            with EventStream(FetchEvents(events=[1, 2, 3],
                                         delay_ms=50)) as event_stream:
                assertThat(event_stream).emitsNone(
                    lambda data: data.value_ == 2, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False

    def test_assertThat_emitsNone_zero_passes(self):
        with EventStream(FetchEvents(events=[], delay_ms=50)) as event_stream:
            assertThat(event_stream).emitsNone(
                timeout=timedelta(milliseconds=10)).thenNone(
                    timeout=timedelta(milliseconds=10))

    def test_assertThat_emitsNone_zero_passes_after_one_second(self):
        with EventStream(FetchEvents([1], delay_ms=1500)) as event_stream:
            assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1.0))

    def test_assertThat_emitsNone_zero_fails(self):
        try:
            with EventStream(FetchEvents(events=[17],
                                         delay_ms=50)) as event_stream:
                assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False
+40 −30
Original line number Diff line number Diff line
@@ -165,15 +165,7 @@ class EventStream(IEventStream, Closable):
        :param timeout: a timedelta object
        :return:
        """
        logging.debug("assert_none %fs" % (timeout.total_seconds()))
        try:
            event = self.event_queue.get(timeout=timeout.total_seconds())
            asserts.assert_true(
                event is None,
                msg=("Expected None, but got %s" % text_format.MessageToString(
                    event, as_one_line=True)))
        except Empty:
            return
        NOT_FOR_YOU_assert_none(self, timeout)

    def assert_none_matching(
            self, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
@@ -185,27 +177,7 @@ class EventStream(IEventStream, Closable):
        :param timeout: a timedelta object
        :return:
        """
        logging.debug("assert_none_matching %fs" % (timeout.total_seconds()))
        event = None
        end_time = datetime.now() + timeout
        while event is None and datetime.now() < end_time:
            remaining = static_remaining_time_delta(end_time)
            logging.debug("Waiting for event (%fs remaining)" %
                          (remaining.total_seconds()))
            try:
                current_event = self.event_queue.get(
                    timeout=remaining.total_seconds())
                if match_fn(current_event):
                    event = current_event
            except Empty:
                continue
        logging.debug("Done waiting for an event")
        if event is None:
            return  # Avoid an assert in MessageToString(None, ...)
        asserts.assert_true(
            event is None,
            msg=("Expected None matching, but got %s" %
                 text_format.MessageToString(event, as_one_line=True)))
        NOT_FOR_YOU_assert_none_matching(self, match_fn, timeout)

    def assert_event_occurs(self,
                            match_fn,
@@ -339,3 +311,41 @@ def NOT_FOR_YOU_assert_all_events_occur(
        asserts.assert_true(
            correct_order, "Events not received in correct order %s %s" %
            (match_fns, matched_order))


def NOT_FOR_YOU_assert_none_matching(
        istream, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
    logging.debug("assert_none_matching %fs" % (timeout.total_seconds()))
    event = None
    end_time = datetime.now() + timeout
    while event is None and datetime.now() < end_time:
        remaining = static_remaining_time_delta(end_time)
        logging.debug(
            "Waiting for event (%fs remaining)" % (remaining.total_seconds()))
        try:
            current_event = istream.get_event_queue().get(
                timeout=remaining.total_seconds())
            if match_fn(current_event):
                event = current_event
        except Empty:
            continue
    logging.debug("Done waiting for an event")
    if event is None:
        return  # Avoid an assert in MessageToString(None, ...)
    asserts.assert_true(
        event is None,
        msg=("Expected None matching, but got %s" % text_format.MessageToString(
            event, as_one_line=True)))


def NOT_FOR_YOU_assert_none(istream,
                            timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
    logging.debug("assert_none %fs" % (timeout.total_seconds()))
    try:
        event = istream.get_event_queue().get(timeout=timeout.total_seconds())
        asserts.assert_true(
            event is None,
            msg=("Expected None, but got %s" % text_format.MessageToString(
                event, as_one_line=True)))
    except Empty:
        return
+152 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode
from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult


class L2capMatchers(object):

    @staticmethod
    def ConnectionResponse(scid):
        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)

    @staticmethod
    def ConnectionRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)

    @staticmethod
    def ConfigurationResponse():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)

    @staticmethod
    def ConfigurationRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)

    @staticmethod
    def DisconnectionRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)

    @staticmethod
    def DisconnectionResponse(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)

    @staticmethod
    def CommandReject():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)

    @staticmethod
    def SupervisoryFrame(scid, req_seq=None, f=None, s=None, p=None):
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, scid, req_seq, f, s, p)

    @staticmethod
    def InformationFrame(scid, tx_seq=None, payload=None):
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, scid, tx_seq, payload)

    @staticmethod
    def _basic_frame(packet):
        if packet is None:
            return None
        return l2cap_packets.BasicFrameView(
            bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _information_frame(packet, scid):
        frame = L2capMatchers._basic_frame(packet)
        if frame.GetChannelId() != scid:
            return None
        standard_frame = l2cap_packets.StandardFrameView(frame)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameView(standard_frame)

    @staticmethod
    def _supervisory_frame(packet, scid):
        frame = L2capMatchers._basic_frame(packet)
        if frame.GetChannelId() != scid:
            return None
        standard_frame = l2cap_packets.StandardFrameView(frame)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
            return None
        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)

    @staticmethod
    def _is_matching_information_frame(packet, scid, tx_seq, payload):
        frame = L2capMatchers._information_frame(packet, scid)
        if frame is None:
            return False
        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
            return False
        if payload is not None and frame.GetPayload(
        ) != payload:  # TODO(mylesgw) this doesn't work
            return False
        return True

    @staticmethod
    def _is_matching_supervisory_frame(packet, scid, req_seq, f, s, p):
        frame = L2capMatchers._supervisory_frame(packet, scid)
        if frame is None:
            return False
        if req_seq is not None and frame.GetReqSeq() != req_seq:
            return False
        if f is not None and frame.GetF() != f:
            return False
        if s is not None and frame.GetS() != s:
            return False
        if p is not None and frame.GetP() != p:
            return False
        return True

    @staticmethod
    def _control_frame(packet):
        frame = L2capMatchers._basic_frame(packet)
        if frame is None or frame.GetChannelId() != 1:
            return None
        return l2cap_packets.ControlView(frame.GetPayload())

    @staticmethod
    def _control_frame_with_code(packet, code):
        frame = L2capMatchers._control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def _is_control_frame_with_code(packet, code):
        return L2capMatchers._control_frame_with_code(packet, code) is not None

    @staticmethod
    def _is_matching_connection_response(packet, scid):
        frame = L2capMatchers._control_frame_with_code(
            packet, CommandCode.CONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.ConnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetResult(
        ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid(
        ) != 0

    @staticmethod
    def _is_matching_disconnection_response(packet, scid, dcid):
        frame = L2capMatchers._control_frame_with_code(
            packet, CommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.DisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid(
        ) == dcid
+24 −0
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ from mobly import signals
from cert.event_stream import IEventStream
from cert.event_stream import NOT_FOR_YOU_assert_event_occurs
from cert.event_stream import NOT_FOR_YOU_assert_all_events_occur
from cert.event_stream import NOT_FOR_YOU_assert_none_matching
from cert.event_stream import NOT_FOR_YOU_assert_none

import sys, traceback

@@ -78,6 +80,17 @@ class EventStreamSubject(ObjectSubject):
        else:
            return MultiMatchStreamSubject(self._value, match_fns, timeout)

    def emitsNone(self, *match_fns, timeout=DEFAULT_TIMEOUT):
        if len(match_fns) == 0:
            NOT_FOR_YOU_assert_none(self._value, timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        elif len(match_fns) == 1:
            NOT_FOR_YOU_assert_none_matching(
                self._value, match_fns[0], timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        else:
            raise signals.TestFailure("Cannot specify multiple match functions")


class MultiMatchStreamSubject(object):

@@ -121,6 +134,17 @@ class EventStreamContinuationSubject(ObjectSubject):
        else:
            return MultiMatchStreamSubject(self._value, match_fns, timeout)

    def thenNone(self, *match_fns, timeout=DEFAULT_TIMEOUT):
        if len(match_fns) == 0:
            NOT_FOR_YOU_assert_none(self._value, timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        elif len(match_fns) == 1:
            NOT_FOR_YOU_assert_none_matching(
                self._value, match_fns[0], timeout=timeout)
            return EventStreamContinuationSubject(self._value)
        else:
            raise signals.TestFailure("Cannot specify multiple match functions")


class BooleanSubject(ObjectSubject):

+229 −376

File changed.

Preview size limit exceeded, changes collapsed.