Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ed126191 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Behavior: Add wait for timeout"

parents 85bd8d7d fea737df
Loading
Loading
Loading
Loading
+53 −0
Original line number Diff line number Diff line
@@ -15,8 +15,11 @@
#   limitations under the License.

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from mobly import signals
from threading import Condition

from cert.event_stream import static_remaining_time_delta
from cert.truth import assertThat


@@ -45,6 +48,8 @@ class SingleArgumentBehavior(object):
    def __init__(self, reply_stage_factory):
        self._reply_stage_factory = reply_stage_factory
        self._instances = []
        self._invoked_obj = []
        self._invoked_condition = Condition()
        self.set_default_to_crash()

    def begin(self, matcher):
@@ -66,15 +71,35 @@ class SingleArgumentBehavior(object):
    def run(self, obj):
        for instance in self._instances:
            if instance.try_run(obj):
                self.__obj_invoked(obj)
                return
        if self._default_fn is not None:
            # IGNORE_UNHANDLED is also a default fn
            self._default_fn(obj)
            self.__obj_invoked(obj)
        else:
            raise signals.TestFailure(
                "%s: behavior for %s went unhandled" %
                (self._reply_stage_factory().__class__.__name__, obj),
                extras=None)

    def __obj_invoked(self, obj):
        self._invoked_condition.acquire()
        self._invoked_obj.append(obj)
        self._invoked_condition.notify()
        self._invoked_condition.release()

    def wait_until_invoked(self, matcher, times, timeout):
        end_time = datetime.now() + timeout
        invoked_times = 0
        while datetime.now() < end_time and invoked_times < times:
            remaining = static_remaining_time_delta(end_time)
            invoked_times = sum((matcher(i) for i in self._invoked_obj))
            self._invoked_condition.acquire()
            self._invoked_condition.wait(remaining.total_seconds())
            self._invoked_condition.release()
        return invoked_times == times


class PersistenceStage(object):

@@ -121,3 +146,31 @@ class BehaviorInstance(object):
        self._called_count += 1
        self._fn(obj)
        return True


class BoundVerificationStage(object):

    def __init__(self, behavior, matcher, timeout):
        self._behavior = behavior
        self._matcher = matcher
        self._timeout = timeout

    def times(self, times=1):
        return self._behavior.wait_until_invoked(self._matcher, times,
                                                 self._timeout)


class WaitForBehaviorSubject(object):

    def __init__(self, behaviors, timeout):
        self._behaviors = behaviors
        self._timeout = timeout

    def __getattr__(self, item):
        behavior = getattr(self._behaviors, item + "_behavior")
        t = self._timeout
        return lambda matcher: BoundVerificationStage(behavior, matcher, t)


def wait_until(i_has_behaviors, timeout=timedelta(seconds=3)):
    return WaitForBehaviorSubject(i_has_behaviors.get_behaviors(), timeout)
+111 −1
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

from datetime import datetime, timedelta
import logging
from threading import Timer
import time

from mobly import asserts
@@ -28,7 +29,7 @@ from bluetooth_packets_python3 import l2cap_packets
from cert.event_stream import EventStream, FilteringEventStream
from cert.truth import assertThat
from cert.metadata import metadata
from cert.behavior import when
from cert.behavior import when, wait_until
from cert.behavior import IHasBehaviors
from cert.behavior import anything
from cert.behavior import SingleArgumentBehavior
@@ -646,3 +647,112 @@ class CertSelfTest(BaseTestClass):
        thing.behaviors.test_request_behavior.run("A")
        thing.behaviors.test_request_behavior.run("A")
        assertThat(thing.unhandled_count).isEqualTo(1)

    def test_fluent_behavior__wait_until_done(self):
        thing = ObjectWithBehaviors()
        is_a = lambda obj: obj == "A"
        when(thing).test_request(is_a).then().increment_count()

        closure = lambda: thing.behaviors.test_request_behavior.run("A")
        t = Timer(0.5, closure)
        t.start()

        wait_until(thing).test_request(is_a).times(1)
        assertThat(thing.count).isEqualTo(1)
        assertThat(thing.captured).isEqualTo(["A"])

    def test_fluent_behavior__wait_until_done_different_lambda(self):
        thing = ObjectWithBehaviors()
        when(thing).test_request(
            lambda obj: obj == "A").then().increment_count()

        closure = lambda: thing.behaviors.test_request_behavior.run("A")
        t = Timer(0.5, closure)
        t.start()

        wait_until(thing).test_request(lambda obj: obj == "A").times(1)
        assertThat(thing.count).isEqualTo(1)
        assertThat(thing.captured).isEqualTo(["A"])

    def test_fluent_behavior__wait_until_done_anything(self):
        thing = ObjectWithBehaviors()
        when(thing).test_request(
            lambda obj: obj == "A").then().increment_count()

        closure = lambda: thing.behaviors.test_request_behavior.run("A")
        t = Timer(0.5, closure)
        t.start()

        wait_until(thing).test_request(anything()).times(1)
        assertThat(thing.count).isEqualTo(1)
        assertThat(thing.captured).isEqualTo(["A"])

    def test_fluent_behavior__wait_until_done_not_happened(self):
        thing = ObjectWithBehaviors()
        thing.behaviors.test_request_behavior.set_default_to_ignore()
        when(thing).test_request(
            lambda obj: obj == "A").then().increment_count()

        closure = lambda: thing.behaviors.test_request_behavior.run("B")
        t = Timer(0.5, closure)
        t.start()
        assertThat(
            wait_until(thing).test_request(lambda obj: obj == "A").times(
                1)).isFalse()

    def test_fluent_behavior__wait_until_done_with_default(self):
        thing = ObjectWithBehaviors()
        thing.behaviors.test_request_behavior.set_default(
            lambda obj: thing.increment_unhandled())

        closure = lambda: thing.behaviors.test_request_behavior.run("A")
        t = Timer(0.5, closure)
        t.start()

        wait_until(thing).test_request(anything()).times(1)
        assertThat(thing.unhandled_count).isEqualTo(1)

    def test_fluent_behavior__wait_until_done_two_events_AA(self):
        thing = ObjectWithBehaviors()
        when(thing).test_request(
            lambda obj: obj == "A").then().increment_count().increment_count()

        closure1 = lambda: thing.behaviors.test_request_behavior.run("A")
        t1 = Timer(0.5, closure1)
        t1.start()
        closure2 = lambda: thing.behaviors.test_request_behavior.run("A")
        t2 = Timer(0.5, closure2)
        t2.start()

        wait_until(thing).test_request(lambda obj: obj == "A").times(2)
        assertThat(thing.count).isEqualTo(2)
        assertThat(thing.captured).isEqualTo(["A", "A"])

    def test_fluent_behavior__wait_until_done_two_events_AB(self):
        thing = ObjectWithBehaviors()
        when(thing).test_request(anything()).always().increment_count()

        closure1 = lambda: thing.behaviors.test_request_behavior.run("A")
        t1 = Timer(0.5, closure1)
        t1.start()
        closure2 = lambda: thing.behaviors.test_request_behavior.run("B")
        t2 = Timer(0.5, closure2)
        t2.start()

        wait_until(thing).test_request(anything()).times(2)
        assertThat(thing.count).isEqualTo(2)
        assertThat(thing.captured).isEqualTo(["A", "B"])

    def test_fluent_behavior__wait_until_done_only_one_event_is_done(self):
        thing = ObjectWithBehaviors()
        when(thing).test_request(anything()).always().increment_count()

        closure1 = lambda: thing.behaviors.test_request_behavior.run("A")
        t1 = Timer(1, closure1)
        t1.start()
        closure2 = lambda: thing.behaviors.test_request_behavior.run("B")
        t2 = Timer(3, closure2)
        t2.start()
        assertThat(
            wait_until(thing).test_request(lambda obj: obj == "A").times(
                2)).isFalse()