Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e128734f authored by Tao Bao's avatar Tao Bao Committed by Gerrit Code Review
Browse files

Merge "releasetools: Add AbOtaPropertyFiles."

parents 1f4f6720 b6304675
Loading
Loading
Loading
Loading
+107 −0
Original line number Diff line number Diff line
@@ -159,6 +159,7 @@ import multiprocessing
import os.path
import shlex
import shutil
import struct
import subprocess
import sys
import tempfile
@@ -1063,6 +1064,7 @@ class PropertyFiles(object):
      return '%s:%d:%d' % (os.path.basename(name), offset, size)

    tokens = []
    tokens.extend(self._GetPrecomputed(zip_file))
    for entry in self.required:
      tokens.append(ComputeEntryOffsetSize(entry))
    for entry in self.optional:
@@ -1081,6 +1083,23 @@ class PropertyFiles(object):

    return ','.join(tokens)

  def _GetPrecomputed(self, input_zip):
    """Computes the additional tokens to be included into the property-files.

    This applies to tokens without actual ZIP entries, such as
    payload_metadadata.bin. We want to expose the offset/size to updaters, so
    that they can download the payload metadata directly with the info.

    Args:
      input_zip: The input zip file.

    Returns:
      A list of strings (tokens) to be added to the property-files string.
    """
    # pylint: disable=no-self-use
    # pylint: disable=unused-argument
    return []


class StreamingPropertyFiles(PropertyFiles):
  """A subclass for computing the property-files for streaming A/B OTAs."""
@@ -1101,6 +1120,89 @@ class StreamingPropertyFiles(PropertyFiles):
    )


class AbOtaPropertyFiles(StreamingPropertyFiles):
  """The property-files for A/B OTA that includes payload_metadata.bin info.

  Since P, we expose one more token (aka property-file), in addition to the ones
  for streaming A/B OTA, for a virtual entry of 'payload_metadata.bin'.
  'payload_metadata.bin' is the header part of a payload ('payload.bin'), which
  doesn't exist as a separate ZIP entry, but can be used to verify if the
  payload can be applied on the given device.

  For backward compatibility, we keep both of the 'ota-streaming-property-files'
  and the newly added 'ota-property-files' in P. The new token will only be
  available in 'ota-property-files'.
  """

  def __init__(self):
    super(AbOtaPropertyFiles, self).__init__()
    self.name = 'ota-property-files'

  def _GetPrecomputed(self, input_zip):
    offset, size = self._GetPayloadMetadataOffsetAndSize(input_zip)
    return ['payload_metadata.bin:{}:{}'.format(offset, size)]

  @staticmethod
  def _GetPayloadMetadataOffsetAndSize(input_zip):
    """Computes the offset and size of the payload metadata for a given package.

    (From system/update_engine/update_metadata.proto)
    A delta update file contains all the deltas needed to update a system from
    one specific version to another specific version. The update format is
    represented by this struct pseudocode:

    struct delta_update_file {
      char magic[4] = "CrAU";
      uint64 file_format_version;
      uint64 manifest_size;  // Size of protobuf DeltaArchiveManifest

      // Only present if format_version > 1:
      uint32 metadata_signature_size;

      // The Bzip2 compressed DeltaArchiveManifest
      char manifest[metadata_signature_size];

      // The signature of the metadata (from the beginning of the payload up to
      // this location, not including the signature itself). This is a
      // serialized Signatures message.
      char medatada_signature_message[metadata_signature_size];

      // Data blobs for files, no specific format. The specific offset
      // and length of each data blob is recorded in the DeltaArchiveManifest.
      struct {
        char data[];
      } blobs[];

      // These two are not signed:
      uint64 payload_signatures_message_size;
      char payload_signatures_message[];
    };

    'payload-metadata.bin' contains all the bytes from the beginning of the
    payload, till the end of 'medatada_signature_message'.
    """
    payload_info = input_zip.getinfo('payload.bin')
    payload_offset = payload_info.header_offset + len(payload_info.FileHeader())
    payload_size = payload_info.file_size

    with input_zip.open('payload.bin', 'r') as payload_fp:
      header_bin = payload_fp.read(24)

    # network byte order (big-endian)
    header = struct.unpack("!IQQL", header_bin)

    # 'CrAU'
    magic = header[0]
    assert magic == 0x43724155, "Invalid magic: {:x}".format(magic)

    manifest_size = header[2]
    metadata_signature_size = header[3]
    metadata_total = 24 + manifest_size + metadata_signature_size
    assert metadata_total < payload_size

    return (payload_offset, metadata_total)


def FinalizeMetadata(metadata, input_file, output_file, needed_property_files):
  """Finalizes the metadata and signs an A/B OTA package.

@@ -1573,7 +1675,12 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file,
  # FinalizeMetadata().
  common.ZipClose(output_zip)

  # AbOtaPropertyFiles intends to replace StreamingPropertyFiles, as it covers
  # all the info of the latter. However, system updaters and OTA servers need to
  # take time to switch to the new flag. We keep both of the flags for
  # P-timeframe, and will remove StreamingPropertyFiles in later release.
  needed_property_files = (
      AbOtaPropertyFiles(),
      StreamingPropertyFiles(),
  )
  FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
+149 −1
Original line number Diff line number Diff line
@@ -17,13 +17,14 @@
import copy
import os
import os.path
import subprocess
import unittest
import zipfile

import common
import test_utils
from ota_from_target_files import (
    _LoadOemDicts, BuildInfo, GetPackageMetadata,
    _LoadOemDicts, AbOtaPropertyFiles, BuildInfo, GetPackageMetadata,
    GetTargetFilesZipForSecondaryImages,
    GetTargetFilesZipWithoutPostinstallConfig,
    Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles,
@@ -842,6 +843,153 @@ class StreamingPropertyFilesTest(PropertyFilesTest):
          AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')


class AbOtaPropertyFilesTest(PropertyFilesTest):
  """Additional sanity checks specialized for AbOtaPropertyFiles."""

  # The size for payload and metadata signature size.
  SIGNATURE_SIZE = 256

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    common.OPTIONS.wipe_user_data = False
    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = None
    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key : None,
    }

  def test_init(self):
    property_files = AbOtaPropertyFiles()
    self.assertEqual('ota-property-files', property_files.name)
    self.assertEqual(
        (
            'payload.bin',
            'payload_properties.txt',
        ),
        property_files.required)
    self.assertEqual(
        (
            'care_map.txt',
            'compatibility.zip',
        ),
        property_files.optional)

  def test_GetPayloadMetadataOffsetAndSize(self):
    target_file = construct_target_files()
    payload = Payload()
    payload.Generate(target_file)

    payload_signer = PayloadSigner()
    payload.Sign(payload_signer)

    output_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(output_file, 'w') as output_zip:
      payload.WriteToZip(output_zip)

    # Find out the payload metadata offset and size.
    property_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(output_file) as input_zip:
      # pylint: disable=protected-access
      payload_offset, metadata_total = (
          property_files._GetPayloadMetadataOffsetAndSize(input_zip))

    # Read in the metadata signature directly.
    with open(output_file, 'rb') as verify_fp:
      verify_fp.seek(payload_offset + metadata_total - self.SIGNATURE_SIZE)
      metadata_signature = verify_fp.read(self.SIGNATURE_SIZE)

    # Now we extract the metadata hash via brillo_update_payload script, which
    # will serve as the oracle result.
    payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    cmd = ['brillo_update_payload', 'hash',
           '--unsigned_payload', payload.payload_file,
           '--signature_size', str(self.SIGNATURE_SIZE),
           '--metadata_hash_file', metadata_sig_file,
           '--payload_hash_file', payload_sig_file]
    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdoutdata, _ = proc.communicate()
    self.assertEqual(
        0, proc.returncode,
        'Failed to run brillo_update_payload: {}'.format(stdoutdata))

    signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)

    # Finally we can compare the two signatures.
    with open(signed_metadata_sig_file, 'rb') as verify_fp:
      self.assertEqual(verify_fp.read(), metadata_signature)

  @staticmethod
  def _construct_zip_package_withValidPayload(with_metadata=False):
    # Cannot use _construct_zip_package() since we need a "valid" payload.bin.
    target_file = construct_target_files()
    payload = Payload()
    payload.Generate(target_file)

    payload_signer = PayloadSigner()
    payload.Sign(payload_signer)

    zip_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_file, 'w') as zip_fp:
      # 'payload.bin',
      payload.WriteToZip(zip_fp)

      # Other entries.
      entries = ['care_map.txt', 'compatibility.zip']

      # Put META-INF/com/android/metadata if needed.
      if with_metadata:
        entries.append('META-INF/com/android/metadata')

      for entry in entries:
        zip_fp.writestr(
            entry, entry.replace('.', '-').upper(), zipfile.ZIP_STORED)

    return zip_file

  def test_Compute(self):
    zip_file = self._construct_zip_package_withValidPayload()
    property_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      property_files_string = property_files.Compute(zip_fp)

    tokens = self._parse_property_files_string(property_files_string)
    # "6" indcludes the four entries above, one metadata entry, and one entry
    # for payload-metadata.bin.
    self.assertEqual(6, len(tokens))
    self._verify_entries(
        zip_file, tokens, ('care_map.txt', 'compatibility.zip'))

  def test_Finalize(self):
    zip_file = self._construct_zip_package_withValidPayload(with_metadata=True)
    property_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)
      property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))

    tokens = self._parse_property_files_string(property_files_string)
    # "6" indcludes the four entries above, one metadata entry, and one entry
    # for payload-metadata.bin.
    self.assertEqual(6, len(tokens))
    self._verify_entries(
        zip_file, tokens, ('care_map.txt', 'compatibility.zip'))

  def test_Verify(self):
    zip_file = self._construct_zip_package_withValidPayload(with_metadata=True)
    property_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)

      property_files.Verify(zip_fp, raw_metadata)


class PayloadSignerTest(unittest.TestCase):

  SIGFILE = 'sigfile.bin'