Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c3075321 authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge "Allow ota_from_target_file to work entirely on directories" am:...

Merge "Allow ota_from_target_file to work entirely on directories" am: ece71703 am: 3de45a7b am: 4f600c2a

Original change: https://android-review.googlesource.com/c/platform/build/+/2542170



Change-Id: Id9d0a79469a15b3ff8149826ef523a155eacb840
Signed-off-by: default avatarAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
parents adaddab3 4f600c2a
Loading
Loading
Loading
Loading
+30 −9
Original line number Diff line number Diff line
@@ -717,26 +717,46 @@ class BuildInfo(object):
      script.AssertOemProperty(prop, values, oem_no_mount)


def ReadFromInputFile(input_file, fn):
  """Reads the contents of fn from input zipfile or directory."""
def DoesInputFileContain(input_file, fn):
  """Check whether the input target_files.zip contain an entry `fn`"""
  if isinstance(input_file, zipfile.ZipFile):
    return fn in input_file.namelist()
  elif zipfile.is_zipfile(input_file):
    with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp:
      return fn in zfp.namelist()
  else:
    if not os.path.isdir(input_file):
      raise ValueError(
          "Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file)
    path = os.path.join(input_file, *fn.split("/"))
    return os.path.exists(path)


def ReadBytesFromInputFile(input_file, fn):
  """Reads the bytes of fn from input zipfile or directory."""
  if isinstance(input_file, zipfile.ZipFile):
    return input_file.read(fn).decode()
    return input_file.read(fn)
  elif zipfile.is_zipfile(input_file):
    with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp:
      return zfp.read(fn).decode()
      return zfp.read(fn)
  else:
    if not os.path.isdir(input_file):
      raise ValueError(
          "Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file)
    path = os.path.join(input_file, *fn.split("/"))
    try:
      with open(path) as f:
      with open(path, "rb") as f:
        return f.read()
    except IOError as e:
      if e.errno == errno.ENOENT:
        raise KeyError(fn)


def ReadFromInputFile(input_file, fn):
  """Reads the str contents of fn from input zipfile or directory."""
  return ReadBytesFromInputFile(input_file, fn).decode()


def ExtractFromInputFile(input_file, fn):
  """Extracts the contents of fn from input zipfile or directory into a file."""
  if isinstance(input_file, zipfile.ZipFile):
@@ -1540,7 +1560,8 @@ def BuildVBMeta(image_path, partitions, name, needed_partitions):

  custom_partitions = OPTIONS.info_dict.get(
      "avb_custom_images_partition_list", "").strip().split()
  custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get("avb_custom_vbmeta_images_partition_list", "").strip().split()]
  custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get(
      "avb_custom_vbmeta_images_partition_list", "").strip().split()]

  for partition, path in partitions.items():
    if partition not in needed_partitions:
@@ -2976,7 +2997,6 @@ def ZipDelete(zip_filename, entries, force=False):
      cmd.append(entry)
    RunAndCheckOutput(cmd)


  os.replace(new_zipfile, zip_filename)


@@ -4081,6 +4101,7 @@ def IsSparseImage(filepath):
    # https://source.android.com/devices/bootloader/images
    return fp.read(4) == b'\x3A\xFF\x26\xED'


def ParseUpdateEngineConfig(path: str):
  """Parse the update_engine config stored in file `path`
  Args
+18 −16
Original line number Diff line number Diff line
@@ -267,8 +267,8 @@ import care_map_pb2
import common
import ota_utils
from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata,
                       PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME)
from common import IsSparseImage
                       PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, CopyTargetFilesDir)
from common import DoesInputFileContain, IsSparseImage
import target_files_diff
from check_target_files_vintf import CheckVintfIfTrebleEnabled
from non_ab_ota import GenerateNonAbOtaPackage
@@ -830,6 +830,12 @@ def SupportsMainlineGkiUpdates(target_file):

def GenerateAbOtaPackage(target_file, output_file, source_file=None):
  """Generates an Android OTA package that has A/B update payload."""
  # If input target_files are directories, create a copy so that we can modify
  # them directly
  if os.path.isdir(target_file):
    target_file = CopyTargetFilesDir(target_file)
  if source_file is not None and os.path.isdir(source_file):
    source_file = CopyTargetFilesDir(source_file)
  # Stage the output zip package for package signing.
  if not OPTIONS.no_signing:
    staging_file = common.MakeTempFile(suffix='.zip')
@@ -840,6 +846,7 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
                               allowZip64=True)

  if source_file is not None:
    source_file = ota_utils.ExtractTargetFiles(source_file)
    assert "ab_partitions" in OPTIONS.source_info_dict, \
        "META/ab_partitions.txt is required for ab_update."
    assert "ab_partitions" in OPTIONS.target_info_dict, \
@@ -942,9 +949,8 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
  elif OPTIONS.skip_postinstall:
    target_file = GetTargetFilesZipWithoutPostinstallConfig(target_file)
  # Target_file may have been modified, reparse ab_partitions
  with zipfile.ZipFile(target_file, allowZip64=True) as zfp:
    target_info.info_dict['ab_partitions'] = zfp.read(
        AB_PARTITIONS).decode().strip().split("\n")
  target_info.info_dict['ab_partitions'] = common.ReadFromInputFile(target_file,
                                                                    AB_PARTITIONS).strip().split("\n")

  CheckVintfIfTrebleEnabled(target_file, target_info)

@@ -1042,15 +1048,13 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):

  # If dm-verity is supported for the device, copy contents of care_map
  # into A/B OTA package.
  target_zip = zipfile.ZipFile(target_file, "r", allowZip64=True)
  if target_info.get("avb_enable") == "true":
    care_map_list = [x for x in ["care_map.pb", "care_map.txt"] if
                     "META/" + x in target_zip.namelist()]

    # Adds care_map if either the protobuf format or the plain text one exists.
    if care_map_list:
      care_map_name = care_map_list[0]
      care_map_data = target_zip.read("META/" + care_map_name)
    for care_map_name in ["care_map.pb", "care_map.txt"]:
      if not DoesInputFileContain(target_file, "META/" + care_map_name):
        continue
      care_map_data = common.ReadBytesFromInputFile(
          target_file, "META/" + care_map_name)
      # In order to support streaming, care_map needs to be packed as
      # ZIP_STORED.
      common.ZipWriteStr(output_zip, care_map_name, care_map_data,
@@ -1060,13 +1064,11 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):

  # Add the source apex version for incremental ota updates, and write the
  # result apex info to the ota package.
  ota_apex_info = ota_utils.ConstructOtaApexInfo(target_zip, source_file)
  ota_apex_info = ota_utils.ConstructOtaApexInfo(target_file, source_file)
  if ota_apex_info is not None:
    common.ZipWriteStr(output_zip, "apex_info.pb", ota_apex_info,
                       compress_type=zipfile.ZIP_STORED)

  common.ZipClose(target_zip)

  # We haven't written the metadata entry yet, which will be handled in
  # FinalizeMetadata().
  common.ZipClose(output_zip)
@@ -1257,7 +1259,7 @@ def main(argv):
  if OPTIONS.extracted_input is not None:
    OPTIONS.info_dict = common.LoadInfoDict(OPTIONS.extracted_input)
  else:
    OPTIONS.info_dict = ParseInfoDict(args[0])
    OPTIONS.info_dict = common.LoadInfoDict(args[0])

  if OPTIONS.wipe_user_data:
    if not OPTIONS.vabc_downgrade:
+31 −7
Original line number Diff line number Diff line
@@ -22,7 +22,8 @@ import zipfile

import ota_metadata_pb2
import common
from common import (ZipDelete, ZipClose, OPTIONS, MakeTempFile,
import fnmatch
from common import (ZipDelete, DoesInputFileContain, ReadBytesFromInputFile, OPTIONS, MakeTempFile,
                    ZipWriteStr, BuildInfo, LoadDictionaryFromFile,
                    SignFile, PARTITIONS_WITH_BUILD_PROP, PartitionBuildProps,
                    GetRamdiskFormat, ParseUpdateEngineConfig)
@@ -44,7 +45,8 @@ OPTIONS.boot_variable_file = None

METADATA_NAME = 'META-INF/com/android/metadata'
METADATA_PROTO_NAME = 'META-INF/com/android/metadata.pb'
UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*']
UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*',
                 'RADIO/*', '*/build.prop', '*/default.prop', '*/build.default', "*/etc/vintf/*"]
SECURITY_PATCH_LEVEL_PROP_NAME = "ro.build.version.security_patch"


@@ -626,12 +628,10 @@ def ConstructOtaApexInfo(target_zip, source_file=None):
  """If applicable, add the source version to the apex info."""

  def _ReadApexInfo(input_zip):
    if "META/apex_info.pb" not in input_zip.namelist():
    if not DoesInputFileContain(input_zip, "META/apex_info.pb"):
      logger.warning("target_file doesn't contain apex_info.pb %s", input_zip)
      return None

    with input_zip.open("META/apex_info.pb", "r") as zfp:
      return zfp.read()
    return ReadBytesFromInputFile(input_zip, "META/apex_info.pb")

  target_apex_string = _ReadApexInfo(target_zip)
  # Return early if the target apex info doesn't exist or is empty.
@@ -727,7 +727,7 @@ def ExtractTargetFiles(path: str):
    logger.info("target files %s is already extracted", path)
    return path
  extracted_dir = common.MakeTempDir("target_files")
  common.UnzipToDir(path, extracted_dir, UNZIP_PATTERN)
  common.UnzipToDir(path, extracted_dir, UNZIP_PATTERN + [""])
  return extracted_dir


@@ -1040,3 +1040,27 @@ class AbOtaPropertyFiles(StreamingPropertyFiles):
    assert metadata_total <= payload_size

    return (payload_offset, metadata_total)


def Fnmatch(filename, pattersn):
  return any([fnmatch.fnmatch(filename, pat) for pat in pattersn])


def CopyTargetFilesDir(input_dir):
  output_dir = common.MakeTempDir("target_files")
  shutil.copytree(os.path.join(input_dir, "IMAGES"), os.path.join(
      output_dir, "IMAGES"), dirs_exist_ok=True)
  shutil.copytree(os.path.join(input_dir, "META"), os.path.join(
      output_dir, "META"), dirs_exist_ok=True)
  for (dirpath, _, filenames) in os.walk(input_dir):
    for filename in filenames:
      path = os.path.join(dirpath, filename)
      relative_path = path.removeprefix(input_dir).removeprefix("/")
      if not Fnmatch(relative_path, UNZIP_PATTERN):
        continue
      if filename.endswith(".prop") or filename == "prop.default" or "/etc/vintf/" in relative_path:
        target_path = os.path.join(
            output_dir, relative_path)
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        shutil.copy(path, target_path)
  return output_dir