Loading system/gd/cert/event_stream.py +22 −24 Original line number Diff line number Diff line Loading @@ -81,34 +81,33 @@ class EventStream(IEventStream, Closable): self.event_queue = SimpleQueue() self.handlers = [] self.executor = ThreadPoolExecutor() self.future = self.executor.submit(EventStream._event_loop, self) self.future = self.executor.submit(EventStream.__event_loop, self) def get_event_queue(self): return self.event_queue def close(self): """ Stop the gRPC lambda so that event_callback will not be invoked after th method returns. Stop the gRPC lambda so that event_callback will not be invoked after the method returns. This object will be useless after this call as there is no way to restart the gRPC callback. You would have to create a new EventStream This object will be useless after this call as there is no way to restart the gRPC callback. You would have to create a new EventStream :return: None on success, exception object on failure :raise None on success, or the same exception as __event_loop(), or concurrent.futures.TimeoutError if underlying stream failed to terminate within DEFAULT_TIMEOUT_SECONDS """ while not self.server_stream_call.done(): # Try to cancel the execution, don't care the result, non-blocking self.server_stream_call.cancel() exception_for_return = None try: result = self.future.result() if result: logging.warning("Inner loop error %s" % result) raise result except Exception as exp: logging.warning("Exception: %s" % (exp)) exception_for_return = exp self.executor.shutdown() return exception_for_return # cancelling gRPC stream should cause __event_loop() to quit # same exception will be raised by future.result() or # concurrent.futures.TimeoutError will be raised after timeout self.future.result(timeout=DEFAULT_TIMEOUT_SECONDS) finally: # Make sure we force shutdown the executor regardless of the result self.executor.shutdown(wait=False) def register_callback(self, callback, matcher_fn=None): """ Loading Loading @@ -145,11 +144,11 @@ class EventStream(IEventStream, Closable): raise ValueError("callback must not be None") self.handlers.remove((callback, matcher_fn)) def _event_loop(self): def __event_loop(self): """ Main loop for consuming the gRPC stream events. Blocks until computation is cancelled :return: None on success, exception object on failure :raise grpc.Error on failure """ try: for event in self.server_stream_call: Loading @@ -157,14 +156,13 @@ class EventStream(IEventStream, Closable): for (callback, matcher_fn) in self.handlers: if not matcher_fn or matcher_fn(event): callback(event) return None except RpcError as exp: # Underlying gRPC stream should run indefinitely until cancelled # Hence any other reason besides CANCELLED is raised as an error if self.server_stream_call.cancelled(): logging.debug("Cancelled") return None else: logging.warning("Some RPC error not due to cancellation") return exp raise exp def assert_none(self, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): """ Loading system/gd/cert/gd_base_test.py +53 −1 Original line number Diff line number Diff line Loading @@ -19,8 +19,12 @@ import logging import os import signal import subprocess import traceback from acts import asserts from functools import wraps from grpc import RpcError from acts import asserts, signals from acts.context import get_current_context from acts.base_test import BaseTestClass Loading Loading @@ -133,3 +137,51 @@ class GdBaseTestClass(BaseTestClass): def teardown_test(self): self.cert.rootservice.StopStack(facade_rootservice.StopStackRequest()) self.dut.rootservice.StopStack(facade_rootservice.StopStackRequest()) def __getattribute__(self, name): attr = super().__getattribute__(name) if not callable(attr) or not GdBaseTestClass.__is_entry_function(name): return attr @wraps(attr) def __wrapped(*args, **kwargs): try: return attr(*args, **kwargs) except RpcError as e: exception_info = "".join( traceback.format_exception(e.__class__, e, e.__traceback__)) raise signals.TestFailure( "RpcError during test\n\nRpcError:\n\n%s\n%s" % (exception_info, self.__dump_crashes())) return __wrapped __ENTRY_METHODS = { "setup_class", "teardown_class", "setup_test", "teardown_test" } @staticmethod def __is_entry_function(name): return name.startswith( "test_") or name in GdBaseTestClass.__ENTRY_METHODS def __dump_crashes(self): """ :return: formatted stack traces if found, or last few lines of log """ dut_crash, dut_log_tail = self.dut.get_crash_snippet_and_log_tail() cert_crash, cert_log_tail = self.cert.get_crash_snippet_and_log_tail() crash_detail = "" if dut_crash or cert_crash: if dut_crash: crash_detail += "dut stack crashed:\n\n%s\n\n" % dut_crash if cert_crash: crash_detail += "cert stack crashed:\n\n%s\n\n" % cert_crash else: if dut_log_tail: crash_detail += "dut log tail:\n\n%s\n\n" % dut_log_tail if cert_log_tail: crash_detail += "cert log tail:\n\n%s\n\n" % cert_log_tail return crash_detail system/gd/cert/gd_device.py +44 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ # limitations under the License. from abc import ABC from collections import deque import inspect import logging import os Loading @@ -24,6 +25,7 @@ import signal import socket import subprocess import time import re from typing import List import grpc Loading Loading @@ -247,6 +249,48 @@ class GdDeviceBase(ABC): self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub( self.grpc_channel) # e.g. 2020-05-06 16:02:04.216 bt - packages/modules/Bluetooth/system/gd/facade/facade_main.cc:79 - crash_callback: #03 pc 0000000000013520 /lib/x86_64-linux-gnu/libpthread-2.29.so HOST_CRASH_LINE_REGEX = re.compile(r"^.* - crash_callback: (?P<line>.*)$") HOST_CRASH_HEADER = "Process crashed, signal: Aborted" def get_crash_snippet_and_log_tail(self): """ Get crash snippet if regex matched or last 20 lines of log :return: crash_snippet, log_tail_20 1) crash snippet without timestamp in one string; 2) last 20 lines of log in one string; """ if is_subprocess_alive(self.backing_process): return None, None gd_root_prefix = get_gd_root() + "/" abort_line = None last_20_lines = deque(maxlen=20) crash_log_lines = [] with open(self.backing_process_log_path) as f: for _, line in enumerate(f): m = self.HOST_CRASH_LINE_REGEX.match(line) if m: crash_line = m.group("line").replace(gd_root_prefix, "") if self.HOST_CRASH_HEADER in crash_line \ and len(last_20_lines) > 0: abort_line = last_20_lines[-1] crash_log_lines.append(crash_line) last_20_lines.append(line) log_tail_20 = "".join(last_20_lines) if len(crash_log_lines) == 0: return None, log_tail_20 crash_snippet = "" if abort_line is not None: crash_snippet += "abort log line:\n\n%s\n" % abort_line crash_snippet += "\n".join(crash_log_lines) return crash_snippet, log_tail_20 def teardown(self): """Tear down this device and clean up any resources. - Must be called after setup() Loading Loading
system/gd/cert/event_stream.py +22 −24 Original line number Diff line number Diff line Loading @@ -81,34 +81,33 @@ class EventStream(IEventStream, Closable): self.event_queue = SimpleQueue() self.handlers = [] self.executor = ThreadPoolExecutor() self.future = self.executor.submit(EventStream._event_loop, self) self.future = self.executor.submit(EventStream.__event_loop, self) def get_event_queue(self): return self.event_queue def close(self): """ Stop the gRPC lambda so that event_callback will not be invoked after th method returns. Stop the gRPC lambda so that event_callback will not be invoked after the method returns. This object will be useless after this call as there is no way to restart the gRPC callback. You would have to create a new EventStream This object will be useless after this call as there is no way to restart the gRPC callback. You would have to create a new EventStream :return: None on success, exception object on failure :raise None on success, or the same exception as __event_loop(), or concurrent.futures.TimeoutError if underlying stream failed to terminate within DEFAULT_TIMEOUT_SECONDS """ while not self.server_stream_call.done(): # Try to cancel the execution, don't care the result, non-blocking self.server_stream_call.cancel() exception_for_return = None try: result = self.future.result() if result: logging.warning("Inner loop error %s" % result) raise result except Exception as exp: logging.warning("Exception: %s" % (exp)) exception_for_return = exp self.executor.shutdown() return exception_for_return # cancelling gRPC stream should cause __event_loop() to quit # same exception will be raised by future.result() or # concurrent.futures.TimeoutError will be raised after timeout self.future.result(timeout=DEFAULT_TIMEOUT_SECONDS) finally: # Make sure we force shutdown the executor regardless of the result self.executor.shutdown(wait=False) def register_callback(self, callback, matcher_fn=None): """ Loading Loading @@ -145,11 +144,11 @@ class EventStream(IEventStream, Closable): raise ValueError("callback must not be None") self.handlers.remove((callback, matcher_fn)) def _event_loop(self): def __event_loop(self): """ Main loop for consuming the gRPC stream events. Blocks until computation is cancelled :return: None on success, exception object on failure :raise grpc.Error on failure """ try: for event in self.server_stream_call: Loading @@ -157,14 +156,13 @@ class EventStream(IEventStream, Closable): for (callback, matcher_fn) in self.handlers: if not matcher_fn or matcher_fn(event): callback(event) return None except RpcError as exp: # Underlying gRPC stream should run indefinitely until cancelled # Hence any other reason besides CANCELLED is raised as an error if self.server_stream_call.cancelled(): logging.debug("Cancelled") return None else: logging.warning("Some RPC error not due to cancellation") return exp raise exp def assert_none(self, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): """ Loading
system/gd/cert/gd_base_test.py +53 −1 Original line number Diff line number Diff line Loading @@ -19,8 +19,12 @@ import logging import os import signal import subprocess import traceback from acts import asserts from functools import wraps from grpc import RpcError from acts import asserts, signals from acts.context import get_current_context from acts.base_test import BaseTestClass Loading Loading @@ -133,3 +137,51 @@ class GdBaseTestClass(BaseTestClass): def teardown_test(self): self.cert.rootservice.StopStack(facade_rootservice.StopStackRequest()) self.dut.rootservice.StopStack(facade_rootservice.StopStackRequest()) def __getattribute__(self, name): attr = super().__getattribute__(name) if not callable(attr) or not GdBaseTestClass.__is_entry_function(name): return attr @wraps(attr) def __wrapped(*args, **kwargs): try: return attr(*args, **kwargs) except RpcError as e: exception_info = "".join( traceback.format_exception(e.__class__, e, e.__traceback__)) raise signals.TestFailure( "RpcError during test\n\nRpcError:\n\n%s\n%s" % (exception_info, self.__dump_crashes())) return __wrapped __ENTRY_METHODS = { "setup_class", "teardown_class", "setup_test", "teardown_test" } @staticmethod def __is_entry_function(name): return name.startswith( "test_") or name in GdBaseTestClass.__ENTRY_METHODS def __dump_crashes(self): """ :return: formatted stack traces if found, or last few lines of log """ dut_crash, dut_log_tail = self.dut.get_crash_snippet_and_log_tail() cert_crash, cert_log_tail = self.cert.get_crash_snippet_and_log_tail() crash_detail = "" if dut_crash or cert_crash: if dut_crash: crash_detail += "dut stack crashed:\n\n%s\n\n" % dut_crash if cert_crash: crash_detail += "cert stack crashed:\n\n%s\n\n" % cert_crash else: if dut_log_tail: crash_detail += "dut log tail:\n\n%s\n\n" % dut_log_tail if cert_log_tail: crash_detail += "cert log tail:\n\n%s\n\n" % cert_log_tail return crash_detail
system/gd/cert/gd_device.py +44 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ # limitations under the License. from abc import ABC from collections import deque import inspect import logging import os Loading @@ -24,6 +25,7 @@ import signal import socket import subprocess import time import re from typing import List import grpc Loading Loading @@ -247,6 +249,48 @@ class GdDeviceBase(ABC): self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub( self.grpc_channel) # e.g. 2020-05-06 16:02:04.216 bt - packages/modules/Bluetooth/system/gd/facade/facade_main.cc:79 - crash_callback: #03 pc 0000000000013520 /lib/x86_64-linux-gnu/libpthread-2.29.so HOST_CRASH_LINE_REGEX = re.compile(r"^.* - crash_callback: (?P<line>.*)$") HOST_CRASH_HEADER = "Process crashed, signal: Aborted" def get_crash_snippet_and_log_tail(self): """ Get crash snippet if regex matched or last 20 lines of log :return: crash_snippet, log_tail_20 1) crash snippet without timestamp in one string; 2) last 20 lines of log in one string; """ if is_subprocess_alive(self.backing_process): return None, None gd_root_prefix = get_gd_root() + "/" abort_line = None last_20_lines = deque(maxlen=20) crash_log_lines = [] with open(self.backing_process_log_path) as f: for _, line in enumerate(f): m = self.HOST_CRASH_LINE_REGEX.match(line) if m: crash_line = m.group("line").replace(gd_root_prefix, "") if self.HOST_CRASH_HEADER in crash_line \ and len(last_20_lines) > 0: abort_line = last_20_lines[-1] crash_log_lines.append(crash_line) last_20_lines.append(line) log_tail_20 = "".join(last_20_lines) if len(crash_log_lines) == 0: return None, log_tail_20 crash_snippet = "" if abort_line is not None: crash_snippet += "abort log line:\n\n%s\n" % abort_line crash_snippet += "\n".join(crash_log_lines) return crash_snippet, log_tail_20 def teardown(self): """Tear down this device and clean up any resources. - Must be called after setup() Loading