Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5d4e906c authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "startop: Rewrite app startup runner to use new python run_app_with_prefetch."

parents 692527d0 7af0155c
Loading
Loading
Loading
Loading
+166 −481

Files changed.

Preview size limit exceeded, changes collapsed.

+26 −169
Original line number Diff line number Diff line
@@ -31,18 +31,17 @@ Usage:
See also https://docs.pytest.org/en/latest/usage.html
"""

# global imports
from contextlib import contextmanager
import io
import shlex
import sys
import typing

# pip imports
import pytest
# global imports
from contextlib import contextmanager

# local imports
import app_startup_runner as asr
# pip imports
import pytest

#
# Argument Parsing Helpers
@@ -91,7 +90,8 @@ def default_dict_for_parsed_args(**kwargs):
  """
  # Combine it with all of the "optional" parameters' default values.
  """
  d = {'compiler_filters': None, 'simulate': False, 'debug': False, 'output': None, 'timeout': None, 'loop_count': 1, 'inodes': None}
  d = {'compiler_filters': None, 'simulate': False, 'debug': False,
       'output': None, 'timeout': 10, 'loop_count': 1, 'inodes': None}
  d.update(kwargs)
  return d

@@ -124,15 +124,22 @@ def test_argparse():
  # required arguments are parsed correctly
  ad = default_dict_for_parsed_args  # assert dict

  assert parse_args("--package xyz --readahead warm") == ad(packages=['xyz'], readaheads=['warm'])
  assert parse_args("-p xyz -r warm") == ad(packages=['xyz'], readaheads=['warm'])
  assert parse_args("--package xyz --readahead warm") == ad(packages=['xyz'],
                                                            readaheads=['warm'])
  assert parse_args("-p xyz -r warm") == ad(packages=['xyz'],
                                            readaheads=['warm'])

  assert parse_args("-p xyz -r warm -s") == ad(packages=['xyz'], readaheads=['warm'], simulate=True)
  assert parse_args("-p xyz -r warm --simulate") == ad(packages=['xyz'], readaheads=['warm'], simulate=True)
  assert parse_args("-p xyz -r warm -s") == ad(packages=['xyz'],
                                               readaheads=['warm'],
                                               simulate=True)
  assert parse_args("-p xyz -r warm --simulate") == ad(packages=['xyz'],
                                                       readaheads=['warm'],
                                                       simulate=True)

  # optional arguments are parsed correctly.
  mad = default_mock_dict_for_parsed_args  # mock assert dict
  assert parse_optional_args("--output filename.csv") == mad(output='filename.csv')
  assert parse_optional_args("--output filename.csv") == mad(
    output='filename.csv')
  assert parse_optional_args("-o filename.csv") == mad(output='filename.csv')

  assert parse_optional_args("--timeout 123") == mad(timeout=123)
@@ -145,36 +152,6 @@ def test_argparse():
  assert parse_optional_args("-in baz") == mad(inodes="baz")


def generate_run_combinations(*args):
  """Materialize asr.generate_run_combinations into a list.

  The production function is a generator; expanding it up front lets the
  tests compare results with plain `==`.
  """
  return list(asr.generate_run_combinations(*args))

def test_generate_run_combinations():
  """Exercise generate_run_combinations over empty, single and paired fields."""
  blank_nd = typing.NamedTuple('Blank')
  assert generate_run_combinations(blank_nd, {}) == [()], "empty"
  assert generate_run_combinations(blank_nd, {'a' : ['a1', 'a2']}) == [()], \
      "empty filter"

  a_nd = typing.NamedTuple('A', [('a', str)])
  assert generate_run_combinations(a_nd, {'a': None}) == [(None,)], "None"
  assert generate_run_combinations(a_nd, {'a': ['a1', 'a2']}) == \
      [('a1',), ('a2',)], "one item"
  assert generate_run_combinations(
      a_nd, {'a' : ['a1', 'a2'], 'b': ['b1', 'b2']}) == [('a1',), ('a2',)], \
      "one item filter"

  ab_nd = typing.NamedTuple('AB', [('a', str), ('b', str)])
  all_ab = [ab_nd('a1', 'b1'), ab_nd('a1', 'b2'),
            ab_nd('a2', 'b1'), ab_nd('a2', 'b2')]
  # Keys may be given in singular or plural form; both expand the same way.
  assert generate_run_combinations(
      ab_nd, {'a': ['a1', 'a2'], 'b': ['b1', 'b2']}) == all_ab, \
      "two items"

  assert generate_run_combinations(
      ab_nd, {'as': ['a1', 'a2'], 'bs': ['b1', 'b2']}) == all_ab, \
      "two items plural"

def test_key_to_cmdline_flag():
  assert asr.key_to_cmdline_flag("abc") == "--abc"
@@ -182,138 +159,18 @@ def test_key_to_cmdline_flag():
  assert asr.key_to_cmdline_flag("ba_r") == "--ba-r"
  assert asr.key_to_cmdline_flag("ba_zs") == "--ba-z"


def test_make_script_command_with_temp_output():
  cmd_str, tmp_file = asr.make_script_command_with_temp_output("fake_script", args=[], count=1)
  cmd_str, tmp_file = asr.make_script_command_with_temp_output("fake_script",
                                                               args=[], count=1)
  with tmp_file:
    assert cmd_str == ["fake_script", "--count", "1", "--output", tmp_file.name]

  cmd_str, tmp_file = asr.make_script_command_with_temp_output("fake_script", args=['a', 'b'], count=2)
  cmd_str, tmp_file = asr.make_script_command_with_temp_output("fake_script",
                                                               args=['a', 'b'],
                                                               count=2)
  with tmp_file:
    assert cmd_str == ["fake_script", "a", "b", "--count", "2", "--output", tmp_file.name]

def test_parse_run_script_csv_file_flat():
  """parse_run_script_csv_file_flat turns one CSV line into a flat int list."""
  def parse(text):
    return asr.parse_run_script_csv_file_flat(io.StringIO(text))

  # empty file -> empty list
  assert parse("") == []

  # common case
  assert parse("1,2,3") == [1,2,3]

  # ignore trailing comma
  assert parse("1,2,3,4,5,") == [1,2,3,4,5]

def test_data_frame():
  """End-to-end checks of asr.DataFrame: construction from ragged dicts,
  row/column accessors, equality, and the merge/repeat/reduce operations.
  """
  # trivial empty data frame
  df = asr.DataFrame()
  assert df.headers == []
  assert df.data_table == []
  assert df.data_table_transposed == []

  # common case, same number of values in each place.
  df = asr.DataFrame({'TotalTime_ms':[1,2,3], 'Displayed_ms':[4,5,6]})
  assert df.headers == ['TotalTime_ms', 'Displayed_ms']
  assert df.data_table == [[1, 4], [2, 5], [3, 6]]
  assert df.data_table_transposed == [(1, 2, 3), (4, 5, 6)]

  # varying num values: short columns are padded with None.
  df = asr.DataFrame({'many':[1,2], 'none': []})
  assert df.headers == ['many', 'none']
  assert df.data_table == [[1, None], [2, None]]
  assert df.data_table_transposed == [(1, 2), (None, None)]

  df = asr.DataFrame({'many':[], 'none': [1,2]})
  assert df.headers == ['many', 'none']
  assert df.data_table == [[None, 1], [None, 2]]
  assert df.data_table_transposed == [(None, None), (1, 2)]

  # merge multiple data frames
  df = asr.DataFrame()
  df.concat_rows(asr.DataFrame())
  assert df.headers == []
  assert df.data_table == []
  assert df.data_table_transposed == []

  df = asr.DataFrame()
  df2 = asr.DataFrame({'TotalTime_ms':[1,2,3], 'Displayed_ms':[4,5,6]})

  df.concat_rows(df2)
  assert df.headers == ['TotalTime_ms', 'Displayed_ms']
  assert df.data_table == [[1, 4], [2, 5], [3, 6]]
  assert df.data_table_transposed == [(1, 2, 3), (4, 5, 6)]

  # concat with disjoint headers: missing cells become None.
  df = asr.DataFrame({'TotalTime_ms':[1,2]})
  df2 = asr.DataFrame({'Displayed_ms':[4,5]})

  df.concat_rows(df2)
  assert df.headers == ['TotalTime_ms', 'Displayed_ms']
  assert df.data_table == [[1, None], [2, None], [None, 4], [None, 5]]

  # concat with partially-overlapping headers.
  df = asr.DataFrame({'TotalTime_ms':[1,2]})
  df2 = asr.DataFrame({'TotalTime_ms': [3, 4], 'Displayed_ms':[5, 6]})

  df.concat_rows(df2)
  assert df.headers == ['TotalTime_ms', 'Displayed_ms']
  assert df.data_table == [[1, None], [2, None], [3, 5], [4, 6]]

  # data_row_at: 0-based and negative indexing.
  df = asr.DataFrame({'TotalTime_ms':[1,2,3], 'Displayed_ms':[4,5,6]})
  assert df.data_row_at(-1) == [3,6]
  assert df.data_row_at(2) == [3,6]
  assert df.data_row_at(1) == [2,5]

  # repeat
  df = asr.DataFrame({'TotalTime_ms':[1], 'Displayed_ms':[4]})
  df2 = asr.DataFrame({'TotalTime_ms':[1,1,1], 'Displayed_ms':[4,4,4]})
  assert df.repeat(3) == df2

  # data_row_len
  df = asr.DataFrame({'TotalTime_ms':[1,1,1], 'Displayed_ms':[4,4,4]})
  assert df.data_row_len == 3
  df = asr.DataFrame({'TotalTime_ms':[1,1]})
  assert df.data_row_len == 2

  # data_row_len -- NOTE(review): duplicates the block above verbatim.
  df = asr.DataFrame({'TotalTime_ms':[1,1,1], 'Displayed_ms':[4,4,4]})
  assert df.data_row_len == 3
  df = asr.DataFrame({'TotalTime_ms':[1,1]})
  assert df.data_row_len == 2

  # data_row_reduce
  df = asr.DataFrame({'TotalTime_ms':[1,1,1], 'Displayed_ms':[4,4,4]})
  df_sum = asr.DataFrame({'TotalTime_ms':[3], 'Displayed_ms':[12]})
  assert df.data_row_reduce(sum) == df_sum

  # merge_data_columns: 'other' may be longer than self.
  df = asr.DataFrame({'TotalTime_ms':[1,2,3]})
  df2 = asr.DataFrame({'Displayed_ms':[3,4,5,6]})

  df.merge_data_columns(df2)
  assert df == asr.DataFrame({'TotalTime_ms':[1,2,3], 'Displayed_ms':[3,4,5,6]})

  df = asr.DataFrame({'TotalTime_ms':[1,2,3]})
  df2 = asr.DataFrame({'Displayed_ms':[3,4]})

  df.merge_data_columns(df2)
  assert df == asr.DataFrame({'TotalTime_ms':[1,2,3], 'Displayed_ms':[3,4]})

  # On header collision, data from 'other' wins.
  df = asr.DataFrame({'TotalTime_ms':[1,2,3]})
  df2 = asr.DataFrame({'TotalTime_ms':[10,11]})

  df.merge_data_columns(df2)
  assert df == asr.DataFrame({'TotalTime_ms':[10,11,3]})

  df = asr.DataFrame({'TotalTime_ms':[]})
  df2 = asr.DataFrame({'TotalTime_ms':[10,11]})

  df.merge_data_columns(df2)
  assert df == asr.DataFrame({'TotalTime_ms':[10,11]})




    assert cmd_str == ["fake_script", "a", "b", "--count", "2", "--output",
                       tmp_file.name]

def test_parse_run_script_csv_file():
  # empty file -> empty list
+77 −0
Original line number Diff line number Diff line
import itertools
import os
import sys
from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Tuple, \
    TypeVar, Optional

# local import
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.abspath(__file__)))))
import lib.print_utils as print_utils

T = TypeVar('T')
NamedTupleMeta = Callable[
    ..., T]  # approximation of a (S : NamedTuple<T> where S() == T) metatype.
FilterFuncType = Callable[[NamedTuple], bool]

def dict_lookup_any_key(dictionary: dict, *keys: List[Any]):
  """Return dictionary[k] for the first k in keys found in dictionary.

  Falls back to [None] (after a debug log) when none of the keys are
  present, so the result is always usable as a list of option values.
  """
  for candidate in keys:
    try:
      return dictionary[candidate]
    except KeyError:
      continue

  print_utils.debug_print("None of the keys {} were in the dictionary".format(
      keys))
  return [None]

def generate_run_combinations(named_tuple: NamedTupleMeta[T],
                              opts_dict: Dict[str, List[Optional[object]]],
                              loop_count: int = 1) -> Iterable[T]:
  """
  Create all possible combinations given the values in opts_dict[named_tuple._fields].

  :type T: type annotation for the named_tuple type.
  :param named_tuple: named tuple type, whose fields are used to make combinations for
  :param opts_dict: dictionary of keys to value list. keys correspond to the named_tuple fields.
  :param loop_count: number of repetitions.
  :return: an iterable over named_tuple instances.
  """
  # Build one candidate-value list per field, in field order.
  combinations_list = []
  for field in named_tuple._fields:
    # A field's values may live under a singular or plural key,
    # e.g. 'package' or 'packages'.
    candidates = dict_lookup_any_key(opts_dict, field, field + "s")
    # Map a {'x': None} entry to [None]; itertools.product cannot
    # iterate over None itself.
    combinations_list.append(candidates or [None])

  print_utils.debug_print("opts_dict: ", opts_dict)
  print_utils.debug_print_nd("named_tuple: ", named_tuple)
  print_utils.debug_print("combinations_list: ", combinations_list)

  # Yield the full cartesian product, repeated loop_count times.
  for _ in range(loop_count):
    for combination in itertools.product(*combinations_list):
      yield named_tuple(*combination)

def filter_run_combinations(named_tuple: NamedTuple,
                            filters: List[FilterFuncType]) -> bool:
  """Decide whether a run combination survives all filters.

  :param named_tuple: candidate run-combination tuple.
  :param filters: predicates; a combination matched by any predicate is
      rejected.
  :return: False as soon as any filter matches, True otherwise
      (an empty filter list keeps everything).
  """
  # The original loop variable `filter` shadowed the builtin; any() expresses
  # the same short-circuiting check without it.
  return not any(filter_func(named_tuple) for filter_func in filters)

def generate_group_run_combinations(run_combinations: Iterable[NamedTuple],
                                    dst_nt: NamedTupleMeta[T]) \
    -> Iterable[Tuple[T, Iterable[NamedTuple]]]:
  """Group run combinations by their projection onto dst_nt's fields.

  Yields (dst_nt instance, iterator of source tuples) pairs.

  NOTE(review): itertools.groupby only merges *adjacent* equal keys, so this
  presumably expects run_combinations pre-ordered by the dst_nt fields —
  confirm against callers.
  """
  def group_by_keys(src_nt):
    # Project src_nt down to only the fields dst_nt knows about.
    src_d = src_nt._asdict()
    legal_keys = set(dst_nt._fields)
    return dst_nt(**{k: v for k, v in src_d.items() if k in legal_keys})

  yield from itertools.groupby(run_combinations, group_by_keys)
+58 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for the args_utils.py script."""

import typing

import args_utils

def generate_run_combinations(*args):
  """Materialize args_utils.generate_run_combinations into a list.

  The production function is a generator; expanding it lets tests compare
  results with plain `==`.
  """
  return list(args_utils.generate_run_combinations(*args))

def test_generate_run_combinations():
  """Exercise generate_run_combinations over empty, single, repeated and
  paired fields."""
  blank_nd = typing.NamedTuple('Blank')
  assert generate_run_combinations(blank_nd, {}, 1) == [()], "empty"
  assert generate_run_combinations(blank_nd, {'a': ['a1', 'a2']}) == [()], \
    "empty filter"

  a_nd = typing.NamedTuple('A', [('a', str)])
  assert generate_run_combinations(a_nd, {'a': None}) == [(None,)], "None"
  assert generate_run_combinations(a_nd, {'a': ['a1', 'a2']}) == \
      [('a1',), ('a2',)], "one item"
  assert generate_run_combinations(
      a_nd, {'a': ['a1', 'a2'], 'b': ['b1', 'b2']}) == [('a1',), ('a2',)], \
    "one item filter"
  # loop_count=2 repeats the whole product.
  assert generate_run_combinations(a_nd, {'a': ['a1', 'a2']}, 2) == \
      [('a1',), ('a2',), ('a1',), ('a2',)], "one item"

  ab_nd = typing.NamedTuple('AB', [('a', str), ('b', str)])
  all_ab = [ab_nd('a1', 'b1'), ab_nd('a1', 'b2'),
            ab_nd('a2', 'b1'), ab_nd('a2', 'b2')]
  # Keys may be given in singular or plural form; both expand the same way.
  assert generate_run_combinations(
      ab_nd, {'a': ['a1', 'a2'], 'b': ['b1', 'b2']}) == all_ab, \
    "two items"

  assert generate_run_combinations(
      ab_nd, {'as': ['a1', 'a2'], 'bs': ['b1', 'b2']}) == all_ab, \
    "two items plural"
+201 −0
Original line number Diff line number Diff line
import itertools
from typing import Dict, List, Optional

class DataFrame:
  """Table-like class for storing a 2D cells table with named columns."""

  def __init__(self, data: Optional[Dict[str, List[object]]] = None):
    """
    Create a new DataFrame from a dictionary (keys = headers,
    values = columns).

    Columns may have differing lengths; cells a short column does not
    cover read back as None.
    """
    # None default instead of `data={}`: avoids the shared
    # mutable-default-argument pitfall.
    data = {} if data is None else data

    self._headers = [i for i in data.keys()]
    self._rows = []

    def get_data_row(idx):
      # Collect the idx-th cell of every column that is long enough.
      r = {}
      for header, header_data in data.items():
        if not len(header_data) > idx:
          continue
        r[header] = header_data[idx]
      return r

    # Append rows until every column is exhausted.
    row_num = 0
    while True:
      row_dict = get_data_row(row_num)
      if len(row_dict) == 0:
        break
      self._append_row(row_dict.keys(), row_dict.values())
      row_num = row_num + 1

  def concat_rows(self, other: 'DataFrame') -> None:
    """
    In-place concatenate rows of other into the rows of the
    current DataFrame.

    None is added in pre-existing cells if new headers
    are introduced.
    """
    other_headers = other.headers
    for row_data in other._data_only():
      self._append_row(other_headers, row_data)

  def _append_row(self, headers: List[str], data: List[object]):
    # Rows are stored sparsely as header->value dicts; unseen headers
    # grow the header list.
    self._rows.append({k: v for k, v in zip(headers, data)})
    for header in headers:
      if header not in self._headers:
        self._headers.append(header)

  def __repr__(self):
    """Render a right-aligned fixed-width text table: headers, then rows."""
    header_list = self._headers_only()

    # Each column is one character wider than its header.
    row_format = u""
    for header in header_list:
      row_format = row_format + u"{:>%d}" % (len(header) + 1)

    lines = [row_format.format(*header_list)]
    for row in self._data_only():
      # str() each cell so missing cells (None) render instead of raising
      # TypeError from NoneType.__format__ on the alignment spec.
      lines.append(row_format.format(*[str(cell) for cell in row]))

    return "\n".join(lines) + "\n"

  def __eq__(self, other):
    """DataFrames are equal when both headers and cell data match."""
    if isinstance(other, self.__class__):
      return self.headers == other.headers \
          and self.data_table == other.data_table
    # Defer to the other operand's comparison instead of printing
    # debug noise to stdout.
    return NotImplemented

  @property
  def headers(self) -> List[str]:
    """Column names, in insertion order."""
    return [i for i in self._headers_only()]

  @property
  def data_table(self) -> List[List[object]]:
    """All rows as lists; missing cells are None."""
    return list(self._data_only())

  @property
  def data_table_transposed(self) -> List[List[object]]:
    """All columns as tuples; missing cells are None."""
    return list(self._transposed_data())

  @property
  def data_row_len(self) -> int:
    """Number of data rows."""
    return len(self._rows)

  def data_row_at(self, idx) -> List[object]:
    """
    Return a single data row at the specified index (0th based).

    Accepts negative indices, e.g. -1 is last row.
    """
    row_dict = self._rows[idx]
    # None fills cells for headers this row does not carry.
    return [row_dict.get(h) for h in self._headers_only()]

  def copy(self) -> 'DataFrame':
    """
    Shallow copy of this DataFrame.
    """
    # BUG FIX: was repeat(count=0), which returned a table with no rows at
    # all instead of a copy.
    return self.repeat(count=1)

  def repeat(self, count: int) -> 'DataFrame':
    """
    Returns a new DataFrame where each row of this dataframe is repeated count times.
    A repeat of a row is adjacent to other repeats of that same row.
    """
    df = DataFrame()
    df._headers = self._headers.copy()
    df._rows = [row.copy() for row in self._rows for _ in range(count)]
    return df

  def merge_data_columns(self, other: 'DataFrame'):
    """
    Merge self and another DataFrame by adding the data from other column-wise.
    For any headers that are the same, data from 'other' is preferred.
    """
    for h in other._headers:
      if h not in self._headers:
        self._headers.append(h)

    append_rows = []
    for self_dict, other_dict in itertools.zip_longest(self._rows,
                                                       other._rows):
      # When other has more rows than self, the surplus becomes new rows.
      if not self_dict:
        d = {}
        append_rows.append(d)
      else:
        d = self_dict

      if other_dict:
        for k, v in other_dict.items():
          d[k] = v

    self._rows.extend(append_rows)

  def data_row_reduce(self, fnc) -> 'DataFrame':
    """
    Reduces the data row-wise by applying the fnc to each row (column-wise).
    Empty cells are skipped.

    fnc(Iterable[object]) -> object
    fnc is applied over every non-empty cell in that column (descending row-wise).

    Example:
      DataFrame({'a':[1,2,3]}).data_row_reduce(sum) == DataFrame({'a':[6]})

    Returns a new single-row DataFrame.
    """
    df = DataFrame()
    df._headers = self._headers.copy()

    def yield_by_column(header_key):
      for row_dict in self._rows:
        val = row_dict.get(header_key)
        # BUG FIX: `if val:` also dropped legitimate falsy cells
        # (0, '', False); only genuinely missing (None) cells are skipped.
        if val is not None:
          yield val

    new_row_dict = {}
    for h in df._headers:
      new_row_dict[h] = fnc(yield_by_column(h))

    df._rows = [new_row_dict]
    return df

  def _headers_only(self):
    # Internal: the live header list (not a copy).
    return self._headers

  def _data_only(self):
    # Internal: generator over dense (None-filled) rows.
    for i in range(len(self._rows)):
      yield self.data_row_at(i)

  def _transposed_data(self):
    # Internal: columns as tuples, zipped from the dense rows.
    return zip(*self._data_only())
 No newline at end of file
Loading