Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f2ca92d9 authored by Tao Bao's avatar Tao Bao Committed by android-build-merger
Browse files

Merge "Generate OTA packages for A/B update." am: bc6a6682

am: 4f3015f4

* commit '4f3015f4':
  Generate OTA packages for A/B update.
parents c8caf263 4f3015f4
Loading
Loading
Loading
Loading
+153 −9
Original line number Diff line number Diff line
@@ -109,6 +109,7 @@ if sys.hexversion < 0x02070000:

import multiprocessing
import os
import subprocess
import tempfile
import zipfile

@@ -1078,6 +1079,124 @@ def WriteVerifyPackage(input_zip, output_zip):
  WriteMetadata(metadata, output_zip)


def WriteABOTAPackageWithBrilloScript(target_file, output_file,
                                      source_file=None):
  """Generate an Android OTA package that has A/B update payload."""

  # Setup signing keys.
  if OPTIONS.package_key is None:
    OPTIONS.package_key = OPTIONS.info_dict.get(
        "default_system_dev_certificate",
        "build/target/product/security/testkey")

  # A/B updater expects key in RSA format.
  cmd = ["openssl", "pkcs8",
         "-in", OPTIONS.package_key + OPTIONS.private_key_suffix,
         "-inform", "DER", "-nocrypt"]
  rsa_key = common.MakeTempFile(prefix="key-", suffix=".key")
  cmd.extend(["-out", rsa_key])
  p1 = common.Run(cmd, stdout=subprocess.PIPE)
  p1.wait()
  assert p1.returncode == 0, "openssl pkcs8 failed"

  # Stage the output zip package for signing.
  temp_zip_file = tempfile.NamedTemporaryFile()
  output_zip = zipfile.ZipFile(temp_zip_file, "w",
                               compression=zipfile.ZIP_DEFLATED)

  # Metadata to comply with Android OTA package format.
  oem_props = OPTIONS.info_dict.get("oem_fingerprint_properties", None)
  oem_dict = None
  if oem_props:
    if OPTIONS.oem_source is None:
      raise common.ExternalError("OEM source required for this build")
    oem_dict = common.LoadDictionaryFromLines(
        open(OPTIONS.oem_source).readlines())

  metadata = {
      "post-build": CalculateFingerprint(oem_props, oem_dict,
                                         OPTIONS.info_dict),
      "pre-device": GetOemProperty("ro.product.device", oem_props, oem_dict,
                                   OPTIONS.info_dict),
      "post-timestamp": GetBuildProp("ro.build.date.utc", OPTIONS.info_dict),
  }

  if source_file is not None:
    metadata["pre-build"] = CalculateFingerprint(oem_props, oem_dict,
                                                 OPTIONS.source_info_dict)

  # 1. Generate payload.
  payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
  cmd = ["brillo_update_payload", "generate",
         "--payload", payload_file,
         "--target_image", target_file]
  if source_file is not None:
    cmd.extend(["--source_image", source_file])
  p1 = common.Run(cmd, stdout=subprocess.PIPE)
  p1.wait()
  assert p1.returncode == 0, "brillo_update_payload generate failed"

  # 2. Generate hashes of the payload and metadata files.
  payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
  metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
  cmd = ["brillo_update_payload", "hash",
         "--unsigned_payload", payload_file,
         "--signature_size", "256",
         "--metadata_hash_file", metadata_sig_file,
         "--payload_hash_file", payload_sig_file]
  p1 = common.Run(cmd, stdout=subprocess.PIPE)
  p1.wait()
  assert p1.returncode == 0, "brillo_update_payload hash failed"

  # 3. Sign the hashes and insert them back into the payload file.
  signed_payload_sig_file = common.MakeTempFile(prefix="signed-sig-",
                                                suffix=".bin")
  signed_metadata_sig_file = common.MakeTempFile(prefix="signed-sig-",
                                                 suffix=".bin")
  # 3a. Sign the payload hash.
  cmd = ["openssl", "pkeyutl", "-sign",
         "-inkey", rsa_key,
         "-pkeyopt", "digest:sha256",
         "-in", payload_sig_file,
         "-out", signed_payload_sig_file]
  p1 = common.Run(cmd, stdout=subprocess.PIPE)
  p1.wait()
  assert p1.returncode == 0, "openssl sign payload failed"

  # 3b. Sign the metadata hash.
  cmd = ["openssl", "pkeyutl", "-sign",
         "-inkey", rsa_key,
         "-pkeyopt", "digest:sha256",
         "-in", metadata_sig_file,
         "-out", signed_metadata_sig_file]
  p1 = common.Run(cmd, stdout=subprocess.PIPE)
  p1.wait()
  assert p1.returncode == 0, "openssl sign metadata failed"

  # 3c. Insert the signatures back into the payload file.
  signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
                                            suffix=".bin")
  cmd = ["brillo_update_payload", "sign",
         "--unsigned_payload", payload_file,
         "--payload", signed_payload_file,
         "--signature_size", "256",
         "--metadata_signature_file", signed_metadata_sig_file,
         "--payload_signature_file", signed_payload_sig_file]
  p1 = common.Run(cmd, stdout=subprocess.PIPE)
  p1.wait()
  assert p1.returncode == 0, "brillo_update_payload sign failed"

  # Add the signed payload file into the zip.
  common.ZipWrite(output_zip, signed_payload_file, arcname="payload.bin",
                  compress_type=zipfile.ZIP_STORED)
  WriteMetadata(metadata, output_zip)

  # Sign the whole package to comply with the Android OTA package format.
  common.ZipClose(output_zip)
  SignOutput(temp_zip_file.name, output_file)
  temp_zip_file.close()


class FileDifference(object):
  def __init__(self, partition, source_zip, target_zip, output_zip):
    self.deferred_patch_list = None
@@ -1683,6 +1802,37 @@ def main(argv):
    common.Usage(__doc__)
    sys.exit(1)

  # Load the dict file from the zip directly to have a peek at the OTA type.
  # For packages using A/B update, unzipping is not needed.
  input_zip = zipfile.ZipFile(args[0], "r")
  OPTIONS.info_dict = common.LoadInfoDict(input_zip)
  common.ZipClose(input_zip)

  ab_update = OPTIONS.info_dict.get("ab_update") == "true"

  if ab_update:
    if OPTIONS.incremental_source is not None:
      OPTIONS.target_info_dict = OPTIONS.info_dict
      source_zip = zipfile.ZipFile(OPTIONS.incremental_source, "r")
      OPTIONS.source_info_dict = common.LoadInfoDict(source_zip)
      common.ZipClose(source_zip)

    if OPTIONS.verbose:
      print "--- target info ---"
      common.DumpInfoDict(OPTIONS.info_dict)

      if OPTIONS.incremental_source is not None:
        print "--- source info ---"
        common.DumpInfoDict(OPTIONS.source_info_dict)

    WriteABOTAPackageWithBrilloScript(
        target_file=args[0],
        output_file=args[1],
        source_file=OPTIONS.incremental_source)

    print "done."
    return

  if OPTIONS.extra_script is not None:
    OPTIONS.extra_script = open(OPTIONS.extra_script).read()

@@ -1714,9 +1864,7 @@ def main(argv):
  if OPTIONS.device_specific is not None:
    OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific)

  ab_update = OPTIONS.info_dict.get("ab_update") == "true"

  if OPTIONS.info_dict.get("no_recovery") == "true" and not ab_update:
  if OPTIONS.info_dict.get("no_recovery") == "true":
    raise common.ExternalError(
        "--- target build has specified no recovery ---")

@@ -1740,7 +1888,7 @@ def main(argv):

  # Non A/B OTAs rely on /cache partition to store temporary files.
  cache_size = OPTIONS.info_dict.get("cache_size", None)
  if cache_size is None and not ab_update:
  if cache_size is None:
    print "--- can't determine the cache partition size ---"
  OPTIONS.cache_size = cache_size

@@ -1750,10 +1898,6 @@ def main(argv):

  # Generate a full OTA.
  elif OPTIONS.incremental_source is None:
    if ab_update:
      # TODO: Pending for b/25715402.
      pass
    else:
    WriteFullOTAPackage(input_zip, output_zip)

  # Generate an incremental OTA. It will fall back to generate a full OTA on