Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6b10e15a authored by Kelvin Zhang's avatar Kelvin Zhang
Browse files

Support generating partial OTAs from extracted target_files

This allows the build system to potentially paralleize generation of OTA
package and zipping of target files

Bug: 262185376
Bug: 227848550
Change-Id: I90b6c25761683ebe3803b22fc8e23540a5282c66
parent f9f3c569
Loading
Loading
Loading
Loading
+27 −0
Original line number Diff line number Diff line
@@ -754,6 +754,33 @@ def ReadFromInputFile(input_file, fn):
  return ReadBytesFromInputFile(input_file, fn).decode()


def WriteBytesToInputFile(input_file, fn, data):
  """Write bytes |data| contents to fn of input zipfile or directory."""
  if isinstance(input_file, zipfile.ZipFile):
    with input_file.open(fn, "w") as entry_fp:
      return entry_fp.write(data)
  elif zipfile.is_zipfile(input_file):
    with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp:
      with zfp.open(fn, "w") as entry_fp:
        return entry_fp.write(data)
  else:
    if not os.path.isdir(input_file):
      raise ValueError(
          "Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file)
    path = os.path.join(input_file, *fn.split("/"))
    try:
      with open(path, "wb") as f:
        return f.write(data)
    except IOError as e:
      if e.errno == errno.ENOENT:
        raise KeyError(fn)


def WriteToInputFile(input_file, fn, str: str):
  """Write str content to fn of input file or directory"""
  return WriteBytesToInputFile(input_file, fn, str.encode())


def ExtractFromInputFile(input_file, fn):
  """Extracts the contents of fn from input zipfile or directory into a file."""
  if isinstance(input_file, zipfile.ZipFile):
+104 −107
Original line number Diff line number Diff line
@@ -270,7 +270,7 @@ import care_map_pb2
import common
import ota_utils
from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata,
                       PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, CopyTargetFilesDir)
                       PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, ExtractTargetFiles, CopyTargetFilesDir)
from common import DoesInputFileContain, IsSparseImage
import target_files_diff
from check_target_files_vintf import CheckVintfIfTrebleEnabled
@@ -519,16 +519,11 @@ def GetTargetFilesZipWithoutPostinstallConfig(input_file):
  Returns:
    The filename of target-files.zip that doesn't contain postinstall config.
  """
  # We should only make a copy if postinstall_config entry exists.
  with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
    if POSTINSTALL_CONFIG not in input_zip.namelist():
  config_path = os.path.join(input_file, POSTINSTALL_CONFIG)
  if os.path.exists(config_path):
    os.unlink(config_path)
  return input_file

  target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
  shutil.copyfile(input_file, target_file)
  common.ZipDelete(target_file, POSTINSTALL_CONFIG)
  return target_file


def ParseInfoDict(target_file_path):
  with zipfile.ZipFile(target_file_path, 'r', allowZip64=True) as zfp:
@@ -544,6 +539,17 @@ def GetTargetFilesZipForCustomVABCCompression(input_file, vabc_compression_param
  Returns:
    The path to modified target-files.zip
  """
  if os.path.isdir(input_file):
    dynamic_partition_info_path = os.path.join(
        input_file, "META", "dynamic_partitions_info.txt")
    with open(dynamic_partition_info_path, "r") as fp:
      dynamic_partition_info = fp.read()
    dynamic_partition_info = ModifyVABCCompressionParam(
        dynamic_partition_info, vabc_compression_param)
    with open(dynamic_partition_info_path, "w") as fp:
      fp.write(dynamic_partition_info)
    return input_file

  target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
  shutil.copyfile(input_file, target_file)
  common.ZipDelete(target_file, DYNAMIC_PARTITION_INFO)
@@ -571,23 +577,7 @@ def GetTargetFilesZipForPartialUpdates(input_file, ab_partitions):
    The filename of target-files.zip used for partial ota update.
  """

  def AddImageForPartition(partition_name):
    """Add the archive name for a given partition to the copy list."""
    for prefix in ['IMAGES', 'RADIO']:
      image_path = '{}/{}.img'.format(prefix, partition_name)
      if image_path in namelist:
        copy_entries.append(image_path)
        map_path = '{}/{}.map'.format(prefix, partition_name)
        if map_path in namelist:
          copy_entries.append(map_path)
        return

    raise ValueError("Cannot find {} in input zipfile".format(partition_name))

  with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
    original_ab_partitions = input_zip.read(
        AB_PARTITIONS).decode().splitlines()
    namelist = input_zip.namelist()
  original_ab_partitions = common.ReadFromInputFile(input_file, AB_PARTITIONS)

  unrecognized_partitions = [partition for partition in ab_partitions if
                             partition not in original_ab_partitions]
@@ -596,50 +586,65 @@ def GetTargetFilesZipForPartialUpdates(input_file, ab_partitions):
                     unrecognized_partitions)

  logger.info("Generating partial updates for %s", ab_partitions)

  copy_entries = ['META/update_engine_config.txt']
  for partition_name in ab_partitions:
    AddImageForPartition(partition_name)

  # Use zip2zip to avoid extracting the zipfile.
  partial_target_file = common.MakeTempFile(suffix='.zip')
  cmd = ['zip2zip', '-i', input_file, '-o', partial_target_file]
  cmd.extend(['{}:{}'.format(name, name) for name in copy_entries])
  common.RunAndCheckOutput(cmd)

  partial_target_zip = zipfile.ZipFile(partial_target_file, 'a',
                                       allowZip64=True)
  with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
    common.ZipWriteStr(partial_target_zip, 'META/ab_partitions.txt',
  for subdir in ["IMAGES", "RADIO", "PREBUILT_IMAGES"]:
    image_dir = os.path.join(subdir)
    if not os.path.exists(image_dir):
      continue
    for filename in os.listdir(image_dir):
      filepath = os.path.join(image_dir, filename)
      if filename.endswith(".img"):
        partition_name = filename.removesuffix(".img")
        if partition_name not in ab_partitions:
          os.unlink(filepath)

  common.WriteToInputFile(input_file, 'META/ab_partitions.txt',
                          '\n'.join(ab_partitions))
  CARE_MAP_ENTRY = "META/care_map.pb"
    if CARE_MAP_ENTRY in input_zip.namelist():
  if DoesInputFileContain(input_file, CARE_MAP_ENTRY):
    caremap = care_map_pb2.CareMap()
      caremap.ParseFromString(input_zip.read(CARE_MAP_ENTRY))
    caremap.ParseFromString(
        common.ReadBytesFromInputFile(input_file, CARE_MAP_ENTRY))
    filtered = [
        part for part in caremap.partitions if part.name in ab_partitions]
    del caremap.partitions[:]
    caremap.partitions.extend(filtered)
      common.ZipWriteStr(partial_target_zip, CARE_MAP_ENTRY,
    common.WriteBytesToInputFile(input_file, CARE_MAP_ENTRY,
                                 caremap.SerializeToString())

  for info_file in ['META/misc_info.txt', DYNAMIC_PARTITION_INFO]:
      if info_file not in input_zip.namelist():
    if not DoesInputFileContain(input_file, info_file):
      logger.warning('Cannot find %s in input zipfile', info_file)
      continue
      content = input_zip.read(info_file).decode()

    content = common.ReadFromInputFile(input_file, info_file)
    modified_info = UpdatesInfoForSpecialUpdates(
        content, lambda p: p in ab_partitions)
    if OPTIONS.vabc_compression_param and info_file == DYNAMIC_PARTITION_INFO:
      modified_info = ModifyVABCCompressionParam(
          modified_info, OPTIONS.vabc_compression_param)
      common.ZipWriteStr(partial_target_zip, info_file, modified_info)
    common.WriteToInputFile(input_file, info_file, modified_info)

    # TODO(xunchang) handle META/postinstall_config.txt'
  def IsInPartialList(postinstall_line: str):
    idx = postinstall_line.find("=")
    if idx < 0:
      return False
    key = postinstall_line[:idx]
    logger.info("%s %s", key, ab_partitions)
    for part in ab_partitions:
      if key.endswith("_" + part):
        return True
    return False

  common.ZipClose(partial_target_zip)
  postinstall_config = common.ReadFromInputFile(input_file, POSTINSTALL_CONFIG)
  postinstall_config = [
      line for line in postinstall_config.splitlines() if IsInPartialList(line)]
  if postinstall_config:
    postinstall_config = "\n".join(postinstall_config)
    common.WriteToInputFile(input_file, POSTINSTALL_CONFIG, postinstall_config)
  else:
    os.unlink(os.path.join(input_file, POSTINSTALL_CONFIG))

  return partial_target_file
  return input_file


def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
@@ -664,20 +669,11 @@ def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
  replace = {'OTA/super_{}.img'.format(dev): 'IMAGES/{}.img'.format(dev)
             for dev in super_block_devices}

  target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
  shutil.copyfile(input_file, target_file)

  with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
    namelist = input_zip.namelist()

  input_tmp = common.UnzipTemp(input_file, RETROFIT_DAP_UNZIP_PATTERN)

  # Remove partitions from META/ab_partitions.txt that is in
  # dynamic_partition_list but not in super_block_devices so that
  # brillo_update_payload won't generate update for those logical partitions.
  ab_partitions_file = os.path.join(input_tmp, *AB_PARTITIONS.split('/'))
  with open(ab_partitions_file) as f:
    ab_partitions_lines = f.readlines()
  ab_partitions_lines = common.ReadFromInputFile(
      input_file, AB_PARTITIONS).split("\n")
  ab_partitions = [line.strip() for line in ab_partitions_lines]
  # Assert that all super_block_devices are in ab_partitions
  super_device_not_updated = [partition for partition in super_block_devices
@@ -686,15 +682,6 @@ def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
      "{} is in super_block_devices but not in {}".format(
          super_device_not_updated, AB_PARTITIONS)
  # ab_partitions -= (dynamic_partition_list - super_block_devices)
  new_ab_partitions = common.MakeTempFile(
      prefix="ab_partitions", suffix=".txt")
  with open(new_ab_partitions, 'w') as f:
    for partition in ab_partitions:
      if (partition in dynamic_partition_list and
              partition not in super_block_devices):
        logger.info("Dropping %s from ab_partitions.txt", partition)
        continue
      f.write(partition + "\n")
  to_delete = [AB_PARTITIONS]

  # Always skip postinstall for a retrofit update.
@@ -707,24 +694,28 @@ def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
  # Remove the existing partition images as well as the map files.
  to_delete += list(replace.values())
  to_delete += ['IMAGES/{}.map'.format(dev) for dev in super_block_devices]

  common.ZipDelete(target_file, to_delete)

  target_zip = zipfile.ZipFile(target_file, 'a', allowZip64=True)
  for item in to_delete:
    os.unlink(os.path.join(input_file, item))

  # Write super_{foo}.img as {foo}.img.
  for src, dst in replace.items():
    assert src in namelist, \
    assert DoesInputFileContain(input_file, src), \
        'Missing {} in {}; {} cannot be written'.format(src, input_file, dst)
    unzipped_file = os.path.join(input_tmp, *src.split('/'))
    common.ZipWrite(target_zip, unzipped_file, arcname=dst)
    source_path = os.path.join(input_file, *src.split("/"))
    target_path = os.path.join(input_file, *dst.split("/"))
    os.rename(source_path, target_path)

  # Write new ab_partitions.txt file
  common.ZipWrite(target_zip, new_ab_partitions, arcname=AB_PARTITIONS)

  common.ZipClose(target_zip)
  new_ab_partitions = os.paht.join(input_file, AB_PARTITIONS)
  with open(new_ab_partitions, 'w') as f:
    for partition in ab_partitions:
      if (partition in dynamic_partition_list and
              partition not in super_block_devices):
        logger.info("Dropping %s from ab_partitions.txt", partition)
        continue
      f.write(partition + "\n")

  return target_file
  return input_file


def GetTargetFilesZipForCustomImagesUpdates(input_file, custom_images):
@@ -833,14 +824,20 @@ def SupportsMainlineGkiUpdates(target_file):
  return pattern.search(output) is not None


def ExtractOrCopyTargetFiles(target_file):
  if os.path.isdir(target_file):
    return CopyTargetFilesDir(target_file)
  else:
    return ExtractTargetFiles(target_file)


def GenerateAbOtaPackage(target_file, output_file, source_file=None):
  """Generates an Android OTA package that has A/B update payload."""
  # If input target_files are directories, create a copy so that we can modify
  # them directly
  if os.path.isdir(target_file):
    target_file = CopyTargetFilesDir(target_file)
  if source_file is not None and os.path.isdir(source_file):
    source_file = CopyTargetFilesDir(source_file)
  target_file = ExtractOrCopyTargetFiles(target_file)
  if source_file is not None:
    source_file = ExtractOrCopyTargetFiles(source_file)
  # Stage the output zip package for package signing.
  if not OPTIONS.no_signing:
    staging_file = common.MakeTempFile(suffix='.zip')
@@ -851,7 +848,7 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
                               allowZip64=True)

  if source_file is not None:
    source_file = ota_utils.ExtractTargetFiles(source_file)
    source_file = ExtractTargetFiles(source_file)
    assert "ab_partitions" in OPTIONS.source_info_dict, \
        "META/ab_partitions.txt is required for ab_update."
    assert "ab_partitions" in OPTIONS.target_info_dict, \
@@ -948,10 +945,10 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
  elif OPTIONS.partial:
    target_file = GetTargetFilesZipForPartialUpdates(target_file,
                                                     OPTIONS.partial)
  elif OPTIONS.vabc_compression_param:
  if OPTIONS.vabc_compression_param:
    target_file = GetTargetFilesZipForCustomVABCCompression(
        target_file, OPTIONS.vabc_compression_param)
  elif OPTIONS.skip_postinstall:
  if OPTIONS.skip_postinstall:
    target_file = GetTargetFilesZipWithoutPostinstallConfig(target_file)
  # Target_file may have been modified, reparse ab_partitions
  target_info.info_dict['ab_partitions'] = common.ReadFromInputFile(target_file,
+7 −2
Original line number Diff line number Diff line
@@ -1047,10 +1047,15 @@ def Fnmatch(filename, pattersn):

def CopyTargetFilesDir(input_dir):
  output_dir = common.MakeTempDir("target_files")
  shutil.copytree(os.path.join(input_dir, "IMAGES"), os.path.join(
      output_dir, "IMAGES"), dirs_exist_ok=True)
  IMAGES_DIR = ["IMAGES", "PREBUILT_IMAGES", "RADIO"]
  for subdir in IMAGES_DIR:
    if not os.path.exists(os.path.join(input_dir, subdir)):
      continue
    shutil.copytree(os.path.join(input_dir, subdir), os.path.join(
        output_dir, subdir), dirs_exist_ok=True, copy_function=os.link)
  shutil.copytree(os.path.join(input_dir, "META"), os.path.join(
      output_dir, "META"), dirs_exist_ok=True)

  for (dirpath, _, filenames) in os.walk(input_dir):
    for filename in filenames:
      path = os.path.join(dirpath, filename)