Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 62d130b2 authored by Kelvin Zhang's avatar Kelvin Zhang Committed by Automerger Merge Worker
Browse files

Merge "Use seek() instead of writing 0s" into main am: dfa0c857

parents 892071ae dfa0c857
Loading
Loading
Loading
Loading
+69 −63
Original line number Diff line number Diff line
@@ -15,14 +15,13 @@
#

import copy
import json
import os
import subprocess
import tempfile
import time
import unittest
import zipfile
from hashlib import sha1
from typing import BinaryIO

import common
import test_utils
@@ -36,14 +35,24 @@ MiB = 1024 * KiB
GiB = 1024 * MiB


def get_2gb_string():
def get_2gb_file():
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  tmpfile = tempfile.NamedTemporaryFile()
  tmpfile.truncate(size)
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield b'\0' * (step_size - block_size)
    tmpfile.write(os.urandom(block_size))
    tmpfile.seek(step_size - block_size, os.SEEK_CUR)
  return tmpfile


def hash_file(filename):
  sha1_hash = sha1()
  with open(filename, "rb") as fp:
    for data in iter(lambda: fp.read(4*MiB), b''):
      sha1_hash.update(data)
  return sha1_hash


class BuildInfoTest(test_utils.ReleaseToolsTestCase):
@@ -429,6 +438,13 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
    self.assertIsNone(zip_file.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    with tempfile.NamedTemporaryFile() as test_file:
      test_file_name = test_file.name
      for data in contents:
        test_file.write(bytes(data))
      return self._test_ZipWriteFile(test_file_name, extra_zipwrite_args)

  def _test_ZipWriteFile(self, test_file_name, extra_zipwrite_args=None):
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file = tempfile.NamedTemporaryFile(delete=False)
@@ -441,17 +457,12 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]
    sha1_hash = hash_file(test_file_name)

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)

    try:
      sha1_hash = sha1()
      for data in contents:
        sha1_hash.update(bytes(data))
        test_file.write(bytes(data))
      test_file.close()

      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
@@ -467,7 +478,6 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
@@ -502,14 +512,13 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
  def _test_ZipWriteStr_large_file(self, large_file: BinaryIO, small, extra_args=None):
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name
    test_file_name = large_file.name

    arcname_large = test_file_name
    arcname_small = "bar"
@@ -522,11 +531,7 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)

    try:
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()
      sha1_hash = hash_file(test_file_name)

      # Arbitrary timestamp, just to make sure common.ZipWrite() restores
      # the timestamp after writing.
@@ -551,7 +556,6 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    default_limit = (1 << 31) - 1
@@ -577,8 +581,8 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
    with get_2gb_file() as tmpfile:
      self._test_ZipWriteFile(tmpfile.name, {
          "compress_type": zipfile.ZIP_DEFLATED,
      })

@@ -627,9 +631,9 @@ class CommonZipTest(test_utils.ReleaseToolsTestCase):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
    with get_2gb_file() as large_file:
      self._test_ZipWriteStr_large_file(large_file, short_string, {
          "compress_type": zipfile.ZIP_DEFLATED,
      })

@@ -1237,7 +1241,8 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
    self.assertEqual(
        '1-5 9-10',
        sparse_image.file_map['//system/file1'].extra['text_str'])
    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
    self.assertTrue(
        sparse_image.file_map['//system/file2'].extra['incomplete'])
    self.assertTrue(
        sparse_image.file_map['/system/app/file3'].extra['incomplete'])

@@ -1667,6 +1672,7 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
    self.assertRaises(common.ExternalError, common._GenerateGkiCertificate,
                      test_file.name, 'generic_kernel')


class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
  """Checks the format of install-recovery.sh.