Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2732413f authored by Kelvin Zhang's avatar Kelvin Zhang Committed by Daniel Norman
Browse files

Moves care map generation logic to common.py, so it can be reused.

Test: th
Bug: 182296208
Change-Id: I045be7cb208412431d6ab1c26e31d38d2285738b
parent 359f09fd
Loading
Loading
Loading
Loading
+4 −108
Original line number Diff line number Diff line
@@ -59,12 +59,11 @@ import zipfile
import build_image
import build_super_image
import common
import rangelib
import sparse_img
import verity_utils
import ota_metadata_pb2

from apex_utils import GetSystemApexInfoFromTargetFiles
from common import AddCareMapForAbOta

if sys.hexversion < 0x02070000:
  print("Python 2.7 or newer is required.", file=sys.stderr)
@@ -110,45 +109,6 @@ class OutputFile(object):
      common.ZipWrite(self._output_zip, self.name, self._zip_name)


def GetCareMap(which, imgname):
  """Returns the care_map string for the given partition.

  Args:
    which: The partition name, must be listed in PARTITIONS_WITH_CARE_MAP.
    imgname: The filename of the image.

  Returns:
    (which, care_map_ranges): care_map_ranges is the raw string of the care_map
    RangeSet; or None.
  """
  assert which in common.PARTITIONS_WITH_CARE_MAP

  # which + "_image_size" contains the size that the actual filesystem image
  # resides in, which is all that needs to be verified. The additional blocks in
  # the image file contain verity metadata, by reading which would trigger
  # invalid reads.
  image_size = OPTIONS.info_dict.get(which + "_image_size")
  if not image_size:
    return None

  image_blocks = int(image_size) // 4096 - 1
  assert image_blocks > 0, "blocks for {} must be positive".format(which)

  # For sparse images, we will only check the blocks that are listed in the care
  # map, i.e. the ones with meaningful data.
  if "extfs_sparse_flag" in OPTIONS.info_dict:
    simg = sparse_img.SparseImage(imgname)
    care_map_ranges = simg.care_map.intersect(
        rangelib.RangeSet("0-{}".format(image_blocks)))

  # Otherwise for non-sparse images, we read all the blocks in the filesystem
  # image.
  else:
    care_map_ranges = rangelib.RangeSet("0-{}".format(image_blocks))

  return [which, care_map_ranges.to_string_raw()]


def AddSystem(output_zip, recovery_img=None, boot_img=None):
  """Turn the contents of SYSTEM into a system image and store it in
  output_zip. Returns the name of the system image file."""
@@ -644,72 +604,6 @@ def CheckAbOtaImages(output_zip, ab_partitions):
    assert available, "Failed to find " + img_name


def AddCareMapForAbOta(output_zip, ab_partitions, image_paths):
  """Generates and adds care_map.pb for a/b partition that has care_map.

  Args:
    output_zip: The output zip file (needs to be already open), or None to
        write care_map.pb to OPTIONS.input_tmp/.
    ab_partitions: The list of A/B partitions.
    image_paths: A map from the partition name to the image path.
  """
  care_map_list = []
  for partition in ab_partitions:
    partition = partition.strip()
    if partition not in common.PARTITIONS_WITH_CARE_MAP:
      continue

    verity_block_device = "{}_verity_block_device".format(partition)
    avb_hashtree_enable = "avb_{}_hashtree_enable".format(partition)
    if (verity_block_device in OPTIONS.info_dict or
        OPTIONS.info_dict.get(avb_hashtree_enable) == "true"):
      image_path = image_paths[partition]
      assert os.path.exists(image_path)

      care_map = GetCareMap(partition, image_path)
      if not care_map:
        continue
      care_map_list += care_map

      # adds fingerprint field to the care_map
      # TODO(xunchang) revisit the fingerprint calculation for care_map.
      partition_props = OPTIONS.info_dict.get(partition + ".build.prop")
      prop_name_list = ["ro.{}.build.fingerprint".format(partition),
                        "ro.{}.build.thumbprint".format(partition)]

      present_props = [x for x in prop_name_list if
                       partition_props and partition_props.GetProp(x)]
      if not present_props:
        logger.warning("fingerprint is not present for partition %s", partition)
        property_id, fingerprint = "unknown", "unknown"
      else:
        property_id = present_props[0]
        fingerprint = partition_props.GetProp(property_id)
      care_map_list += [property_id, fingerprint]

  if not care_map_list:
    return

  # Converts the list into proto buf message by calling care_map_generator; and
  # writes the result to a temp file.
  temp_care_map_text = common.MakeTempFile(prefix="caremap_text-",
                                           suffix=".txt")
  with open(temp_care_map_text, 'w') as text_file:
    text_file.write('\n'.join(care_map_list))

  temp_care_map = common.MakeTempFile(prefix="caremap-", suffix=".pb")
  care_map_gen_cmd = ["care_map_generator", temp_care_map_text, temp_care_map]
  common.RunAndCheckOutput(care_map_gen_cmd)

  care_map_path = "META/care_map.pb"
  if output_zip and care_map_path not in output_zip.namelist():
    common.ZipWrite(output_zip, temp_care_map, arcname=care_map_path)
  else:
    shutil.copy(temp_care_map, os.path.join(OPTIONS.input_tmp, care_map_path))
    if output_zip:
      OPTIONS.replace_updated_files_list.append(care_map_path)


def AddPackRadioImages(output_zip, images):
  """Copies images listed in META/pack_radioimages.txt from RADIO/ to IMAGES/.

@@ -1050,7 +944,9 @@ def AddImagesToTargetFiles(filename):

    # Generate care_map.pb for ab_partitions, then write this file to
    # target_files package.
    AddCareMapForAbOta(output_zip, ab_partitions, partitions)
    output_care_map = os.path.join(OPTIONS.input_tmp, "META", "care_map.pb")
    AddCareMapForAbOta(output_zip if output_zip else output_care_map,
                       ab_partitions, partitions)

  # Radio images that need to be packed into IMAGES/, and product-img.zip.
  pack_radioimages_txt = os.path.join(
+122 −0
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@ import zipfile
from hashlib import sha1, sha256

import images
import rangelib
import sparse_img
from blockimgdiff import BlockImageDiff

@@ -3759,3 +3760,124 @@ def GetBootImageTimestamp(boot_img):
  except ExternalError as e:
    logger.warning('Unable to get boot image timestamp: %s', e)
    return None


def GetCareMap(which, imgname):
  """Returns the care_map string for the given partition.

  Args:
    which: The partition name, must be listed in PARTITIONS_WITH_CARE_MAP.
    imgname: The filename of the image.

  Returns:
    (which, care_map_ranges): care_map_ranges is the raw string of the care_map
    RangeSet; or None.
  """
  assert which in PARTITIONS_WITH_CARE_MAP

  # which + "_image_size" contains the size that the actual filesystem image
  # resides in, which is all that needs to be verified. The additional blocks in
  # the image file contain verity metadata, by reading which would trigger
  # invalid reads.
  image_size = OPTIONS.info_dict.get(which + "_image_size")
  if not image_size:
    return None

  image_blocks = int(image_size) // 4096 - 1
  assert image_blocks > 0, "blocks for {} must be positive".format(which)

  # For sparse images, we will only check the blocks that are listed in the care
  # map, i.e. the ones with meaningful data.
  if "extfs_sparse_flag" in OPTIONS.info_dict:
    simg = sparse_img.SparseImage(imgname)
    care_map_ranges = simg.care_map.intersect(
        rangelib.RangeSet("0-{}".format(image_blocks)))

  # Otherwise for non-sparse images, we read all the blocks in the filesystem
  # image.
  else:
    care_map_ranges = rangelib.RangeSet("0-{}".format(image_blocks))

  return [which, care_map_ranges.to_string_raw()]


def AddCareMapForAbOta(output_file, ab_partitions, image_paths):
  """Generates and adds care_map.pb for a/b partition that has care_map.

  Args:
    output_file: The output zip file (needs to be already open),
        or file path to write care_map.pb.
    ab_partitions: The list of A/B partitions.
    image_paths: A map from the partition name to the image path.
  """
  if not output_file:
    raise ExternalError('Expected output_file for AddCareMapForAbOta')

  care_map_list = []
  for partition in ab_partitions:
    partition = partition.strip()
    if partition not in PARTITIONS_WITH_CARE_MAP:
      continue

    verity_block_device = "{}_verity_block_device".format(partition)
    avb_hashtree_enable = "avb_{}_hashtree_enable".format(partition)
    if (verity_block_device in OPTIONS.info_dict or
            OPTIONS.info_dict.get(avb_hashtree_enable) == "true"):
      if partition not in image_paths:
        logger.warning('Potential partition with care_map missing from images: %s',
                       partition)
        continue
      image_path = image_paths[partition]
      if not os.path.exists(image_path):
        raise ExternalError('Expected image at path {}'.format(image_path))

      care_map = GetCareMap(partition, image_path)
      if not care_map:
        continue
      care_map_list += care_map

      # adds fingerprint field to the care_map
      # TODO(xunchang) revisit the fingerprint calculation for care_map.
      partition_props = OPTIONS.info_dict.get(partition + ".build.prop")
      prop_name_list = ["ro.{}.build.fingerprint".format(partition),
                        "ro.{}.build.thumbprint".format(partition)]

      present_props = [x for x in prop_name_list if
                       partition_props and partition_props.GetProp(x)]
      if not present_props:
        logger.warning(
            "fingerprint is not present for partition %s", partition)
        property_id, fingerprint = "unknown", "unknown"
      else:
        property_id = present_props[0]
        fingerprint = partition_props.GetProp(property_id)
      care_map_list += [property_id, fingerprint]

  if not care_map_list:
    return

  # Converts the list into proto buf message by calling care_map_generator; and
  # writes the result to a temp file.
  temp_care_map_text = MakeTempFile(prefix="caremap_text-",
                                           suffix=".txt")
  with open(temp_care_map_text, 'w') as text_file:
    text_file.write('\n'.join(care_map_list))

  temp_care_map = MakeTempFile(prefix="caremap-", suffix=".pb")
  care_map_gen_cmd = ["care_map_generator", temp_care_map_text, temp_care_map]
  RunAndCheckOutput(care_map_gen_cmd)

  if not isinstance(output_file, zipfile.ZipFile):
    shutil.copy(temp_care_map, output_file)
    return
  # output_file is a zip file
  care_map_path = "META/care_map.pb"
  if care_map_path in output_file.namelist():
    # Copy the temp file into the OPTIONS.input_tmp dir and update the
    # replace_updated_files_list used by add_img_to_target_files
    if not OPTIONS.replace_updated_files_list:
      OPTIONS.replace_updated_files_list = []
    shutil.copy(temp_care_map, os.path.join(OPTIONS.input_tmp, care_map_path))
    OPTIONS.replace_updated_files_list.append(care_map_path)
  else:
    ZipWrite(output_file, temp_care_map, arcname=care_map_path)
+22 −20
Original line number Diff line number Diff line
@@ -21,9 +21,10 @@ import zipfile
import common
import test_utils
from add_img_to_target_files import (
    AddCareMapForAbOta, AddPackRadioImages,
    CheckAbOtaImages, GetCareMap)
    AddPackRadioImages,
    CheckAbOtaImages)
from rangelib import RangeSet
from common import AddCareMapForAbOta, GetCareMap


OPTIONS = common.OPTIONS
@@ -174,9 +175,9 @@ class AddImagesToTargetFilesTest(test_utils.ReleaseToolsTestCase):
  def test_AddCareMapForAbOta(self):
    image_paths = self._test_AddCareMapForAbOta()

    AddCareMapForAbOta(None, ['system', 'vendor'], image_paths)

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(care_map_file, ['system', 'vendor'], image_paths)

    expected = ['system', RangeSet("0-5 10-15").to_string_raw(),
                "ro.system.build.fingerprint",
                "google/sailfish/12345:user/dev-keys",
@@ -191,10 +192,10 @@ class AddImagesToTargetFilesTest(test_utils.ReleaseToolsTestCase):
    """Partitions without care_map should be ignored."""
    image_paths = self._test_AddCareMapForAbOta()

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(
        None, ['boot', 'system', 'vendor', 'vbmeta'], image_paths)
        care_map_file, ['boot', 'system', 'vendor', 'vbmeta'], image_paths)

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    expected = ['system', RangeSet("0-5 10-15").to_string_raw(),
                "ro.system.build.fingerprint",
                "google/sailfish/12345:user/dev-keys",
@@ -226,9 +227,9 @@ class AddImagesToTargetFilesTest(test_utils.ReleaseToolsTestCase):
        ),
    }

    AddCareMapForAbOta(None, ['system', 'vendor'], image_paths)

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(care_map_file, ['system', 'vendor'], image_paths)

    expected = ['system', RangeSet("0-5 10-15").to_string_raw(),
                "ro.system.build.fingerprint",
                "google/sailfish/12345:user/dev-keys",
@@ -250,9 +251,9 @@ class AddImagesToTargetFilesTest(test_utils.ReleaseToolsTestCase):
        'vendor_verity_block_device': '/dev/block/vendor',
    }

    AddCareMapForAbOta(None, ['system', 'vendor'], image_paths)

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(care_map_file, ['system', 'vendor'], image_paths)

    expected = ['system', RangeSet("0-5 10-15").to_string_raw(), "unknown",
                "unknown", 'vendor', RangeSet("0-9").to_string_raw(), "unknown",
                "unknown"]
@@ -281,9 +282,9 @@ class AddImagesToTargetFilesTest(test_utils.ReleaseToolsTestCase):
        ),
    }

    AddCareMapForAbOta(None, ['system', 'vendor'], image_paths)

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(care_map_file, ['system', 'vendor'], image_paths)

    expected = ['system', RangeSet("0-5 10-15").to_string_raw(),
                "ro.system.build.thumbprint",
                "google/sailfish/123:user/dev-keys",
@@ -300,9 +301,9 @@ class AddImagesToTargetFilesTest(test_utils.ReleaseToolsTestCase):
    # Remove vendor_image_size to invalidate the care_map for vendor.img.
    del OPTIONS.info_dict['vendor_image_size']

    AddCareMapForAbOta(None, ['system', 'vendor'], image_paths)

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(care_map_file, ['system', 'vendor'], image_paths)

    expected = ['system', RangeSet("0-5 10-15").to_string_raw(),
                "ro.system.build.fingerprint",
                "google/sailfish/12345:user/dev-keys"]
@@ -317,25 +318,26 @@ class AddImagesToTargetFilesTest(test_utils.ReleaseToolsTestCase):
    del OPTIONS.info_dict['system_image_size']
    del OPTIONS.info_dict['vendor_image_size']

    AddCareMapForAbOta(None, ['system', 'vendor'], image_paths)
    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(care_map_file, ['system', 'vendor'], image_paths)

    self.assertFalse(
        os.path.exists(os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')))
    self.assertFalse(os.path.exists(care_map_file))

  def test_AddCareMapForAbOta_verityNotEnabled(self):
    """No care_map.pb should be generated if verity not enabled."""
    image_paths = self._test_AddCareMapForAbOta()
    OPTIONS.info_dict = {}
    AddCareMapForAbOta(None, ['system', 'vendor'], image_paths)

    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    AddCareMapForAbOta(care_map_file, ['system', 'vendor'], image_paths)

    self.assertFalse(os.path.exists(care_map_file))

  def test_AddCareMapForAbOta_missingImageFile(self):
    """Missing image file should be considered fatal."""
    image_paths = self._test_AddCareMapForAbOta()
    image_paths['vendor'] = ''
    self.assertRaises(AssertionError, AddCareMapForAbOta, None,
    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.pb')
    self.assertRaises(common.ExternalError, AddCareMapForAbOta, care_map_file,
                      ['system', 'vendor'], image_paths)

  @test_utils.SkipIfExternalToolsUnavailable()