Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5202e3c8 authored by Henri Chataing's avatar Henri Chataing
Browse files

Remove system/blueberry/tests/gd

Bug: 384782957
Test: TreeHugger
Flag: EXEMPT, dead code removal
Change-Id: I3f78081b9668b341bc69812757787df1b74c916b
parent 8c74fcfb
Loading
Loading
Loading
Loading
+0 −175
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2016 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import encodings
import logging
import shlex
import shutil

from mobly.controllers.android_device_lib.adb import AdbError
from mobly.controllers.android_device_lib.adb import AdbProxy

ROOT_USER_ID = '0'
SHELL_USER_ID = '2000'
UTF_8 = encodings.utf_8.getregentry().name


class BlueberryAdbProxy(AdbProxy):
    """Proxy class for ADB.

    For syntactic reasons, the '-' in adb commands need to be replaced with
    '_'. Can directly execute adb commands on an object:
    >> adb = BlueberryAdbProxy(<serial>)
    >> adb.start_server()
    >> adb.devices() # will return the console output of "adb devices".
    """

    def __init__(self, serial="", ssh_connection=None):
        """Construct an instance of AdbProxy.

        Args:
            serial: str serial number of Android device from `adb devices`
            ssh_connection: SshConnection instance if the Android device is
                            connected to a remote host that we can reach via SSH.
        """
        super().__init__(serial)
        self._server_local_port = None
        adb_path = shutil.which('adb')
        adb_cmd = [shlex.quote(adb_path)]
        if serial:
            adb_cmd.append("-s %s" % serial)
        if ssh_connection is not None:
            # Kill all existing adb processes on the remote host (if any)
            # Note that if there are none, then pkill exits with non-zero status
            ssh_connection.run("pkill adb", ignore_status=True)
            # Copy over the adb binary to a temp dir
            temp_dir = ssh_connection.run("mktemp -d").stdout.strip()
            ssh_connection.send_file(adb_path, temp_dir)
            # Start up a new adb server running as root from the copied binary.
            remote_adb_cmd = "%s/adb %s root" % (temp_dir, "-s %s" % serial if serial else "")
            ssh_connection.run(remote_adb_cmd)
            # Proxy a local port to the adb server port
            local_port = ssh_connection.create_ssh_tunnel(5037)
            self._server_local_port = local_port

        if self._server_local_port:
            adb_cmd.append("-P %d" % local_port)
        self.adb_str = " ".join(adb_cmd)
        self._ssh_connection = ssh_connection

    def get_user_id(self):
        """Returns the adb user. Either 2000 (shell) or 0 (root)."""
        return self.shell('id -u').decode(UTF_8).rstrip()

    def is_root(self, user_id=None):
        """Checks if the user is root.

        Args:
            user_id: if supplied, the id to check against.
        Returns:
            True if the user is root. False otherwise.
        """
        if not user_id:
            user_id = self.get_user_id()
        return user_id == ROOT_USER_ID

    def ensure_root(self):
        """Ensures the user is root after making this call.

        Note that this will still fail if the device is a user build, as root
        is not accessible from a user build.

        Returns:
            False if the device is a user build. True otherwise.
        """
        self.ensure_user(ROOT_USER_ID)
        return self.is_root()

    def ensure_user(self, user_id=SHELL_USER_ID):
        """Ensures the user is set to the given user.

        Args:
            user_id: The id of the user.
        """
        if self.is_root(user_id):
            self.root()
        else:
            self.unroot()
        self.wait_for_device()
        return self.get_user_id() == user_id

    def tcp_forward(self, host_port, device_port):
        """Starts tcp forwarding from localhost to this android device.

        Args:
            host_port: Port number to use on localhost
            device_port: Port number to use on the android device.

        Returns:
            Forwarded port on host as int or command output string on error
        """
        if self._ssh_connection:
            # We have to hop through a remote host first.
            #  1) Find some free port on the remote host's localhost
            #  2) Setup forwarding between that remote port and the requested
            #     device port
            remote_port = self._ssh_connection.find_free_port()
            host_port = self._ssh_connection.create_ssh_tunnel(remote_port, local_port=host_port)
        try:
            output = self.forward(["tcp:%d" % host_port, "tcp:%d" % device_port])
        except AdbError as error:
            return error
        # If hinted_port is 0, the output will be the selected port.
        # Otherwise, there will be no output upon successfully
        # forwarding the hinted port.
        if not output:
            return host_port
        try:
            output_int = int(output)
        except ValueError:
            return output
        return output_int

    def remove_tcp_forward(self, host_port):
        """Stop tcp forwarding a port from localhost to this android device.

        Args:
            host_port: Port number to use on localhost
        """
        if self._ssh_connection:
            remote_port = self._ssh_connection.close_ssh_tunnel(host_port)
            if remote_port is None:
                logging.warning("Cannot close unknown forwarded tcp port: %d", host_port)
                return
            # The actual port we need to disable via adb is on the remote host.
            host_port = remote_port
        self.forward(["--remove", "tcp:%d" % host_port])

    def path_exists(self, path):
        """Check if a file path exists on an Android device

        :param path: file path, could be a directory
        :return: True if file path exists
        """
        try:
            ret = self.shell("ls {}".format(path))
            if ret is not None and len(ret) > 0:
                return True
            else:
                return False
        except AdbError as e:
            logging.debug("path {} does not exist, error={}".format(path, e))
            return False
+0 −57
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from mobly.asserts import *


# Have an instance of unittest.TestCase so we could reuse some logic from
# python's own unittest.
# _ProxyTest is required because py2 does not allow instantiating
# unittest.TestCase directly.
class _ProxyTest(unittest.TestCase):

    def runTest(self):
        pass


_pyunit_proxy = _ProxyTest()


def assert_almost_equal(first, second, places=7, msg=None, delta=None, extras=None):
    """
    Assert FIRST to be within +/- DELTA to SECOND, otherwise fail the
    test.
    :param first: The first argument, LHS
    :param second: The second argument, RHS
    :param places: For floating points, how many decimal places to look into
    :param msg: Message to display on failure
    :param delta: The +/- first and second could be apart from each other
    :param extras: Extra object passed to test failure handler
    :return:
    """
    my_msg = None
    try:
        if delta:
            _pyunit_proxy.assertAlmostEqual(first, second, msg=msg, delta=delta)
        else:
            _pyunit_proxy.assertAlmostEqual(first, second, places=places, msg=msg)
    except Exception as e:
        my_msg = str(e)
        if msg:
            my_msg = "%s %s" % (my_msg, msg)
    # This is a hack to remove the stacktrace produced by the above exception.
    if my_msg is not None:
        fail(my_msg, extras=extras)
+0 −90
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import concurrent.futures
import logging
import re
import subprocess
from blueberry.tests.gd.cert.os_utils import TerminalColor
from contextlib import ExitStack


class AsyncSubprocessLogger:
    """
    An asynchronous logger for subprocesses.Popen object's STDOUT

    Contains threading functionality that allows asynchronous handling of lines
    from STDOUT from subprocess.Popen
    """
    WAIT_TIMEOUT_SECONDS = 10
    PROCESS_TAG_MIN_WIDTH = 24

    def __init__(self,
                 process: subprocess.Popen,
                 log_file_paths,
                 log_to_stdout=False,
                 tag=None,
                 color: TerminalColor = None):
        """
        :param process: a subprocess.Popen object with STDOUT
        :param log_file_paths: list of log files to redirect log to
        :param log_to_stdout: whether to dump logs to stdout in the format of
                              "[tag] logline"
        :param tag: tag to be used in above format
        :param color: when dumping to stdout, what color to use for tag
        """
        if not process:
            raise ValueError("process cannot be None")
        if not process.stdout:
            raise ValueError("process.stdout cannot be None")
        if log_to_stdout:
            if not tag or type(tag) is not str:
                raise ValueError("When logging to stdout, log tag must be set")
        self.log_file_paths = log_file_paths
        self.log_to_stdout = log_to_stdout
        self.tag = tag
        self.color = color
        self.process = process
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.future = self.executor.submit(self.__logging_loop)

    def stop(self):
        """
        Stop this logger and this object can no longer be used after this call
        """
        try:
            result = self.future.result(timeout=self.WAIT_TIMEOUT_SECONDS)
            if result:
                logging.error("logging thread %s produced an error when executing: %s" % (self.tag, str(result)))
        except concurrent.futures.TimeoutError:
            logging.error("logging thread %s failed to finish after %d seconds" % (self.tag, self.WAIT_TIMEOUT_SECONDS))
        self.executor.shutdown(wait=False)

    def __logging_loop(self):
        if self.color:
            loggableTag = "[%s%s%s]" % (self.color, self.tag, TerminalColor.END)
        else:
            loggableTag = "[%s]" % self.tag
        tagLength = len(re.sub('[^\w\s]', '', loggableTag))
        if tagLength < self.PROCESS_TAG_MIN_WIDTH:
            loggableTag += " " * (self.PROCESS_TAG_MIN_WIDTH - tagLength)
        with ExitStack() as stack:
            log_files = [stack.enter_context(open(file_path, 'w')) for file_path in self.log_file_paths]
            for line in self.process.stdout:
                for log_file in log_files:
                    log_file.write(line)
                if self.log_to_stdout:
                    print("{}{}".format(loggableTag, line.strip()))
+0 −173
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from mobly import signals
from threading import Condition

from blueberry.tests.gd.cert.event_stream import static_remaining_time_delta
from blueberry.tests.gd.cert.truth import assertThat


class IHasBehaviors(ABC):

    @abstractmethod
    def get_behaviors(self):
        pass


def anything():
    return lambda obj: True


def when(has_behaviors):
    assertThat(isinstance(has_behaviors, IHasBehaviors)).isTrue()
    return has_behaviors.get_behaviors()


def IGNORE_UNHANDLED(obj):
    pass


class SingleArgumentBehavior(object):

    def __init__(self, reply_stage_factory):
        self._reply_stage_factory = reply_stage_factory
        self._instances = []
        self._invoked_obj = []
        self._invoked_condition = Condition()
        self.set_default_to_crash()

    def begin(self, matcher):
        return PersistenceStage(self, matcher, self._reply_stage_factory)

    def append(self, behavior_instance):
        self._instances.append(behavior_instance)

    def set_default(self, fn):
        assertThat(fn).isNotNone()
        self._default_fn = fn

    def set_default_to_crash(self):
        self._default_fn = None

    def set_default_to_ignore(self):
        self._default_fn = IGNORE_UNHANDLED

    def run(self, obj):
        for instance in self._instances:
            if instance.try_run(obj):
                self.__obj_invoked(obj)
                return
        if self._default_fn is not None:
            # IGNORE_UNHANDLED is also a default fn
            self._default_fn(obj)
            self.__obj_invoked(obj)
        else:
            raise signals.TestFailure(
                "%s: behavior for %s went unhandled" % (self._reply_stage_factory().__class__.__name__, obj),
                extras=None)

    def __obj_invoked(self, obj):
        self._invoked_condition.acquire()
        self._invoked_obj.append(obj)
        self._invoked_condition.notify()
        self._invoked_condition.release()

    def wait_until_invoked(self, matcher, times, timeout):
        end_time = datetime.now() + timeout
        invoked_times = 0
        while datetime.now() < end_time and invoked_times < times:
            remaining = static_remaining_time_delta(end_time)
            invoked_times = sum((matcher(i) for i in self._invoked_obj))
            self._invoked_condition.acquire()
            self._invoked_condition.wait(remaining.total_seconds())
            self._invoked_condition.release()
        return invoked_times == times


class PersistenceStage(object):

    def __init__(self, behavior, matcher, reply_stage_factory):
        self._behavior = behavior
        self._matcher = matcher
        self._reply_stage_factory = reply_stage_factory

    def then(self, times=1):
        reply_stage = self._reply_stage_factory()
        reply_stage.init(self._behavior, self._matcher, times)
        return reply_stage

    def always(self):
        return self.then(times=-1)


class ReplyStage(object):

    def init(self, behavior, matcher, persistence):
        self._behavior = behavior
        self._matcher = matcher
        self._persistence = persistence

    def _commit(self, fn):
        self._behavior.append(BehaviorInstance(self._matcher, self._persistence, fn))


class BehaviorInstance(object):

    def __init__(self, matcher, persistence, fn):
        self._matcher = matcher
        self._persistence = persistence
        self._fn = fn
        self._called_count = 0

    def try_run(self, obj):
        if not self._matcher(obj):
            return False
        if self._persistence >= 0:
            if self._called_count >= self._persistence:
                return False
        self._called_count += 1
        self._fn(obj)
        return True


class BoundVerificationStage(object):

    def __init__(self, behavior, matcher, timeout):
        self._behavior = behavior
        self._matcher = matcher
        self._timeout = timeout

    def times(self, times=1):
        return self._behavior.wait_until_invoked(self._matcher, times, self._timeout)


class WaitForBehaviorSubject(object):

    def __init__(self, behaviors, timeout):
        self._behaviors = behaviors
        self._timeout = timeout

    def __getattr__(self, item):
        behavior = getattr(self._behaviors, item + "_behavior")
        t = self._timeout
        return lambda matcher: BoundVerificationStage(behavior, matcher, t)


def wait_until(i_has_behaviors, timeout=timedelta(seconds=3)):
    return WaitForBehaviorSubject(i_has_behaviors.get_behaviors(), timeout)
+0 −41
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.


class Capture(object):
    """
    Wrap a match function and use in its place, to capture the value
    that matched. Specify an optional |capture_fn| to transform the
    captured value.
    """

    def __init__(self, match_fn, capture_fn=None):
        self._match_fn = match_fn
        self._capture_fn = capture_fn
        self._value = None

    def __call__(self, obj):
        if self._match_fn(obj) != True:
            return False

        if self._capture_fn is not None:
            self._value = self._capture_fn(obj)
        else:
            self._value = obj
        return True

    def get(self):
        return self._value
Loading