Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4aae62ee authored by Dennis Song's avatar Dennis Song
Browse files

Resolve conflict AVB rollback index location

Add an `--avb-resolve-rollback-index-location-conflict`
option in merge_target_files. When this option is set,
the merge tool will resolve conflicting index locations
by assigning the smallest unused index location.

This is to support merging system and vendor target files
from two different targets. In this case, the two target
files may have conflict rollback index location because
they were built independently.

Test: atest releasetools_test
Test: validate_target_files *-target_files-*.zip
Test: merge_target_files &&
        add_img_to_target_files &&
        img_from_target_files &&
        flash device
Bug: 300604688
Change-Id: Ibd18ef2a9f3784157fe17966f5364c3c81c9bd9f
parent d61f2efd
Loading
Loading
Loading
Loading
+11 −2
Original line number Original line Diff line number Diff line
@@ -42,6 +42,10 @@ Usage: add_img_to_target_files [flag] target_files
  --is_signing
  --is_signing
      Skip building & adding the images for "userdata" and "cache" if we
      Skip building & adding the images for "userdata" and "cache" if we
      are signing the target files.
      are signing the target files.

  --avb-resolve-rollback-index-location-conflict
      If provided, resolve the conflict AVB rollback index location when
      necessary.
"""
"""


from __future__ import print_function
from __future__ import print_function
@@ -81,6 +85,7 @@ OPTIONS.add_missing = False
OPTIONS.rebuild_recovery = False
OPTIONS.rebuild_recovery = False
OPTIONS.replace_updated_files_list = []
OPTIONS.replace_updated_files_list = []
OPTIONS.is_signing = False
OPTIONS.is_signing = False
OPTIONS.avb_resolve_rollback_index_location_conflict = False




def ParseAvbFooter(img_path) -> avbtool.AvbFooter:
def ParseAvbFooter(img_path) -> avbtool.AvbFooter:
@@ -682,7 +687,8 @@ def AddVBMeta(output_zip, partitions, name, needed_partitions):
    logger.info("%s.img already exists; not rebuilding...", name)
    logger.info("%s.img already exists; not rebuilding...", name)
    return img.name
    return img.name


  common.BuildVBMeta(img.name, partitions, name, needed_partitions)
  common.BuildVBMeta(img.name, partitions, name, needed_partitions,
                     OPTIONS.avb_resolve_rollback_index_location_conflict)
  img.Write()
  img.Write()
  return img.name
  return img.name


@@ -1224,6 +1230,8 @@ def main(argv):
                       " please switch to AVB")
                       " please switch to AVB")
    elif o == "--is_signing":
    elif o == "--is_signing":
      OPTIONS.is_signing = True
      OPTIONS.is_signing = True
    elif o == "--avb_resolve_rollback_index_location_conflict":
      OPTIONS.avb_resolve_rollback_index_location_conflict = True
    else:
    else:
      return False
      return False
    return True
    return True
@@ -1233,7 +1241,8 @@ def main(argv):
      extra_long_opts=["add_missing", "rebuild_recovery",
      extra_long_opts=["add_missing", "rebuild_recovery",
                       "replace_verity_public_key=",
                       "replace_verity_public_key=",
                       "replace_verity_private_key=",
                       "replace_verity_private_key=",
                       "is_signing"],
                       "is_signing",
                       "avb_resolve_rollback_index_location_conflict"],
      extra_option_handler=option_handler)
      extra_option_handler=option_handler)


  if len(args) != 1:
  if len(args) != 1:
+85 −7
Original line number Original line Diff line number Diff line
@@ -39,6 +39,7 @@ import tempfile
import threading
import threading
import time
import time
import zipfile
import zipfile
from dataclasses import dataclass
from genericpath import isdir
from genericpath import isdir
from hashlib import sha1, sha256
from hashlib import sha1, sha256


@@ -144,6 +145,19 @@ PARTITIONS_WITH_BUILD_PROP = PARTITIONS_WITH_CARE_MAP + ['boot', 'init_boot']
RAMDISK_BUILD_PROP_REL_PATHS = ['system/etc/ramdisk/build.prop']
RAMDISK_BUILD_PROP_REL_PATHS = ['system/etc/ramdisk/build.prop']




@dataclass
class AvbChainedPartitionArg:
  """The required arguments for avbtool --chain_partition."""
  partition: str
  rollback_index_location: int
  pubkey_path: str

  def to_string(self):
    """Convert to string command arguments."""
    return '{}:{}:{}'.format(
        self.partition, self.rollback_index_location, self.pubkey_path)


class ErrorCode(object):
class ErrorCode(object):
  """Define error_codes for failures that happen during the actual
  """Define error_codes for failures that happen during the actual
  update package installation.
  update package installation.
@@ -1452,7 +1466,7 @@ def ResolveAVBSigningPathArgs(split_args):




def GetAvbPartitionArg(partition, image, info_dict=None):
def GetAvbPartitionArg(partition, image, info_dict=None):
  """Returns the VBMeta arguments for partition.
  """Returns the VBMeta arguments for one partition.


  It sets up the VBMeta argument by including the partition descriptor from the
  It sets up the VBMeta argument by including the partition descriptor from the
  given 'image', or by configuring the partition as a chained partition.
  given 'image', or by configuring the partition as a chained partition.
@@ -1464,7 +1478,7 @@ def GetAvbPartitionArg(partition, image, info_dict=None):
        OPTIONS.info_dict if None has been given.
        OPTIONS.info_dict if None has been given.


  Returns:
  Returns:
    A list of VBMeta arguments.
    A list of VBMeta arguments for one partition.
  """
  """
  if info_dict is None:
  if info_dict is None:
    info_dict = OPTIONS.info_dict
    info_dict = OPTIONS.info_dict
@@ -1487,6 +1501,61 @@ def GetAvbPartitionArg(partition, image, info_dict=None):
  return [AVB_ARG_NAME_CHAIN_PARTITION, chained_partition_arg]
  return [AVB_ARG_NAME_CHAIN_PARTITION, chained_partition_arg]




def GetAvbPartitionsArg(partitions,
                        resolve_rollback_index_location_conflict=False,
                        info_dict=None):
  """Returns the VBMeta arguments for all AVB partitions.

  It sets up the VBMeta argument by calling GetAvbPartitionArg of all
  partitions.

  Args:
    partitions: A dict of all AVB partitions.
    resolve_rollback_index_location_conflict: If true, resolve conflicting avb
        rollback index locations by assigning the smallest unused value.
    info_dict: A dict returned by common.LoadInfoDict().

  Returns:
    A list of VBMeta arguments for all partitions.
  """
  # An AVB partition will be linked into a vbmeta partition by either
  # AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG or AVB_ARG_NAME_CHAIN_PARTITION, there
  # should be no other cases.
  valid_args = {
      AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG: [],
      AVB_ARG_NAME_CHAIN_PARTITION: []
  }

  for partition, path in partitions.items():
    avb_partition_arg = GetAvbPartitionArg(partition, path, info_dict)
    if not avb_partition_arg:
      continue
    arg_name, arg_value = avb_partition_arg
    assert arg_name in valid_args
    valid_args[arg_name].append(arg_value)

  # Copy the arguments for non-chained AVB partitions directly without
  # intervention.
  avb_args = []
  for image in valid_args[AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG]:
    avb_args.extend([AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG, image])

  # Handle chained AVB partitions. The rollback index location might be
  # adjusted if two partitions use the same value. This may happen when mixing
  # a shared system image with other vendor images.
  used_index_loc = set()
  for chained_partition_arg in valid_args[AVB_ARG_NAME_CHAIN_PARTITION]:
    if resolve_rollback_index_location_conflict:
      while chained_partition_arg.rollback_index_location in used_index_loc:
        chained_partition_arg.rollback_index_location += 1

    used_index_loc.add(chained_partition_arg.rollback_index_location)
    avb_args.extend([AVB_ARG_NAME_CHAIN_PARTITION,
                     chained_partition_arg.to_string()])

  return avb_args


def GetAvbChainedPartitionArg(partition, info_dict, key=None):
def GetAvbChainedPartitionArg(partition, info_dict, key=None):
  """Constructs and returns the arg to build or verify a chained partition.
  """Constructs and returns the arg to build or verify a chained partition.


@@ -1498,8 +1567,8 @@ def GetAvbChainedPartitionArg(partition, info_dict, key=None):
        the key listed in info_dict.
        the key listed in info_dict.


  Returns:
  Returns:
    A string of form "partition:rollback_index_location:key" that can be used to
    An AvbChainedPartitionArg object with rollback_index_location and
    build or verify vbmeta image.
    pubkey_path that can be used to build or verify vbmeta image.
  """
  """
  if key is None:
  if key is None:
    key = info_dict["avb_" + partition + "_key_path"]
    key = info_dict["avb_" + partition + "_key_path"]
@@ -1507,7 +1576,10 @@ def GetAvbChainedPartitionArg(partition, info_dict, key=None):
  pubkey_path = ExtractAvbPublicKey(info_dict["avb_avbtool"], key)
  pubkey_path = ExtractAvbPublicKey(info_dict["avb_avbtool"], key)
  rollback_index_location = info_dict[
  rollback_index_location = info_dict[
      "avb_" + partition + "_rollback_index_location"]
      "avb_" + partition + "_rollback_index_location"]
  return "{}:{}:{}".format(partition, rollback_index_location, pubkey_path)
  return AvbChainedPartitionArg(
      partition=partition,
      rollback_index_location=int(rollback_index_location),
      pubkey_path=pubkey_path)




def _HasGkiCertificationArgs():
def _HasGkiCertificationArgs():
@@ -1554,7 +1626,8 @@ def _GenerateGkiCertificate(image, image_name):
  return data
  return data




def BuildVBMeta(image_path, partitions, name, needed_partitions):
def BuildVBMeta(image_path, partitions, name, needed_partitions,
                resolve_rollback_index_location_conflict=False):
  """Creates a VBMeta image.
  """Creates a VBMeta image.


  It generates the requested VBMeta image. The requested image could be for
  It generates the requested VBMeta image. The requested image could be for
@@ -1569,6 +1642,8 @@ def BuildVBMeta(image_path, partitions, name, needed_partitions):
    name: Name of the VBMeta partition, e.g. 'vbmeta', 'vbmeta_system'.
    name: Name of the VBMeta partition, e.g. 'vbmeta', 'vbmeta_system'.
    needed_partitions: Partitions whose descriptors should be included into the
    needed_partitions: Partitions whose descriptors should be included into the
        generated VBMeta image.
        generated VBMeta image.
    resolve_rollback_index_location_conflict: If true, resolve conflicting avb
        rollback index locations by assigning the smallest unused value.


  Raises:
  Raises:
    AssertionError: On invalid input args.
    AssertionError: On invalid input args.
@@ -1582,6 +1657,7 @@ def BuildVBMeta(image_path, partitions, name, needed_partitions):
  custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get(
  custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get(
      "avb_custom_vbmeta_images_partition_list", "").strip().split()]
      "avb_custom_vbmeta_images_partition_list", "").strip().split()]


  avb_partitions = {}
  for partition, path in partitions.items():
  for partition, path in partitions.items():
    if partition not in needed_partitions:
    if partition not in needed_partitions:
      continue
      continue
@@ -1592,7 +1668,9 @@ def BuildVBMeta(image_path, partitions, name, needed_partitions):
        'Unknown partition: {}'.format(partition)
        'Unknown partition: {}'.format(partition)
    assert os.path.exists(path), \
    assert os.path.exists(path), \
        'Failed to find {} for {}'.format(path, partition)
        'Failed to find {} for {}'.format(path, partition)
    cmd.extend(GetAvbPartitionArg(partition, path))
    avb_partitions[partition] = path
  cmd.extend(GetAvbPartitionsArg(avb_partitions,
                                 resolve_rollback_index_location_conflict))


  args = OPTIONS.info_dict.get("avb_{}_args".format(name))
  args = OPTIONS.info_dict.get("avb_{}_args".format(name))
  if args and args.strip():
  if args and args.strip():
+10 −0
Original line number Original line Diff line number Diff line
@@ -90,6 +90,10 @@ Usage: merge_target_files [args]
  --keep-tmp
  --keep-tmp
      Keep tempoary files for debugging purposes.
      Keep tempoary files for debugging purposes.


  --avb-resolve-rollback-index-location-conflict
      If provided, resolve the conflict AVB rollback index location when
      necessary.

  The following only apply when using the VSDK to perform dexopt on vendor apps:
  The following only apply when using the VSDK to perform dexopt on vendor apps:


  --framework-dexpreopt-config
  --framework-dexpreopt-config
@@ -144,6 +148,7 @@ OPTIONS.allow_duplicate_apkapex_keys = False
OPTIONS.vendor_otatools = None
OPTIONS.vendor_otatools = None
OPTIONS.rebuild_sepolicy = False
OPTIONS.rebuild_sepolicy = False
OPTIONS.keep_tmp = False
OPTIONS.keep_tmp = False
OPTIONS.avb_resolve_rollback_index_location_conflict = False
OPTIONS.framework_dexpreopt_config = None
OPTIONS.framework_dexpreopt_config = None
OPTIONS.framework_dexpreopt_tools = None
OPTIONS.framework_dexpreopt_tools = None
OPTIONS.vendor_dexpreopt_config = None
OPTIONS.vendor_dexpreopt_config = None
@@ -230,6 +235,8 @@ def generate_missing_images(target_files_dir):
  ]
  ]
  if OPTIONS.rebuild_recovery:
  if OPTIONS.rebuild_recovery:
    add_img_args.append('--rebuild_recovery')
    add_img_args.append('--rebuild_recovery')
  if OPTIONS.avb_resolve_rollback_index_location_conflict:
    add_img_args.append('--avb_resolve_rollback_index_location_conflict')
  add_img_args.append(target_files_dir)
  add_img_args.append(target_files_dir)


  add_img_to_target_files.main(add_img_args)
  add_img_to_target_files.main(add_img_args)
@@ -554,6 +561,8 @@ def main():
      OPTIONS.rebuild_sepolicy = True
      OPTIONS.rebuild_sepolicy = True
    elif o == '--keep-tmp':
    elif o == '--keep-tmp':
      OPTIONS.keep_tmp = True
      OPTIONS.keep_tmp = True
    elif o == '--avb-resolve-rollback-index-location-conflict':
      OPTIONS.avb_resolve_rollback_index_location_conflict = True
    elif o == '--framework-dexpreopt-config':
    elif o == '--framework-dexpreopt-config':
      OPTIONS.framework_dexpreopt_config = a
      OPTIONS.framework_dexpreopt_config = a
    elif o == '--framework-dexpreopt-tools':
    elif o == '--framework-dexpreopt-tools':
@@ -593,6 +602,7 @@ def main():
          'vendor-otatools=',
          'vendor-otatools=',
          'rebuild-sepolicy',
          'rebuild-sepolicy',
          'keep-tmp',
          'keep-tmp',
          'avb-resolve-rollback-index-location-conflict',
      ],
      ],
      extra_option_handler=option_handler)
      extra_option_handler=option_handler)


+23 −26
Original line number Original line Diff line number Diff line
@@ -1299,11 +1299,11 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
        'avb_system_key_path': pubkey,
        'avb_system_key_path': pubkey,
        'avb_system_rollback_index_location': 2,
        'avb_system_rollback_index_location': 2,
    }
    }
    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
    chained_partition_args = common.GetAvbChainedPartitionArg(
    self.assertEqual(3, len(args))
        'system', info_dict)
    self.assertEqual('system', args[0])
    self.assertEqual('system', chained_partition_args.partition)
    self.assertEqual('2', args[1])
    self.assertEqual(2, chained_partition_args.rollback_index_location)
    self.assertTrue(os.path.exists(args[2]))
    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))


  @test_utils.SkipIfExternalToolsUnavailable()
  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
@@ -1313,11 +1313,11 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
        'avb_product_key_path': key,
        'avb_product_key_path': key,
        'avb_product_rollback_index_location': 2,
        'avb_product_rollback_index_location': 2,
    }
    }
    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
    chained_partition_args = common.GetAvbChainedPartitionArg(
    self.assertEqual(3, len(args))
        'product', info_dict)
    self.assertEqual('product', args[0])
    self.assertEqual('product', chained_partition_args.partition)
    self.assertEqual('2', args[1])
    self.assertEqual(2, chained_partition_args.rollback_index_location)
    self.assertTrue(os.path.exists(args[2]))
    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))


  @test_utils.SkipIfExternalToolsUnavailable()
  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
@@ -1327,12 +1327,11 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
        'avb_system_rollback_index_location': 2,
        'avb_system_rollback_index_location': 2,
    }
    }
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    args = common.GetAvbChainedPartitionArg(
    chained_partition_args = common.GetAvbChainedPartitionArg(
        'system', info_dict, pubkey).split(':')
        'system', info_dict, pubkey)
    self.assertEqual(3, len(args))
    self.assertEqual('system', chained_partition_args.partition)
    self.assertEqual('system', args[0])
    self.assertEqual(2, chained_partition_args.rollback_index_location)
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
    self.assertTrue(os.path.exists(args[2]))


  @test_utils.SkipIfExternalToolsUnavailable()
  @test_utils.SkipIfExternalToolsUnavailable()
  def test_GetAvbChainedPartitionArg_invalidKey(self):
  def test_GetAvbChainedPartitionArg_invalidKey(self):
@@ -1600,11 +1599,10 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
    self.assertEqual(2, len(cmd))
    self.assertEqual(2, len(cmd))
    self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
    self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
    chained_partition_args = cmd[1].split(':')
    chained_partition_args = cmd[1]
    self.assertEqual(3, len(chained_partition_args))
    self.assertEqual('vendor', chained_partition_args.partition)
    self.assertEqual('vendor', chained_partition_args[0])
    self.assertEqual(5, chained_partition_args.rollback_index_location)
    self.assertEqual('5', chained_partition_args[1])
    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
    self.assertTrue(os.path.exists(chained_partition_args[2]))


  @test_utils.SkipIfExternalToolsUnavailable()
  @test_utils.SkipIfExternalToolsUnavailable()
  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
@@ -1633,11 +1631,10 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
        'recovery', '/path/to/recovery.img', info_dict)
        'recovery', '/path/to/recovery.img', info_dict)
    self.assertEqual(2, len(cmd))
    self.assertEqual(2, len(cmd))
    self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
    self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
    chained_partition_args = cmd[1].split(':')
    chained_partition_args = cmd[1]
    self.assertEqual(3, len(chained_partition_args))
    self.assertEqual('recovery', chained_partition_args.partition)
    self.assertEqual('recovery', chained_partition_args[0])
    self.assertEqual(3, chained_partition_args.rollback_index_location)
    self.assertEqual('3', chained_partition_args[1])
    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
    self.assertTrue(os.path.exists(chained_partition_args[2]))


  def test_GenerateGkiCertificate_KeyPathNotFound(self):
  def test_GenerateGkiCertificate_KeyPathNotFound(self):
    pubkey = os.path.join(self.testdata_dir, 'no_testkey_gki.pem')
    pubkey = os.path.join(self.testdata_dir, 'no_testkey_gki.pem')
+2 −1
Original line number Original line Diff line number Diff line
@@ -430,7 +430,8 @@ def ValidateVerifiedBootImages(input_tmp, info_dict, options):
        key_file = options.get(key_name, info_dict[key_name])
        key_file = options.get(key_name, info_dict[key_name])
        chained_partition_arg = common.GetAvbChainedPartitionArg(
        chained_partition_arg = common.GetAvbChainedPartitionArg(
            partition, info_dict, key_file)
            partition, info_dict, key_file)
        cmd.extend(['--expected_chain_partition', chained_partition_arg])
        cmd.extend(['--expected_chain_partition',
                    chained_partition_arg.to_string()])


    # Handle the boot image with a non-default name, e.g. boot-5.4.img
    # Handle the boot image with a non-default name, e.g. boot-5.4.img
    boot_images = info_dict.get("boot_images")
    boot_images = info_dict.get("boot_images")