Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 962ee81a authored by Chienyuan's avatar Chienyuan Committed by Hansong Zhang
Browse files

Cert: implement assert function with event buffer

Test: run gd/cert/run_cert.sh
Change-Id: I237ecdee7e70b860f3e73d4825dd79b0235ae22b
parent 32c91e3e
Loading
Loading
Loading
Loading
+49 −0
Original line number Diff line number Diff line
@@ -23,9 +23,15 @@ from grpc import RpcError
from grpc import StatusCode

class EventStream(object):

  event_buffer = []

  def __init__(self, stream_stub_fn):
    self.stream_stub_fn = stream_stub_fn

  def clear_event_buffer(self):
    self.event_buffer.clear()

  def subscribe(self):
    return self.stream_stub_fn(
        common_pb2.EventStreamRequest(
@@ -42,8 +48,49 @@ class EventStream(object):
        )
    )

  def assert_none(self):
    response = self.stream_stub_fn(
        common_pb2.EventStreamRequest(
            subscription_mode=common_pb2.NONE,
            fetch_mode=common_pb2.ALL_CURRENT
        )
    )

    try:
      for event in response:
        self.event_buffer.append(event)
    except RpcError:
        pass

    if len(self.event_buffer) != 0:
      asserts.fail("event_buffer is not empty \n%s" % self.event_buffer)

  def assert_none_matching(self, match_fn):
    response = self.stream_stub_fn(
        common_pb2.EventStreamRequest(
            subscription_mode=common_pb2.NONE,
            fetch_mode=common_pb2.ALL_CURRENT
        )
    )

    try:
      for event in response:
        self.event_buffer.append(event)
    except RpcError:
      pass

    for event in self.event_buffer:
      if match_fn(event):
        asserts.fail("event %s occurs" % event)

  def assert_event_occurs(self, match_fn, timeout=timedelta(seconds=3)):
    expiration_time = datetime.now() + timeout

    while len(self.event_buffer):
      element = self.event_buffer.pop(0)
      if match_fn(element):
        return

    while (True):
      if datetime.now() > expiration_time:
        asserts.fail("timeout of %s exceeded" % str(timeout))
@@ -59,6 +106,8 @@ class EventStream(object):
      try:
        for event in response:
          if (match_fn(event)):
            for remain_event in response:
              self.event_buffer.append(remain_event)
            return
      except RpcError:
        if response.code() == StatusCode.DEADLINE_EXCEEDED:
+9 −0
Original line number Diff line number Diff line
@@ -25,6 +25,15 @@ from hal import facade_pb2 as hal_facade_pb2

class SimpleHalTest(GdBaseTestClass):

    def test_none_event(self):
        self.gd_devices[0].hal.hci_event_stream.clear_event_buffer()

        self.gd_devices[0].hal.hci_event_stream.subscribe()

        self.gd_devices[0].hal.hci_event_stream.assert_none()

        self.gd_devices[0].hal.hci_event_stream.unsubscribe()

    def test_fetch_hci_event(self):
        self.gd_devices[0].hal.SetLoopbackMode(
            hal_facade_pb2.LoopbackModeSettings(enable=True)