Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 44cb0db6 authored by Tao Bao's avatar Tao Bao Committed by Gerrit Code Review
Browse files

Merge "releasetools: Reduce the memory use in test_common.py."

parents 207b009c 31b08073
Loading
Loading
Loading
Loading
+48 −38
Original line number Diff line number Diff line
@@ -20,32 +20,27 @@ import time
import unittest
import zipfile

from hashlib import sha1

import common
import validate_target_files


def random_string_with_holes(size, block_size, step_size):
  data = ["\0"] * size
  for begin in range(0, size, step_size):
    end = begin + block_size
    data[begin:end] = os.urandom(block_size)
  return "".join(data)
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB

def get_2gb_string():
  kilobytes = 1024
  megabytes = 1024 * kilobytes
  gigabytes = 1024 * megabytes

  size = int(2 * gigabytes + 1)
  block_size = 4 * kilobytes
  step_size = 4 * megabytes
  two_gb_string = random_string_with_holes(
        size, block_size, step_size)
  return two_gb_string
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield '\0' * (step_size - block_size)


class CommonZipTest(unittest.TestCase):
  def _verify(self, zip_file, zip_file_name, arcname, contents,
  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    # Verify the stat if present.
@@ -69,7 +64,11 @@ class CommonZipTest(unittest.TestCase):
    self.assertEqual(info.compress_type, expected_compress_type)

    # Verify the zip contents.
    self.assertEqual(zip_file.read(arcname), contents)
    entry = zip_file.open(arcname)
    sha1_hash = sha1()
    for chunk in iter(lambda: entry.read(4 * MiB), ''):
      sha1_hash.update(chunk)
    self.assertEqual(expected_hash, sha1_hash.hexdigest())
    self.assertIsNone(zip_file.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
@@ -90,7 +89,10 @@ class CommonZipTest(unittest.TestCase):
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      test_file.write(contents)
      sha1_hash = sha1()
      for data in contents:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
@@ -102,8 +104,9 @@ class CommonZipTest(unittest.TestCase):
      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
                   expected_stat, expected_mode, expected_compress_type)
      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)
@@ -133,7 +136,7 @@ class CommonZipTest(unittest.TestCase):
      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, contents,
      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
@@ -159,7 +162,10 @@ class CommonZipTest(unittest.TestCase):
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      test_file.write(large)
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
@@ -173,12 +179,13 @@ class CommonZipTest(unittest.TestCase):
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large, large,
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small, small,
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
@@ -287,13 +294,17 @@ class CommonZipTest(unittest.TestCase):
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, "foo", random_string,
      self._verify(zip_file, zip_file_name, "foo",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o644)
      self._verify(zip_file, zip_file_name, "bar", random_string,
      self._verify(zip_file, zip_file_name, "bar",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o755)
      self._verify(zip_file, zip_file_name, "baz", random_string,
      self._verify(zip_file, zip_file_name, "baz",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o740)
      self._verify(zip_file, zip_file_name, "qux", random_string,
      self._verify(zip_file, zip_file_name, "qux",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o400)
    finally:
      os.remove(zip_file_name)
@@ -310,8 +321,7 @@ class InstallRecoveryScriptFormatTest(unittest.TestCase):
    dummy_fstab = \
        ["/dev/soc.0/by-name/boot /boot emmc defaults defaults",
         "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    self._info["fstab"] = common.LoadRecoveryFSTab(lambda x : "\n".join(x),
                                                   2, dummy_fstab)
    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
    # Construct the gzipped recovery.img and boot.img
    self.recovery_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
@@ -332,8 +342,8 @@ class InstallRecoveryScriptFormatTest(unittest.TestCase):
      f.write(data)

  def test_full_recovery(self):
    recovery_image = common.File("recovery.img", self.recovery_data);
    boot_image = common.File("boot.img", self.boot_data);
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)
    self._info["full_recovery_image"] = "true"

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
@@ -342,9 +352,9 @@ class InstallRecoveryScriptFormatTest(unittest.TestCase):
                                                        self._info)

  def test_recovery_from_boot(self):
    recovery_image = common.File("recovery.img", self.recovery_data);
    recovery_image = common.File("recovery.img", self.recovery_data)
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    boot_image = common.File("boot.img", self.boot_data);
    boot_image = common.File("boot.img", self.boot_data)
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,