Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d3231f60 authored by Myles Watson's avatar Myles Watson Committed by android-build-merger
Browse files

Merge "Cert: Add tests for event_asserts"

am: ac92bb00

Change-Id: I8d49f3ac231f746f9d9e675d6e9766cefdc1086b
parents 8e672956 ac92bb00
Loading
Loading
Loading
Loading
+198 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from __future__ import print_function

import os
import sys
import logging
import time

sys.path.append(os.environ['ANDROID_BUILD_TOP'] + '/packages/modules/Bluetooth/system/gd')

from datetime import datetime, timedelta
from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_callback_stream import EventCallbackStream
from cert.event_asserts import EventAsserts


class BogusProto:

    class BogusType:

        def __init__(self):
            self.name = "BogusProto"
            self.is_extension = False
            self.cpp_type = False

        def type(self):
            return 'BogusRpc'

        def label(self):
            return "label"

    class BogusDescriptor:

        def __init__(self, name):
            self.full_name = name

    def __init__(self, value):
        self.value_ = value
        self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value))

    def __str__(self):
        return "BogusRpc value = " + str(self.value_)

    def ListFields(self):
        for field in [BogusProto.BogusType()]:
            yield [field, self.value_]


class FetchEvents:

    def __init__(self, events, delay_ms):
        self.events_ = events
        self.sleep_time_ = (delay_ms * 1.0) / 1000
        self.index_ = 0
        self.done_ = False
        self.then_ = datetime.now()

    def __iter__(self):
        for event in self.events_:
            time.sleep(self.sleep_time_)
            if self.done_:
                return
            logging.debug("yielding %d" % event)
            yield BogusProto(event)

    def done(self):
        return self.done_

    def cancel(self):
        logging.debug("cancel")
        self.done_ = True
        return None


class CertSelfTest(GdFacadeOnlyBaseTestClass):

    def setup_test(self):
        return True

    def teardown_test(self):
        return True

    def test_assert_none_passes(self):
        with EventCallbackStream(FetchEvents(events=[],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none(timeout=timedelta(milliseconds=10))

    def test_assert_none_passes_after_one_second(self):
        with EventCallbackStream(FetchEvents([1],
                                             delay_ms=1500)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none(timeout=timedelta(seconds=1.0))

    def test_assert_none_fails(self):
        try:
            with EventCallbackStream(FetchEvents(events=[17],
                                                 delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_none(timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False

    def test_assert_none_matching_passes(self):
        with EventCallbackStream(FetchEvents(events=[1, 2, 3],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none_matching(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15))

    def test_assert_none_matching_passes_after_1_second(self):
        with EventCallbackStream(
                FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none_matching(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=1))

    def test_assert_none_matching_fails(self):
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_none_matching(
                    lambda data: data.value_ == 2, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False

    def test_assert_occurs_at_least_passes(self):
        with EventCallbackStream(
                FetchEvents(events=[1, 2, 3, 1, 2, 3],
                            delay_ms=40)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs(
                lambda data: data.value_ == 1,
                timeout=timedelta(milliseconds=150),
                at_least_times=2)

    def test_assert_occurs_passes(self):
        with EventCallbackStream(FetchEvents(events=[1, 2, 3],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs(
                lambda data: data.value_ == 1, timeout=timedelta(seconds=1))

    def test_assert_occurs_fails(self):
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_event_occurs(
                    lambda data: data.value_ == 4, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False

    def test_assert_occurs_at_most_passes(self):
        with EventCallbackStream(FetchEvents(events=[1, 2, 3, 4],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs_at_most(
                lambda data: data.value_ < 4,
                timeout=timedelta(seconds=1),
                at_most_times=3)

    def test_assert_occurs_at_most_fails(self):
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3, 4],
                                delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_event_occurs_at_most(
                    lambda data: data.value_ > 1,
                    timeout=timedelta(seconds=1),
                    at_most_times=2)
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        return False
+1 −0
Original line number Diff line number Diff line
CertSelfTest
SimpleHalTest
SimpleSecurityTest
+44 −40
Original line number Diff line number Diff line
@@ -40,7 +40,6 @@ class EventAsserts(object):
    being popped as asserted events happen
    """
    DEFAULT_TIMEOUT_SECONDS = 3
    DEFAULT_INCREMENTAL_TIMEOUT_SECONDS = 0.1

    def __init__(self, event_callback_stream):
        if event_callback_stream is None:
@@ -60,12 +59,11 @@ class EventAsserts(object):
        :param timeout: a timedelta object
        :return:
        """
        logging.debug("assert_none")
        logging.debug("assert_none %fs" % (timeout.total_seconds()))
        try:
            event = self.event_queue.get(timeout=timeout.seconds)
            asserts.assert_equal(
                event,
                None,
            event = self.event_queue.get(timeout=timeout.total_seconds())
            asserts.assert_true(
                event is None,
                msg=("Expected None, but got %s" % text_format.MessageToString(
                    event, as_one_line=True)))
        except Empty:
@@ -81,26 +79,30 @@ class EventAsserts(object):
        :param timeout: a timedelta object
        :return:
        """
        logging.debug("assert_none_matching")
        logging.debug("assert_none_matching %fs" % (timeout.total_seconds()))
        event = None
        iter_count = 0
        timeout_seconds = timeout.seconds
        while timeout_seconds > 0:
        end_time = datetime.now() + timeout
        while iter_count == 0 or event is None and datetime.now() < end_time:
            if iter_count > 0:
                remaining = end_time - datetime.now()
            else:
                remaining = timeout
            logging.debug("Waiting for event iteration %d %fs remaining" %
                          (iter_count, remaining.total_seconds()))
            iter_count += 1
            logging.debug("Waiting for event iteration %d" % iter_count)
            try:
                time_before = datetime.now()
                current_event = self.event_queue.get(timeout=timeout_seconds)
                time_elapsed = datetime.now() - time_before
                timeout_seconds -= time_elapsed.seconds
                current_event = self.event_queue.get(
                    timeout=remaining.total_seconds())
                if match_fn(current_event):
                    event = current_event
            except Empty:
                continue
        logging.debug("Done waiting for an event")
        asserts.assert_equal(
            event,
            None,
        if event is None:
            return  # Avoid an assert in MessageToString(None, ...)
        asserts.assert_true(
            event is None,
            msg=("Expected None matching, but got %s" %
                 text_format.MessageToString(event, as_one_line=True)))

@@ -118,27 +120,29 @@ class EventAsserts(object):
                               happen
        :return:
        """
        logging.debug("assert_event_occurs")
        logging.debug("assert_event_occurs %d %fs" % (at_least_times,
                                                      timeout.total_seconds()))
        event_list = []
        iter_count = 0
        timeout_seconds = timeout.seconds
        while len(event_list) < at_least_times and timeout_seconds > 0:
        end_time = datetime.now() + timeout
        while iter_count == 0 or len(
                event_list) < at_least_times and datetime.now() < end_time:
            if iter_count > 0:
                remaining = end_time - datetime.now()
            else:
                remaining = timeout
            logging.debug("Waiting for event iteration %d %fs remaining" %
                          (iter_count, remaining.total_seconds()))
            iter_count += 1
            logging.debug("Waiting for event iteration %d" % iter_count)
            try:
                time_before = datetime.now()
                current_event = self.event_queue.get(timeout=timeout_seconds)
                time_elapsed = datetime.now() - time_before
                timeout_seconds -= time_elapsed.seconds
                remaining = end_time - datetime.now()
                current_event = self.event_queue.get(
                    timeout=remaining.total_seconds())
                if match_fn(current_event):
                    event_list.append(current_event)
            except Empty:
                continue
        logging.debug("Done waiting for event")
        if len(event_list) >= 1:
            logging.debug(
                "Done waiting for event, got %s" % text_format.MessageToString(
                    event_list[-1], as_one_line=True))
        asserts.assert_true(
            len(event_list) >= at_least_times,
            msg=("Expected at least %d events, but got %d" % (at_least_times,
@@ -162,23 +166,23 @@ class EventAsserts(object):
        logging.debug("assert_event_occurs_at_most")
        event_list = []
        iter_count = 0
        timeout_seconds = timeout.seconds
        while timeout_seconds > 0:
        end_time = datetime.now() + timeout
        while len(event_list) <= at_most_times and datetime.now() < end_time:
            if iter_count > 0:
                remaining = end_time - datetime.now()
            else:
                remaining = timeout
            logging.debug("Waiting for event iteration %d %fs remaining" %
                          (iter_count, remaining.total_seconds()))
            iter_count += 1
            logging.debug("Waiting for event iteration %d" % iter_count)
            try:
                time_before = datetime.now()
                current_event = self.event_queue.get(timeout=timeout_seconds)
                time_elapsed = datetime.now() - time_before
                timeout_seconds -= time_elapsed.seconds
                current_event = self.event_queue.get(
                    timeout=remaining.total_seconds())
                if match_fn(current_event):
                    event_list.append(current_event)
            except Empty:
                continue
        if len(event_list) >= 1:
            logging.debug(
                "Done waiting for event, got %s" % text_format.MessageToString(
                    event_list[-1], as_one_line=True))
        logging.debug("Done waiting, got %d events" % len(event_list))
        asserts.assert_true(
            len(event_list) <= at_most_times,
            msg=("Expected at most %d events, but got %d" % (at_most_times,
+4 −1
Original line number Diff line number Diff line
@@ -60,7 +60,10 @@ class EventCallbackStream(object):

    def __exit__(self, type, value, traceback):
        self.shutdown()
        if traceback is None:
            return True
        else:
            return False

    def __del__(self):
        self.shutdown()