Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e5cacd76 authored by Tao Bao's avatar Tao Bao Committed by android-build-merger
Browse files

Merge "releasetools: Add Payload class."

am: 80ac71ba

Change-Id: Iaa68c95cf0ccbfe7895881a16a9e0c2bc759929c
parents 6c831834 80ac71ba
Loading
Loading
Loading
Loading
+126 −71
Original line number Diff line number Diff line
@@ -360,6 +360,122 @@ class PayloadSigner(object):
    return out_file


class Payload(object):
  """Manages the creation and the signing of an A/B OTA Payload."""

  PAYLOAD_BIN = 'payload.bin'
  PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt'

  def __init__(self):
    # The place where the output from the subprocess should go.
    self._log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE
    self.payload_file = None
    self.payload_properties = None

  def Generate(self, target_file, source_file=None, additional_args=None):
    """Generates a payload from the given target-files zip(s).

    Args:
      target_file: The filename of the target build target-files zip.
      source_file: The filename of the source build target-files zip; or None if
          generating a full OTA.
      additional_args: A list of additional args that should be passed to
          brillo_update_payload script; or None.
    """
    if additional_args is None:
      additional_args = []

    payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
    cmd = ["brillo_update_payload", "generate",
           "--payload", payload_file,
           "--target_image", target_file]
    if source_file is not None:
      cmd.extend(["--source_image", source_file])
    cmd.extend(additional_args)
    p = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
    stdoutdata, _ = p.communicate()
    assert p.returncode == 0, \
        "brillo_update_payload generate failed: {}".format(stdoutdata)

    self.payload_file = payload_file
    self.payload_properties = None

  def Sign(self, payload_signer):
    """Generates and signs the hashes of the payload and metadata.

    Args:
      payload_signer: A PayloadSigner() instance that serves the signing work.

    Raises:
      AssertionError: On any failure when calling brillo_update_payload script.
    """
    assert isinstance(payload_signer, PayloadSigner)

    # 1. Generate hashes of the payload and metadata files.
    payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    cmd = ["brillo_update_payload", "hash",
           "--unsigned_payload", self.payload_file,
           "--signature_size", "256",
           "--metadata_hash_file", metadata_sig_file,
           "--payload_hash_file", payload_sig_file]
    p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
    p1.communicate()
    assert p1.returncode == 0, "brillo_update_payload hash failed"

    # 2. Sign the hashes.
    signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
    signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)

    # 3. Insert the signatures back into the payload file.
    signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
                                              suffix=".bin")
    cmd = ["brillo_update_payload", "sign",
           "--unsigned_payload", self.payload_file,
           "--payload", signed_payload_file,
           "--signature_size", "256",
           "--metadata_signature_file", signed_metadata_sig_file,
           "--payload_signature_file", signed_payload_sig_file]
    p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
    p1.communicate()
    assert p1.returncode == 0, "brillo_update_payload sign failed"

    # 4. Dump the signed payload properties.
    properties_file = common.MakeTempFile(prefix="payload-properties-",
                                          suffix=".txt")
    cmd = ["brillo_update_payload", "properties",
           "--payload", signed_payload_file,
           "--properties_file", properties_file]
    p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
    p1.communicate()
    assert p1.returncode == 0, "brillo_update_payload properties failed"

    if OPTIONS.wipe_user_data:
      with open(properties_file, "a") as f:
        f.write("POWERWASH=1\n")

    self.payload_file = signed_payload_file
    self.payload_properties = properties_file

  def WriteToZip(self, output_zip):
    """Writes the payload to the given zip.

    Args:
      output_zip: The output ZipFile instance.
    """
    assert self.payload_file is not None
    assert self.payload_properties is not None

    # Add the signed payload file and properties into the zip. In order to
    # support streaming, we pack them as ZIP_STORED. So these entries can be
    # read directly with the offset and length pairs.
    common.ZipWrite(output_zip, self.payload_file, arcname=Payload.PAYLOAD_BIN,
                    compress_type=zipfile.ZIP_STORED)
    common.ZipWrite(output_zip, self.payload_properties,
                    arcname=Payload.PAYLOAD_PROPERTIES_TXT,
                    compress_type=zipfile.ZIP_STORED)


def SignOutput(temp_zip_name, output_zip_name):
  pw = OPTIONS.key_passwords[OPTIONS.package_key]

@@ -1122,12 +1238,6 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file,
      value += ' ' * (expected_length - len(value))
    return value

  # The place where the output from the subprocess should go.
  log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE

  # Get the PayloadSigner to be used in step 3.
  payload_signer = PayloadSigner()

  # Stage the output zip package for package signing.
  temp_zip_file = tempfile.NamedTemporaryFile()
  output_zip = zipfile.ZipFile(temp_zip_file, "w",
@@ -1143,72 +1253,15 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file,
  # Metadata to comply with Android OTA package format.
  metadata = GetPackageMetadata(target_info, source_info)

  # 1. Generate payload.
  payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
  cmd = ["brillo_update_payload", "generate",
         "--payload", payload_file,
         "--target_image", target_file]
  if source_file is not None:
    cmd.extend(["--source_image", source_file])
  p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
  p1.communicate()
  assert p1.returncode == 0, "brillo_update_payload generate failed"

  # 2. Generate hashes of the payload and metadata files.
  payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
  metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
  cmd = ["brillo_update_payload", "hash",
         "--unsigned_payload", payload_file,
         "--signature_size", "256",
         "--metadata_hash_file", metadata_sig_file,
         "--payload_hash_file", payload_sig_file]
  p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
  p1.communicate()
  assert p1.returncode == 0, "brillo_update_payload hash failed"
  # Generate payload.
  payload = Payload()
  payload.Generate(target_file, source_file)

  # 3. Sign the hashes and insert them back into the payload file.
  # 3a. Sign the payload hash.
  signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
  # Sign the payload.
  payload.Sign(PayloadSigner())

  # 3b. Sign the metadata hash.
  signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)

  # 3c. Insert the signatures back into the payload file.
  signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
                                            suffix=".bin")
  cmd = ["brillo_update_payload", "sign",
         "--unsigned_payload", payload_file,
         "--payload", signed_payload_file,
         "--signature_size", "256",
         "--metadata_signature_file", signed_metadata_sig_file,
         "--payload_signature_file", signed_payload_sig_file]
  p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
  p1.communicate()
  assert p1.returncode == 0, "brillo_update_payload sign failed"

  # 4. Dump the signed payload properties.
  properties_file = common.MakeTempFile(prefix="payload-properties-",
                                        suffix=".txt")
  cmd = ["brillo_update_payload", "properties",
         "--payload", signed_payload_file,
         "--properties_file", properties_file]
  p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
  p1.communicate()
  assert p1.returncode == 0, "brillo_update_payload properties failed"

  if OPTIONS.wipe_user_data:
    with open(properties_file, "a") as f:
      f.write("POWERWASH=1\n")

  # Add the signed payload file and properties into the zip. In order to
  # support streaming, we pack payload.bin, payload_properties.txt and
  # care_map.txt as ZIP_STORED. So these entries can be read directly with
  # the offset and length pairs.
  common.ZipWrite(output_zip, signed_payload_file, arcname="payload.bin",
                  compress_type=zipfile.ZIP_STORED)
  common.ZipWrite(output_zip, properties_file,
                  arcname="payload_properties.txt",
                  compress_type=zipfile.ZIP_STORED)
  # Write the payload into output zip.
  payload.WriteToZip(output_zip)

  # If dm-verity is supported for the device, copy contents of care_map
  # into A/B OTA package.
@@ -1219,6 +1272,8 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file,
    namelist = target_zip.namelist()
    if care_map_path in namelist:
      care_map_data = target_zip.read(care_map_path)
      # In order to support streaming, care_map.txt needs to be packed as
      # ZIP_STORED.
      common.ZipWriteStr(output_zip, "care_map.txt", care_map_data,
                         compress_type=zipfile.ZIP_STORED)
    else:
+157 −1
Original line number Diff line number Diff line
@@ -15,12 +15,14 @@
#

import copy
import os
import os.path
import unittest
import zipfile

import common
from ota_from_target_files import (
    _LoadOemDicts, BuildInfo, GetPackageMetadata, PayloadSigner,
    _LoadOemDicts, BuildInfo, GetPackageMetadata, Payload, PayloadSigner,
    WriteFingerprintAssertion)


@@ -564,3 +566,157 @@ class PayloadSignerTest(unittest.TestCase):

    verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
    self._assertFilesEqual(verify_file, signed_file)


class PayloadTest(unittest.TestCase):

  def setUp(self):
    self.testdata_dir = get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    common.OPTIONS.wipe_user_data = False
    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = None
    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key : None,
    }

  def tearDown(self):
    common.Cleanup()

  @staticmethod
  def _construct_target_files():
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      # META/update_engine_config.txt
      target_files_zip.writestr(
          'META/update_engine_config.txt',
          "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")

      # META/ab_partitions.txt
      ab_partitions = ['boot', 'system', 'vendor']
      target_files_zip.writestr(
          'META/ab_partitions.txt',
          '\n'.join(ab_partitions))

      # Create dummy images for each of them.
      for partition in ab_partitions:
        target_files_zip.writestr('IMAGES/' + partition + '.img',
                                  os.urandom(len(partition)))

    return target_files

  def _create_payload_full(self):
    target_file = self._construct_target_files()
    payload = Payload()
    payload.Generate(target_file)
    return payload

  def _create_payload_incremental(self):
    target_file = self._construct_target_files()
    source_file = self._construct_target_files()
    payload = Payload()
    payload.Generate(target_file, source_file)
    return payload

  def test_Generate_full(self):
    payload = self._create_payload_full()
    self.assertTrue(os.path.exists(payload.payload_file))

  def test_Generate_incremental(self):
    payload = self._create_payload_incremental()
    self.assertTrue(os.path.exists(payload.payload_file))

  def test_Generate_additionalArgs(self):
    target_file = self._construct_target_files()
    source_file = self._construct_target_files()
    payload = Payload()
    # This should work the same as calling payload.Generate(target_file,
    # source_file).
    payload.Generate(
        target_file, additional_args=["--source_image", source_file])
    self.assertTrue(os.path.exists(payload.payload_file))

  def test_Generate_invalidInput(self):
    target_file = self._construct_target_files()
    common.ZipDelete(target_file, 'IMAGES/vendor.img')
    payload = Payload()
    self.assertRaises(AssertionError, payload.Generate, target_file)

  def test_Sign_full(self):
    payload = self._create_payload_full()
    payload.Sign(PayloadSigner())

    output_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(output_file, 'w') as output_zip:
      payload.WriteToZip(output_zip)

    import check_ota_package_signature
    check_ota_package_signature.VerifyAbOtaPayload(
        os.path.join(self.testdata_dir, 'testkey.x509.pem'),
        output_file)

  def test_Sign_incremental(self):
    payload = self._create_payload_incremental()
    payload.Sign(PayloadSigner())

    output_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(output_file, 'w') as output_zip:
      payload.WriteToZip(output_zip)

    import check_ota_package_signature
    check_ota_package_signature.VerifyAbOtaPayload(
        os.path.join(self.testdata_dir, 'testkey.x509.pem'),
        output_file)

  def test_Sign_withDataWipe(self):
    common.OPTIONS.wipe_user_data = True
    payload = self._create_payload_full()
    payload.Sign(PayloadSigner())

    with open(payload.payload_properties) as properties_fp:
      self.assertIn("POWERWASH=1", properties_fp.read())

  def test_Sign_badSigner(self):
    """Tests that signing failure can be captured."""
    payload = self._create_payload_full()
    payload_signer = PayloadSigner()
    payload_signer.signer_args.append('bad-option')
    self.assertRaises(AssertionError, payload.Sign, payload_signer)

  def test_WriteToZip(self):
    payload = self._create_payload_full()
    payload.Sign(PayloadSigner())

    output_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(output_file, 'w') as output_zip:
      payload.WriteToZip(output_zip)

    with zipfile.ZipFile(output_file) as verify_zip:
      # First make sure we have the essential entries.
      namelist = verify_zip.namelist()
      self.assertIn(Payload.PAYLOAD_BIN, namelist)
      self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist)

      # Then assert these entries are stored.
      for entry_info in verify_zip.infolist():
        if entry_info.filename not in (Payload.PAYLOAD_BIN,
                                       Payload.PAYLOAD_PROPERTIES_TXT):
          continue
        self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)

  def test_WriteToZip_unsignedPayload(self):
    """Unsigned payloads should not be allowed to be written to zip."""
    payload = self._create_payload_full()

    output_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(output_file, 'w') as output_zip:
      self.assertRaises(AssertionError, payload.WriteToZip, output_zip)

    # Also test with incremental payload.
    payload = self._create_payload_incremental()

    output_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(output_file, 'w') as output_zip:
      self.assertRaises(AssertionError, payload.WriteToZip, output_zip)