Loading system/gd/cert/cert_self_test.py 0 → 100644 +198 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import os import sys import logging import time sys.path.append(os.environ['ANDROID_BUILD_TOP'] + '/packages/modules/Bluetooth/system/gd') from datetime import datetime, timedelta from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass from cert.event_callback_stream import EventCallbackStream from cert.event_asserts import EventAsserts class BogusProto: class BogusType: def __init__(self): self.name = "BogusProto" self.is_extension = False self.cpp_type = False def type(self): return 'BogusRpc' def label(self): return "label" class BogusDescriptor: def __init__(self, name): self.full_name = name def __init__(self, value): self.value_ = value self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value)) def __str__(self): return "BogusRpc value = " + str(self.value_) def ListFields(self): for field in [BogusProto.BogusType()]: yield [field, self.value_] class FetchEvents: def __init__(self, events, delay_ms): self.events_ = events self.sleep_time_ = (delay_ms * 1.0) / 1000 self.index_ = 0 self.done_ = False self.then_ = datetime.now() def __iter__(self): for event in self.events_: time.sleep(self.sleep_time_) if self.done_: return logging.debug("yielding %d" % event) yield BogusProto(event) def done(self): return self.done_ def cancel(self): logging.debug("cancel") self.done_ = True return None class CertSelfTest(GdFacadeOnlyBaseTestClass): def setup_test(self): return True def teardown_test(self): return True def test_assert_none_passes(self): with EventCallbackStream(FetchEvents(events=[], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none(timeout=timedelta(milliseconds=10)) def test_assert_none_passes_after_one_second(self): with EventCallbackStream(FetchEvents([1], delay_ms=1500)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none(timeout=timedelta(seconds=1.0)) def test_assert_none_fails(self): try: with EventCallbackStream(FetchEvents(events=[17], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none(timeout=timedelta(seconds=1)) except Exception as e: logging.debug(e) return True # Failed as expected return False def test_assert_none_matching_passes(self): with EventCallbackStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none_matching( lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)) def test_assert_none_matching_passes_after_1_second(self): with EventCallbackStream( FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none_matching( lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) def test_assert_none_matching_fails(self): try: with EventCallbackStream( FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none_matching( lambda data: data.value_ == 2, timeout=timedelta(seconds=1)) except Exception as e: logging.debug(e) return True # Failed as expected return False def test_assert_occurs_at_least_passes(self): with EventCallbackStream( FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs( lambda data: data.value_ == 1, timeout=timedelta(milliseconds=150), at_least_times=2) def test_assert_occurs_passes(self): with EventCallbackStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs( lambda data: data.value_ == 1, timeout=timedelta(seconds=1)) def test_assert_occurs_fails(self): try: with EventCallbackStream( FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs( lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) except Exception as e: logging.debug(e) return True # Failed as expected return False def test_assert_occurs_at_most_passes(self): with EventCallbackStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs_at_most( lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3) def test_assert_occurs_at_most_fails(self): try: with EventCallbackStream( FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs_at_most( lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2) except Exception as e: logging.debug(e) return True # Failed as expected return False system/gd/cert/cert_testcases_facade_only +1 −0 Original line number Diff line number Diff line CertSelfTest SimpleHalTest SimpleSecurityTest system/gd/cert/event_asserts.py +44 −40 Original line number Diff line number Diff line Loading @@ -40,7 +40,6 @@ class EventAsserts(object): being popped as asserted events happen """ DEFAULT_TIMEOUT_SECONDS = 3 DEFAULT_INCREMENTAL_TIMEOUT_SECONDS = 0.1 def __init__(self, event_callback_stream): if event_callback_stream is None: Loading @@ -60,12 +59,11 @@ class EventAsserts(object): :param timeout: a timedelta object :return: """ logging.debug("assert_none") logging.debug("assert_none %fs" % (timeout.total_seconds())) try: event = self.event_queue.get(timeout=timeout.seconds) asserts.assert_equal( event, None, event = self.event_queue.get(timeout=timeout.total_seconds()) asserts.assert_true( event is None, msg=("Expected None, but got %s" % text_format.MessageToString( event, as_one_line=True))) except Empty: Loading @@ -81,26 +79,30 @@ class EventAsserts(object): :param timeout: a timedelta object :return: """ logging.debug("assert_none_matching") logging.debug("assert_none_matching %fs" % (timeout.total_seconds())) event = None iter_count = 0 timeout_seconds = timeout.seconds while timeout_seconds > 0: end_time = datetime.now() + timeout while iter_count == 0 or event is None and datetime.now() < end_time: if iter_count > 0: remaining = end_time - datetime.now() else: remaining = timeout logging.debug("Waiting for event iteration %d %fs remaining" % (iter_count, remaining.total_seconds())) iter_count += 1 logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds current_event = self.event_queue.get( timeout=remaining.total_seconds()) if match_fn(current_event): event = current_event except Empty: continue logging.debug("Done waiting for an event") asserts.assert_equal( event, None, if event is None: return # Avoid an assert in MessageToString(None, ...) asserts.assert_true( event is None, msg=("Expected None matching, but got %s" % text_format.MessageToString(event, as_one_line=True))) Loading @@ -118,27 +120,29 @@ class EventAsserts(object): happen :return: """ logging.debug("assert_event_occurs") logging.debug("assert_event_occurs %d %fs" % (at_least_times, timeout.total_seconds())) event_list = [] iter_count = 0 timeout_seconds = timeout.seconds while len(event_list) < at_least_times and timeout_seconds > 0: end_time = datetime.now() + timeout while iter_count == 0 or len( event_list) < at_least_times and datetime.now() < end_time: if iter_count > 0: remaining = end_time - datetime.now() else: remaining = timeout logging.debug("Waiting for event iteration %d %fs remaining" % (iter_count, remaining.total_seconds())) iter_count += 1 logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds remaining = end_time - datetime.now() current_event = self.event_queue.get( timeout=remaining.total_seconds()) if match_fn(current_event): event_list.append(current_event) except Empty: continue logging.debug("Done waiting for event") if len(event_list) >= 1: logging.debug( "Done waiting for event, got %s" % text_format.MessageToString( event_list[-1], as_one_line=True)) asserts.assert_true( len(event_list) >= at_least_times, msg=("Expected at least %d events, but got %d" % (at_least_times, Loading @@ -162,23 +166,23 @@ class EventAsserts(object): logging.debug("assert_event_occurs_at_most") event_list = [] iter_count = 0 timeout_seconds = timeout.seconds while timeout_seconds > 0: end_time = datetime.now() + timeout while len(event_list) <= at_most_times and datetime.now() < end_time: if iter_count > 0: remaining = end_time - datetime.now() else: remaining = timeout logging.debug("Waiting for event iteration %d %fs remaining" % (iter_count, remaining.total_seconds())) iter_count += 1 logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds current_event = self.event_queue.get( timeout=remaining.total_seconds()) if match_fn(current_event): event_list.append(current_event) except Empty: continue if len(event_list) >= 1: logging.debug( "Done waiting for event, got %s" % text_format.MessageToString( event_list[-1], as_one_line=True)) logging.debug("Done waiting, got %d events" % len(event_list)) asserts.assert_true( len(event_list) <= at_most_times, msg=("Expected at most %d events, but got %d" % (at_most_times, Loading system/gd/cert/event_callback_stream.py +4 −1 Original line number Diff line number Diff line Loading @@ -60,7 +60,10 @@ class EventCallbackStream(object): def __exit__(self, type, value, traceback): self.shutdown() if traceback is None: return True else: return False def __del__(self): self.shutdown() Loading Loading
system/gd/cert/cert_self_test.py 0 → 100644 +198 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import os import sys import logging import time sys.path.append(os.environ['ANDROID_BUILD_TOP'] + '/packages/modules/Bluetooth/system/gd') from datetime import datetime, timedelta from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass from cert.event_callback_stream import EventCallbackStream from cert.event_asserts import EventAsserts class BogusProto: class BogusType: def __init__(self): self.name = "BogusProto" self.is_extension = False self.cpp_type = False def type(self): return 'BogusRpc' def label(self): return "label" class BogusDescriptor: def __init__(self, name): self.full_name = name def __init__(self, value): self.value_ = value self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value)) def __str__(self): return "BogusRpc value = " + str(self.value_) def ListFields(self): for field in [BogusProto.BogusType()]: yield [field, self.value_] class FetchEvents: def __init__(self, events, delay_ms): self.events_ = events self.sleep_time_ = (delay_ms * 1.0) / 1000 self.index_ = 0 self.done_ = False self.then_ = datetime.now() def __iter__(self): for event in self.events_: time.sleep(self.sleep_time_) if self.done_: return logging.debug("yielding %d" % event) yield BogusProto(event) def done(self): return self.done_ def cancel(self): logging.debug("cancel") self.done_ = True return None class CertSelfTest(GdFacadeOnlyBaseTestClass): def setup_test(self): return True def teardown_test(self): return True def test_assert_none_passes(self): with EventCallbackStream(FetchEvents(events=[], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none(timeout=timedelta(milliseconds=10)) def test_assert_none_passes_after_one_second(self): with EventCallbackStream(FetchEvents([1], delay_ms=1500)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none(timeout=timedelta(seconds=1.0)) def test_assert_none_fails(self): try: with EventCallbackStream(FetchEvents(events=[17], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none(timeout=timedelta(seconds=1)) except Exception as e: logging.debug(e) return True # Failed as expected return False def test_assert_none_matching_passes(self): with EventCallbackStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none_matching( lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)) def test_assert_none_matching_passes_after_1_second(self): with EventCallbackStream( FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none_matching( lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) def test_assert_none_matching_fails(self): try: with EventCallbackStream( FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_none_matching( lambda data: data.value_ == 2, timeout=timedelta(seconds=1)) except Exception as e: logging.debug(e) return True # Failed as expected return False def test_assert_occurs_at_least_passes(self): with EventCallbackStream( FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs( lambda data: data.value_ == 1, timeout=timedelta(milliseconds=150), at_least_times=2) def test_assert_occurs_passes(self): with EventCallbackStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs( lambda data: data.value_ == 1, timeout=timedelta(seconds=1)) def test_assert_occurs_fails(self): try: with EventCallbackStream( FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs( lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) except Exception as e: logging.debug(e) return True # Failed as expected return False def test_assert_occurs_at_most_passes(self): with EventCallbackStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs_at_most( lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3) def test_assert_occurs_at_most_fails(self): try: with EventCallbackStream( FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: event_asserts = EventAsserts(event_stream) event_asserts.assert_event_occurs_at_most( lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2) except Exception as e: logging.debug(e) return True # Failed as expected return False
system/gd/cert/cert_testcases_facade_only +1 −0 Original line number Diff line number Diff line CertSelfTest SimpleHalTest SimpleSecurityTest
system/gd/cert/event_asserts.py +44 −40 Original line number Diff line number Diff line Loading @@ -40,7 +40,6 @@ class EventAsserts(object): being popped as asserted events happen """ DEFAULT_TIMEOUT_SECONDS = 3 DEFAULT_INCREMENTAL_TIMEOUT_SECONDS = 0.1 def __init__(self, event_callback_stream): if event_callback_stream is None: Loading @@ -60,12 +59,11 @@ class EventAsserts(object): :param timeout: a timedelta object :return: """ logging.debug("assert_none") logging.debug("assert_none %fs" % (timeout.total_seconds())) try: event = self.event_queue.get(timeout=timeout.seconds) asserts.assert_equal( event, None, event = self.event_queue.get(timeout=timeout.total_seconds()) asserts.assert_true( event is None, msg=("Expected None, but got %s" % text_format.MessageToString( event, as_one_line=True))) except Empty: Loading @@ -81,26 +79,30 @@ class EventAsserts(object): :param timeout: a timedelta object :return: """ logging.debug("assert_none_matching") logging.debug("assert_none_matching %fs" % (timeout.total_seconds())) event = None iter_count = 0 timeout_seconds = timeout.seconds while timeout_seconds > 0: end_time = datetime.now() + timeout while iter_count == 0 or event is None and datetime.now() < end_time: if iter_count > 0: remaining = end_time - datetime.now() else: remaining = timeout logging.debug("Waiting for event iteration %d %fs remaining" % (iter_count, remaining.total_seconds())) iter_count += 1 logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds current_event = self.event_queue.get( timeout=remaining.total_seconds()) if match_fn(current_event): event = current_event except Empty: continue logging.debug("Done waiting for an event") asserts.assert_equal( event, None, if event is None: return # Avoid an assert in MessageToString(None, ...) asserts.assert_true( event is None, msg=("Expected None matching, but got %s" % text_format.MessageToString(event, as_one_line=True))) Loading @@ -118,27 +120,29 @@ class EventAsserts(object): happen :return: """ logging.debug("assert_event_occurs") logging.debug("assert_event_occurs %d %fs" % (at_least_times, timeout.total_seconds())) event_list = [] iter_count = 0 timeout_seconds = timeout.seconds while len(event_list) < at_least_times and timeout_seconds > 0: end_time = datetime.now() + timeout while iter_count == 0 or len( event_list) < at_least_times and datetime.now() < end_time: if iter_count > 0: remaining = end_time - datetime.now() else: remaining = timeout logging.debug("Waiting for event iteration %d %fs remaining" % (iter_count, remaining.total_seconds())) iter_count += 1 logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds remaining = end_time - datetime.now() current_event = self.event_queue.get( timeout=remaining.total_seconds()) if match_fn(current_event): event_list.append(current_event) except Empty: continue logging.debug("Done waiting for event") if len(event_list) >= 1: logging.debug( "Done waiting for event, got %s" % text_format.MessageToString( event_list[-1], as_one_line=True)) asserts.assert_true( len(event_list) >= at_least_times, msg=("Expected at least %d events, but got %d" % (at_least_times, Loading @@ -162,23 +166,23 @@ class EventAsserts(object): logging.debug("assert_event_occurs_at_most") event_list = [] iter_count = 0 timeout_seconds = timeout.seconds while timeout_seconds > 0: end_time = datetime.now() + timeout while len(event_list) <= at_most_times and datetime.now() < end_time: if iter_count > 0: remaining = end_time - datetime.now() else: remaining = timeout logging.debug("Waiting for event iteration %d %fs remaining" % (iter_count, remaining.total_seconds())) iter_count += 1 logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds current_event = self.event_queue.get( timeout=remaining.total_seconds()) if match_fn(current_event): event_list.append(current_event) except Empty: continue if len(event_list) >= 1: logging.debug( "Done waiting for event, got %s" % text_format.MessageToString( event_list[-1], as_one_line=True)) logging.debug("Done waiting, got %d events" % len(event_list)) asserts.assert_true( len(event_list) <= at_most_times, msg=("Expected at most %d events, but got %d" % (at_most_times, Loading
system/gd/cert/event_callback_stream.py +4 −1 Original line number Diff line number Diff line Loading @@ -60,7 +60,10 @@ class EventCallbackStream(object): def __exit__(self, type, value, traceback): self.shutdown() if traceback is None: return True else: return False def __del__(self): self.shutdown() Loading