Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e1dba4f7 authored by Jizheng Chu's avatar Jizheng Chu Committed by Automerger Merge Worker
Browse files

Merge test scripts and test lib scripts in Mobly GD Cert tests and deprecate...

Merge test scripts and test lib scripts in Mobly GD Cert tests and deprecate ACTS version am: 35cbde44

Original change: https://android-review.googlesource.com/c/platform/packages/modules/Bluetooth/+/1890001

Change-Id: I9f4701259821eed1b32dfade320b6ab93a283e62
parents 9b275ec0 35cbde44
Loading
Loading
Loading
Loading
+90 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import concurrent.futures
import logging
import re
import subprocess
from blueberry.tests.gd.cert.os_utils import TerminalColor
from contextlib import ExitStack


class AsyncSubprocessLogger:
    """
    An asynchronous logger for subprocesses.Popen object's STDOUT

    Contains threading functionality that allows asynchronous handling of lines
    from STDOUT from subprocess.Popen
    """
    WAIT_TIMEOUT_SECONDS = 10
    PROCESS_TAG_MIN_WIDTH = 24

    def __init__(self,
                 process: subprocess.Popen,
                 log_file_paths,
                 log_to_stdout=False,
                 tag=None,
                 color: TerminalColor = None):
        """
        :param process: a subprocess.Popen object with STDOUT
        :param log_file_paths: list of log files to redirect log to
        :param log_to_stdout: whether to dump logs to stdout in the format of
                              "[tag] logline"
        :param tag: tag to be used in above format
        :param color: when dumping to stdout, what color to use for tag
        """
        if not process:
            raise ValueError("process cannot be None")
        if not process.stdout:
            raise ValueError("process.stdout cannot be None")
        if log_to_stdout:
            if not tag or type(tag) is not str:
                raise ValueError("When logging to stdout, log tag must be set")
        self.log_file_paths = log_file_paths
        self.log_to_stdout = log_to_stdout
        self.tag = tag
        self.color = color
        self.process = process
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.future = self.executor.submit(self.__logging_loop)

    def stop(self):
        """
        Stop this logger and this object can no longer be used after this call
        """
        try:
            result = self.future.result(timeout=self.WAIT_TIMEOUT_SECONDS)
            if result:
                logging.error("logging thread %s produced an error when executing: %s" % (self.tag, str(result)))
        except concurrent.futures.TimeoutError:
            logging.error("logging thread %s failed to finish after %d seconds" % (self.tag, self.WAIT_TIMEOUT_SECONDS))
        self.executor.shutdown(wait=False)

    def __logging_loop(self):
        if self.color:
            loggableTag = "[%s%s%s]" % (self.color, self.tag, TerminalColor.END)
        else:
            loggableTag = "[%s]" % self.tag
        tagLength = len(re.sub('[^\w\s]', '', loggableTag))
        if tagLength < self.PROCESS_TAG_MIN_WIDTH:
            loggableTag += " " * (self.PROCESS_TAG_MIN_WIDTH - tagLength)
        with ExitStack() as stack:
            log_files = [stack.enter_context(open(file_path, 'w')) for file_path in self.log_file_paths]
            for line in self.process.stdout:
                for log_file in log_files:
                    log_file.write(line)
                if self.log_to_stdout:
                    print("{}{}".format(loggableTag, line.strip()))
+173 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from mobly import signals
from threading import Condition

from blueberry.tests.gd.cert.event_stream import static_remaining_time_delta
from blueberry.tests.gd.cert.truth import assertThat


class IHasBehaviors(ABC):

    @abstractmethod
    def get_behaviors(self):
        pass


def anything():
    return lambda obj: True


def when(has_behaviors):
    assertThat(isinstance(has_behaviors, IHasBehaviors)).isTrue()
    return has_behaviors.get_behaviors()


def IGNORE_UNHANDLED(obj):
    pass


class SingleArgumentBehavior(object):

    def __init__(self, reply_stage_factory):
        self._reply_stage_factory = reply_stage_factory
        self._instances = []
        self._invoked_obj = []
        self._invoked_condition = Condition()
        self.set_default_to_crash()

    def begin(self, matcher):
        return PersistenceStage(self, matcher, self._reply_stage_factory)

    def append(self, behavior_instance):
        self._instances.append(behavior_instance)

    def set_default(self, fn):
        assertThat(fn).isNotNone()
        self._default_fn = fn

    def set_default_to_crash(self):
        self._default_fn = None

    def set_default_to_ignore(self):
        self._default_fn = IGNORE_UNHANDLED

    def run(self, obj):
        for instance in self._instances:
            if instance.try_run(obj):
                self.__obj_invoked(obj)
                return
        if self._default_fn is not None:
            # IGNORE_UNHANDLED is also a default fn
            self._default_fn(obj)
            self.__obj_invoked(obj)
        else:
            raise signals.TestFailure(
                "%s: behavior for %s went unhandled" % (self._reply_stage_factory().__class__.__name__, obj),
                extras=None)

    def __obj_invoked(self, obj):
        self._invoked_condition.acquire()
        self._invoked_obj.append(obj)
        self._invoked_condition.notify()
        self._invoked_condition.release()

    def wait_until_invoked(self, matcher, times, timeout):
        end_time = datetime.now() + timeout
        invoked_times = 0
        while datetime.now() < end_time and invoked_times < times:
            remaining = static_remaining_time_delta(end_time)
            invoked_times = sum((matcher(i) for i in self._invoked_obj))
            self._invoked_condition.acquire()
            self._invoked_condition.wait(remaining.total_seconds())
            self._invoked_condition.release()
        return invoked_times == times


class PersistenceStage(object):

    def __init__(self, behavior, matcher, reply_stage_factory):
        self._behavior = behavior
        self._matcher = matcher
        self._reply_stage_factory = reply_stage_factory

    def then(self, times=1):
        reply_stage = self._reply_stage_factory()
        reply_stage.init(self._behavior, self._matcher, times)
        return reply_stage

    def always(self):
        return self.then(times=-1)


class ReplyStage(object):

    def init(self, behavior, matcher, persistence):
        self._behavior = behavior
        self._matcher = matcher
        self._persistence = persistence

    def _commit(self, fn):
        self._behavior.append(BehaviorInstance(self._matcher, self._persistence, fn))


class BehaviorInstance(object):

    def __init__(self, matcher, persistence, fn):
        self._matcher = matcher
        self._persistence = persistence
        self._fn = fn
        self._called_count = 0

    def try_run(self, obj):
        if not self._matcher(obj):
            return False
        if self._persistence >= 0:
            if self._called_count >= self._persistence:
                return False
        self._called_count += 1
        self._fn(obj)
        return True


class BoundVerificationStage(object):

    def __init__(self, behavior, matcher, timeout):
        self._behavior = behavior
        self._matcher = matcher
        self._timeout = timeout

    def times(self, times=1):
        return self._behavior.wait_until_invoked(self._matcher, times, self._timeout)


class WaitForBehaviorSubject(object):

    def __init__(self, behaviors, timeout):
        self._behaviors = behaviors
        self._timeout = timeout

    def __getattr__(self, item):
        behavior = getattr(self._behaviors, item + "_behavior")
        t = self._timeout
        return lambda matcher: BoundVerificationStage(behavior, matcher, t)


def wait_until(i_has_behaviors, timeout=timedelta(seconds=3)):
    return WaitForBehaviorSubject(i_has_behaviors.get_behaviors(), timeout)
+21 −4
Original line number Diff line number Diff line
@@ -14,11 +14,28 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

import os
import sys

class Capture(object):
    """
    Wrap a match function and use in its place, to capture the value
    that matched. Specify an optional |capture_fn| to transform the
    captured value.
    """

class StackTestBase():
    def __init__(self, match_fn, capture_fn=None):
        self._match_fn = match_fn
        self._capture_fn = capture_fn
        self._value = None

    def test_test(self):
    def __call__(self, obj):
        if self._match_fn(obj) != True:
            return False

        if self._capture_fn is not None:
            self._value = self._capture_fn(obj)
        else:
            self._value = obj
        return True

    def get(self):
        return self._value
+187 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
from blueberry.tests.gd.cert.capture import Capture
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert.matchers import L2capMatchers
from blueberry.tests.gd.cert.matchers import SecurityMatchers
from security.facade_pb2 import UiMsgType


class HalCaptures(object):

    @staticmethod
    def ReadBdAddrCompleteCapture():
        return Capture(
            lambda packet: packet.payload[0:5] == b'\x0e\x0a\x01\x09\x10', lambda packet: hci_packets.ReadBdAddrCompleteView(
                hci_packets.CommandCompleteView(
                    hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))))

    @staticmethod
    def ConnectionRequestCapture():
        return Capture(
            lambda packet: packet.payload[0:2] == b'\x04\x0a', lambda packet: hci_packets.ConnectionRequestView(
                hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload)))))

    @staticmethod
    def ConnectionCompleteCapture():
        return Capture(
            lambda packet: packet.payload[0:3] == b'\x03\x0b\x00', lambda packet: hci_packets.ConnectionCompleteView(
                hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload)))))

    @staticmethod
    def DisconnectionCompleteCapture():
        return Capture(
            lambda packet: packet.payload[0:2] == b'\x05\x04', lambda packet: hci_packets.DisconnectionCompleteView(
                hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload)))))

    @staticmethod
    def LeConnectionCompleteCapture():
        return Capture(
            lambda packet: packet.payload[0] == 0x3e and (packet.payload[2] == 0x01 or packet.payload[2] == 0x0a),
            lambda packet: hci_packets.LeConnectionCompleteView(
                hci_packets.LeMetaEventView(
                    hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet.payload))))))


class HciCaptures(object):

    @staticmethod
    def ReadLocalOobDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
        )

    @staticmethod
    def ReadLocalOobExtendedDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
        )

    @staticmethod
    def ReadBdAddrCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_BD_ADDR),
            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.payload, hci_packets.OpCode.READ_BD_ADDR)))

    @staticmethod
    def ConnectionRequestCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_REQUEST),
            lambda packet: hci_packets.ConnectionRequestView(
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_REQUEST)))

    @staticmethod
    def ConnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_COMPLETE),
            lambda packet: hci_packets.ConnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.CONNECTION_COMPLETE)))

    @staticmethod
    def DisconnectionCompleteCapture():
        return Capture(
            HciMatchers.EventWithCode(hci_packets.EventCode.DISCONNECTION_COMPLETE),
            lambda packet: hci_packets.DisconnectionCompleteView(
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.DISCONNECTION_COMPLETE)))

    @staticmethod
    def LeConnectionCompleteCapture():
        return Capture(HciMatchers.LeConnectionComplete(),
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.payload))

    @staticmethod
    def SimplePairingCompleteCapture():
        return Capture(HciMatchers.EventWithCode(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE),
            lambda packet: hci_packets.SimplePairingCompleteView(
                HciMatchers.ExtractEventWithCode(packet.payload, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))


class L2capCaptures(object):

    @staticmethod
    def ConnectionRequest(psm):
        return Capture(L2capMatchers.ConnectionRequest(psm), L2capCaptures._extract_connection_request)

    @staticmethod
    def _extract_connection_request(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
        return l2cap_packets.ConnectionRequestView(frame)

    @staticmethod
    def ConnectionResponse(scid):
        return Capture(L2capMatchers.ConnectionResponse(scid), L2capCaptures._extract_connection_response)

    @staticmethod
    def _extract_connection_response(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE)
        return l2cap_packets.ConnectionResponseView(frame)

    @staticmethod
    def ConfigurationRequest(cid=None):
        return Capture(L2capMatchers.ConfigurationRequest(cid), L2capCaptures._extract_configuration_request)

    @staticmethod
    def _extract_configuration_request(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        return l2cap_packets.ConfigurationRequestView(frame)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        return Capture(
            L2capMatchers.CreditBasedConnectionRequest(psm), L2capCaptures._extract_credit_based_connection_request)

    @staticmethod
    def _extract_credit_based_connection_request(packet):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        return l2cap_packets.LeCreditBasedConnectionRequestView(frame)

    @staticmethod
    def CreditBasedConnectionResponse():
        return Capture(L2capMatchers.CreditBasedConnectionResponse(),
                       L2capCaptures._extract_credit_based_connection_response)

    @staticmethod
    def _extract_credit_based_connection_response(packet):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
        return l2cap_packets.LeCreditBasedConnectionResponseView(frame)

    @staticmethod
    def LinkSecurityInterfaceCallbackEvent(type):
        return Capture(L2capMatchers.LinkSecurityInterfaceCallbackEvent(type), L2capCaptures._extract_address)

    @staticmethod
    def _extract_address(packet):
        return packet.address


class SecurityCaptures(object):

    @staticmethod
    def DisplayPasskey():
        return Capture(SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PASSKEY), SecurityCaptures._extract_passkey)

    @staticmethod
    def _extract_passkey(event):
        if event is None:
            return None
        return event.numeric_value
+442 −49

File changed.

Preview size limit exceeded, changes collapsed.

Loading