Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0573298a authored by Baligh Uddin's avatar Baligh Uddin Committed by Gerrit Code Review
Browse files

Merge changes I3fa13e3d,I7b7f0017,I2ef318e0

* changes:
  releasetools: Support signing APEXes.
  releasetools: Add apex_utils.py.
  releasetools: check_target_files_signatures.py checks APEXes.
parents 015f8313 aa7e993a
Loading
Loading
Loading
Loading
+147 −0
Original line number Diff line number Diff line
#!/usr/bin/env python
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os.path
import re
import shlex
import sys

import common

logger = logging.getLogger(__name__)


class ApexInfoError(Exception):
  """An Exception raised during Apex Information command."""

  def __init__(self, message):
    Exception.__init__(self, message)


class ApexSigningError(Exception):
  """An Exception raised during Apex Payload signing."""

  def __init__(self, message):
    Exception.__init__(self, message)


def SignApexPayload(payload_file, payload_key_path, payload_key_name, algorithm,
                    salt, signing_args=None):
  """Signs a given payload_file with the payload key."""
  # Add the new footer. Old footer, if any, will be replaced by avbtool.
  cmd = ['avbtool', 'add_hashtree_footer',
         '--do_not_generate_fec',
         '--algorithm', algorithm,
         '--key', payload_key_path,
         '--prop', 'apex.key:{}'.format(payload_key_name),
         '--image', payload_file,
         '--salt', salt]
  if signing_args:
    cmd.extend(shlex.split(signing_args))

  try:
    common.RunAndCheckOutput(cmd)
  except common.ExternalError as e:
    raise ApexSigningError, \
        'Failed to sign APEX payload {} with {}:\n{}'.format(
            payload_file, payload_key_path, e), sys.exc_info()[2]

  # Verify the signed payload image with specified public key.
  logger.info('Verifying %s', payload_file)
  VerifyApexPayload(payload_file, payload_key_path)


def VerifyApexPayload(payload_file, payload_key):
  """Verifies the APEX payload signature with the given key."""
  cmd = ['avbtool', 'verify_image', '--image', payload_file,
         '--key', payload_key]
  try:
    common.RunAndCheckOutput(cmd)
  except common.ExternalError as e:
    raise ApexSigningError, \
        'Failed to validate payload signing for {} with {}:\n{}'.format(
            payload_file, payload_key, e), sys.exc_info()[2]


def ParseApexPayloadInfo(payload_path):
  """Parses the APEX payload info.

  Args:
    payload_path: The path to the payload image.

  Raises:
    ApexInfoError on parsing errors.

  Returns:
    A dict that contains payload property-value pairs. The dict should at least
    contain Algorithm, Salt and apex.key.
  """
  if not os.path.exists(payload_path):
    raise ApexInfoError('Failed to find image: {}'.format(payload_path))

  cmd = ['avbtool', 'info_image', '--image', payload_path]
  try:
    output = common.RunAndCheckOutput(cmd)
  except common.ExternalError as e:
    raise ApexInfoError, \
        'Failed to get APEX payload info for {}:\n{}'.format(
            payload_path, e), sys.exc_info()[2]

  # Extract the Algorithm / Salt / Prop info from payload (i.e. an image signed
  # with avbtool). For example,
  # Algorithm:                SHA256_RSA4096
  PAYLOAD_INFO_PATTERN = (
      r'^\s*(?P<key>Algorithm|Salt|Prop)\:\s*(?P<value>.*?)$')
  payload_info_matcher = re.compile(PAYLOAD_INFO_PATTERN)

  payload_info = {}
  for line in output.split('\n'):
    line_info = payload_info_matcher.match(line)
    if not line_info:
      continue

    key, value = line_info.group('key'), line_info.group('value')

    if key == 'Prop':
      # Further extract the property key-value pair, from a 'Prop:' line. For
      # example,
      #   Prop: apex.key -> 'com.android.runtime'
      # Note that avbtool writes single or double quotes around values.
      PROPERTY_DESCRIPTOR_PATTERN = r'^\s*(?P<key>.*?)\s->\s*(?P<value>.*?)$'

      prop_matcher = re.compile(PROPERTY_DESCRIPTOR_PATTERN)
      prop = prop_matcher.match(value)
      if not prop:
        raise ApexInfoError(
            'Failed to parse prop string {}'.format(value))

      prop_key, prop_value = prop.group('key'), prop.group('value')
      if prop_key == 'apex.key':
        # avbtool dumps the prop value with repr(), which contains single /
        # double quotes that we don't want.
        payload_info[prop_key] = prop_value.strip('\"\'')

    else:
      payload_info[key] = value

  # Sanity check.
  for key in ('Algorithm', 'Salt', 'apex.key'):
    if key not in payload_info:
      raise ApexInfoError(
          'Failed to find {} prop in {}'.format(key, payload_path))

  return payload_info
+5 −4
Original line number Diff line number Diff line
@@ -168,6 +168,7 @@ def CertFromPKCS7(data, filename):


class APK(object):

  def __init__(self, full_filename, filename):
    self.filename = filename
    self.certs = None
@@ -244,12 +245,12 @@ class TargetFiles(object):
    # must decompress them individually before we perform any analysis.

    # This is the list of wildcards of files we extract from |filename|.
    apk_extensions = ['*.apk']
    apk_extensions = ['*.apk', '*.apex']

    self.certmap, compressed_extension = common.ReadApkCerts(
        zipfile.ZipFile(filename, "r"))
        zipfile.ZipFile(filename))
    if compressed_extension:
      apk_extensions.append("*.apk" + compressed_extension)
      apk_extensions.append('*.apk' + compressed_extension)

    d = common.UnzipTemp(filename, apk_extensions)
    self.apks = {}
@@ -272,7 +273,7 @@ class TargetFiles(object):
          os.remove(os.path.join(dirpath, fn))
          fn = uncompressed_fn

        if fn.endswith(".apk"):
        if fn.endswith(('.apk', '.apex')):
          fullname = os.path.join(dirpath, fn)
          displayname = fullname[len(d)+1:]
          apk = APK(fullname, displayname)
+268 −26
Original line number Diff line number Diff line
@@ -21,11 +21,17 @@ target-files zip.
Usage:  sign_target_files_apks [flags] input_target_files output_target_files

  -e  (--extra_apks)  <name,name,...=key>
      Add extra APK name/key pairs as though they appeared in
      apkcerts.txt (so mappings specified by -k and -d are applied).
      Keys specified in -e override any value for that app contained
      in the apkcerts.txt file.  Option may be repeated to give
      multiple extra packages.
      Add extra APK/APEX name/key pairs as though they appeared in apkcerts.txt
      or apexkeys.txt (so mappings specified by -k and -d are applied). Keys
      specified in -e override any value for that app contained in the
      apkcerts.txt file, or the container key for an APEX. Option may be
      repeated to give multiple extra packages.

  --extra_apex_payload_key <name=key>
      Add a mapping for APEX package name to payload signing key, which will
      override the default payload signing key in apexkeys.txt. Note that the
      container key should be overridden via the `--extra_apks` flag above.
      Option may be repeated for multiple APEXes.

  --skip_apks_with_path_prefix  <prefix>
      Skip signing an APK if it has the matching prefix in its path. The prefix
@@ -90,7 +96,7 @@ Usage: sign_target_files_apks [flags] input_target_files output_target_files
      Use the specified algorithm (e.g. SHA256_RSA4096) and the key to AVB-sign
      the specified image. Otherwise it uses the existing values in info dict.

  --avb_{boot,system,vendor,dtbo,vbmeta}_extra_args <args>
  --avb_{apex,boot,system,vendor,dtbo,vbmeta}_extra_args <args>
      Specify any additional args that are needed to AVB-sign the image
      (e.g. "--signing_helper /path/to/helper"). The args will be appended to
      the existing ones in info dict.
@@ -102,6 +108,7 @@ import base64
import copy
import errno
import gzip
import itertools
import logging
import os
import re
@@ -114,6 +121,7 @@ import zipfile
from xml.etree import ElementTree

import add_img_to_target_files
import apex_utils
import common


@@ -127,6 +135,7 @@ logger = logging.getLogger(__name__)
OPTIONS = common.OPTIONS

OPTIONS.extra_apks = {}
OPTIONS.extra_apex_payload_keys = {}
OPTIONS.skip_apks_with_path_prefix = set()
OPTIONS.key_map = {}
OPTIONS.rebuild_recovery = False
@@ -154,6 +163,41 @@ def GetApkCerts(certmap):
  return certmap


def GetApexKeys(keys_info, key_map):
  """Gets APEX payload and container signing keys by applying the mapping rules.

  We currently don't allow PRESIGNED payload / container keys.

  Args:
    keys_info: A dict that maps from APEX filenames to a tuple of (payload_key,
        container_key).
    key_map: A dict that overrides the keys, specified via command-line input.

  Returns:
    A dict that contains the updated APEX key mapping, which should be used for
    the current signing.
  """
  # Apply all the --extra_apex_payload_key options to override the payload
  # signing keys in the given keys_info.
  for apex, key in OPTIONS.extra_apex_payload_keys.items():
    assert key, 'Presigned APEX payload for {} is not allowed'.format(apex)
    keys_info[apex] = (key, keys_info[apex][1])

  # Apply the key remapping to container keys.
  for apex, (payload_key, container_key) in keys_info.items():
    keys_info[apex] = (payload_key, key_map.get(container_key, container_key))

  # Apply all the --extra_apks options to override the container keys.
  for apex, key in OPTIONS.extra_apks.items():
    # Skip non-APEX containers.
    if apex not in keys_info:
      continue
    assert key, 'Presigned APEX container for {} is not allowed'.format(apex)
    keys_info[apex][1] = key_map.get(key, key)

  return keys_info


def GetApkFileInfo(filename, compressed_extension, skipped_prefixes):
  """Returns the APK info based on the given filename.

@@ -200,34 +244,45 @@ def GetApkFileInfo(filename, compressed_extension, skipped_prefixes):
  return (True, is_compressed, should_be_skipped)


def CheckAllApksSigned(input_tf_zip, apk_key_map, compressed_extension):
  """Checks that all the APKs have keys specified, otherwise errors out.
def CheckApkAndApexKeysAvailable(input_tf_zip, known_keys,
                                 compressed_extension):
  """Checks that all the APKs and APEXes have keys specified.

  Args:
    input_tf_zip: An open target_files zip file.
    apk_key_map: A dict of known signing keys key'd by APK names.
    known_keys: A set of APKs and APEXes that have known signing keys.
    compressed_extension: The extension string of compressed APKs, such as
        ".gz", or None if there's no compressed APKs.
        '.gz', or None if there's no compressed APKs.

  Raises:
    AssertionError: On finding unknown APKs.
    AssertionError: On finding unknown APKs and APEXes.
  """
  unknown_apks = []
  unknown_files = []
  for info in input_tf_zip.infolist():
    # Handle APEXes first, e.g. SYSTEM/apex/com.android.tzdata.apex.
    if (info.filename.startswith('SYSTEM/apex') and
        info.filename.endswith('.apex')):
      name = os.path.basename(info.filename)
      if name not in known_keys:
        unknown_files.append(name)
      continue

    # And APKs.
    (is_apk, is_compressed, should_be_skipped) = GetApkFileInfo(
        info.filename, compressed_extension, OPTIONS.skip_apks_with_path_prefix)
    if not is_apk or should_be_skipped:
      continue

    name = os.path.basename(info.filename)
    if is_compressed:
      name = name[:-len(compressed_extension)]
    if name not in apk_key_map:
      unknown_apks.append(name)
    if name not in known_keys:
      unknown_files.append(name)

  assert not unknown_apks, \
  assert not unknown_files, \
      ("No key specified for:\n  {}\n"
       "Use '-e <apkname>=' to specify a key (which may be an empty string to "
       "not sign this apk).".format("\n  ".join(unknown_apks)))
       "not sign this apk).".format("\n  ".join(unknown_files)))


def SignApk(data, keyname, pw, platform_api_level, codename_to_api_level_map,
@@ -293,9 +348,69 @@ def SignApk(data, keyname, pw, platform_api_level, codename_to_api_level_map,
  return data


def SignApex(apex_data, payload_key, container_key, container_pw,
             codename_to_api_level_map, signing_args=None):
  """Signs the current APEX with the given payload/container keys.

  Args:
    apex_data: Raw APEX data.
    payload_key: The path to payload signing key (w/o extension).
    container_key: The path to container signing key (w/o extension).
    container_pw: The matching password of the container_key, or None.
    codename_to_api_level_map: A dict that maps from codename to API level.
    signing_args: Additional args to be passed to the payload signer.

  Returns:
    (signed_apex, payload_key_name): signed_apex is the path to the signed APEX
        file; payload_key_name is a str of the payload signing key name (e.g.
        com.android.tzdata).
  """
  apex_file = common.MakeTempFile(prefix='apex-', suffix='.apex')
  with open(apex_file, 'wb') as apex_fp:
    apex_fp.write(apex_data)

  APEX_PAYLOAD_IMAGE = 'apex_payload.img'

  # Signing an APEX is a two step process.
  # 1. Extract and sign the APEX_PAYLOAD_IMAGE entry with the given payload_key.
  payload_dir = common.MakeTempDir(prefix='apex-payload-')
  with zipfile.ZipFile(apex_file) as apex_fd:
    payload_file = apex_fd.extract(APEX_PAYLOAD_IMAGE, payload_dir)

  payload_info = apex_utils.ParseApexPayloadInfo(payload_file)
  apex_utils.SignApexPayload(
      payload_file,
      payload_key,
      payload_info['apex.key'],
      payload_info['Algorithm'],
      payload_info['Salt'],
      signing_args)

  common.ZipDelete(apex_file, APEX_PAYLOAD_IMAGE)
  apex_zip = zipfile.ZipFile(apex_file, 'a')
  common.ZipWrite(apex_zip, payload_file, arcname=APEX_PAYLOAD_IMAGE)
  common.ZipClose(apex_zip)

  # 2. Sign the overall APEX container with container_key.
  signed_apex = common.MakeTempFile(prefix='apex-container-', suffix='.apex')
  common.SignFile(
      apex_file,
      signed_apex,
      container_key,
      container_pw,
      codename_to_api_level_map=codename_to_api_level_map)

  signed_and_aligned_apex = common.MakeTempFile(
      prefix='apex-container-', suffix='.apex')
  common.RunAndCheckOutput(
      ['zipalign', '-f', '4096', signed_apex, signed_and_aligned_apex])

  return (signed_and_aligned_apex, payload_info['apex.key'])


def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
                       apk_key_map, key_passwords, platform_api_level,
                       codename_to_api_level_map,
                       apk_keys, apex_keys, key_passwords,
                       platform_api_level, codename_to_api_level_map,
                       compressed_extension):
  # maxsize measures the maximum filename length, including the ones to be
  # skipped.
@@ -304,6 +419,10 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
       if GetApkFileInfo(i.filename, compressed_extension, [])[0]])
  system_root_image = misc_info.get("system_root_image") == "true"

  # A dict of APEX payload public keys that should be updated, i.e. the files
  # under '/system/etc/security/apex/'.
  updated_apex_payload_keys = {}

  for info in input_tf_zip.infolist():
    filename = info.filename
    if filename.startswith("IMAGES/"):
@@ -331,7 +450,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
      if is_compressed:
        name = name[:-len(compressed_extension)]

      key = apk_key_map[name]
      key = apk_keys[name]
      if key not in common.SPECIAL_CERT_STRINGS:
        print("    signing: %-*s (%s)" % (maxsize, name, key))
        signed_data = SignApk(data, key, key_passwords[key], platform_api_level,
@@ -344,6 +463,30 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
            "        (skipped due to special cert string)" % (name,))
        common.ZipWriteStr(output_tf_zip, out_info, data)

    # Sign bundled APEX files.
    elif filename.startswith("SYSTEM/apex") and filename.endswith(".apex"):
      name = os.path.basename(filename)
      payload_key, container_key = apex_keys[name]

      print("    signing: %-*s container (%s)" % (maxsize, name, container_key))
      print("           : %-*s payload   (%s)" % (maxsize, name, payload_key))

      (signed_apex, payload_key_name) = SignApex(
          data,
          payload_key,
          container_key,
          key_passwords[container_key],
          codename_to_api_level_map,
          OPTIONS.avb_extra_args.get('apex'))
      common.ZipWrite(output_tf_zip, signed_apex, filename)

      updated_apex_payload_keys[payload_key_name] = payload_key

    # AVB public keys for the installed APEXes, which will be updated later.
    elif (os.path.dirname(filename) == 'SYSTEM/etc/security/apex' and
          filename != 'SYSTEM/etc/security/apex/'):
      continue

    # System properties.
    elif filename in ("SYSTEM/build.prop",
                      "VENDOR/build.prop",
@@ -406,6 +549,30 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
    else:
      common.ZipWriteStr(output_tf_zip, out_info, data)

  # Update APEX payload public keys.
  for info in input_tf_zip.infolist():
    filename = info.filename
    if (os.path.dirname(filename) != 'SYSTEM/etc/security/apex' or
        filename == 'SYSTEM/etc/security/apex/'):
      continue

    name = os.path.basename(filename)
    assert name in updated_apex_payload_keys, \
        'Unsigned APEX payload key: {}'.format(filename)

    key_path = updated_apex_payload_keys[name]
    if not os.path.exists(key_path) and not key_path.endswith('.pem'):
      key_path = '{}.pem'.format(key_path)
    assert os.path.exists(key_path), \
        'Failed to find public key file {} for APEX {}'.format(
            updated_apex_payload_keys[name], name)

    print('Replacing APEX payload public key for {} with {}'.format(
        name, key_path))

    public_key = common.ExtractAvbPublicKey(key_path)
    common.ZipWrite(output_tf_zip, public_key, arcname=filename)

  if OPTIONS.replace_ota_keys:
    ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info)

@@ -821,6 +988,67 @@ def GetCodenameToApiLevelMap(input_tf_zip):
  return result


def ReadApexKeysInfo(tf_zip):
  """Parses the APEX keys info from a given target-files zip.

  Given a target-files ZipFile, parses the META/apexkeys.txt entry and returns a
  dict that contains the mapping from APEX names (e.g. com.android.tzdata) to a
  tuple of (payload_key, container_key).

  Args:
    tf_zip: The input target_files ZipFile (already open).

  Returns:
    (payload_key, container_key): payload_key contains the path to the payload
        signing key; container_key contains the path to the container signing
        key.
  """
  keys = {}
  for line in tf_zip.read("META/apexkeys.txt").split("\n"):
    line = line.strip()
    if not line:
      continue
    matches = re.match(
        r'^name="(?P<NAME>.*)"\s+'
        r'public_key="(?P<PAYLOAD_PUBLIC_KEY>.*)"\s+'
        r'private_key="(?P<PAYLOAD_PRIVATE_KEY>.*)"\s+'
        r'container_certificate="(?P<CONTAINER_CERT>.*)"\s+'
        r'container_private_key="(?P<CONTAINER_PRIVATE_KEY>.*)"$',
        line)
    if not matches:
      continue

    name = matches.group('NAME')
    payload_public_key = matches.group("PAYLOAD_PUBLIC_KEY")
    payload_private_key = matches.group("PAYLOAD_PRIVATE_KEY")

    def CompareKeys(pubkey, pubkey_suffix, privkey, privkey_suffix):
      pubkey_suffix_len = len(pubkey_suffix)
      privkey_suffix_len = len(privkey_suffix)
      return (pubkey.endswith(pubkey_suffix) and
              privkey.endswith(privkey_suffix) and
              pubkey[:-pubkey_suffix_len] == privkey[:-privkey_suffix_len])

    PAYLOAD_PUBLIC_KEY_SUFFIX = '.avbpubkey'
    PAYLOAD_PRIVATE_KEY_SUFFIX = '.pem'
    if not CompareKeys(
        payload_public_key, PAYLOAD_PUBLIC_KEY_SUFFIX,
        payload_private_key, PAYLOAD_PRIVATE_KEY_SUFFIX):
      raise ValueError("Failed to parse payload keys: \n{}".format(line))

    container_cert = matches.group("CONTAINER_CERT")
    container_private_key = matches.group("CONTAINER_PRIVATE_KEY")
    if not CompareKeys(
        container_cert, OPTIONS.public_key_suffix,
        container_private_key, OPTIONS.private_key_suffix):
      raise ValueError("Failed to parse container keys: \n{}".format(line))

    keys[name] = (payload_private_key,
                  container_cert[:-len(OPTIONS.public_key_suffix)])

  return keys


def main(argv):

  key_mapping_options = []
@@ -831,6 +1059,9 @@ def main(argv):
      names = names.split(",")
      for n in names:
        OPTIONS.extra_apks[n] = key
    elif o == "--extra_apex_payload_key":
      apex_name, key = a.split("=")
      OPTIONS.extra_apex_payload_keys[apex_name] = key
    elif o == "--skip_apks_with_path_prefix":
      # Sanity check the prefix, which must be in all upper case.
      prefix = a.split('/')[0]
@@ -887,6 +1118,8 @@ def main(argv):
      OPTIONS.avb_algorithms['vendor'] = a
    elif o == "--avb_vendor_extra_args":
      OPTIONS.avb_extra_args['vendor'] = a
    elif o == "--avb_apex_extra_args":
      OPTIONS.avb_extra_args['apex'] = a
    else:
      return False
    return True
@@ -896,6 +1129,7 @@ def main(argv):
      extra_opts="e:d:k:ot:",
      extra_long_opts=[
          "extra_apks=",
          "extra_apex_payload_key=",
          "skip_apks_with_path_prefix=",
          "default_key_mappings=",
          "key_mapping=",
@@ -904,6 +1138,7 @@ def main(argv):
          "replace_verity_public_key=",
          "replace_verity_private_key=",
          "replace_verity_keyid=",
          "avb_apex_extra_args=",
          "avb_vbmeta_algorithm=",
          "avb_vbmeta_key=",
          "avb_vbmeta_extra_args=",
@@ -937,18 +1172,25 @@ def main(argv):

  BuildKeyMap(misc_info, key_mapping_options)

  certmap, compressed_extension = common.ReadApkCerts(input_zip)
  apk_key_map = GetApkCerts(certmap)
  CheckAllApksSigned(input_zip, apk_key_map, compressed_extension)
  apk_keys_info, compressed_extension = common.ReadApkCerts(input_zip)
  apk_keys = GetApkCerts(apk_keys_info)

  key_passwords = common.GetKeyPasswords(set(apk_key_map.values()))
  apex_keys_info = ReadApexKeysInfo(input_zip)
  apex_keys = GetApexKeys(apex_keys_info, apk_keys)

  CheckApkAndApexKeysAvailable(
      input_zip,
      set(apk_keys.keys()) | set(apex_keys.keys()),
      compressed_extension)

  key_passwords = common.GetKeyPasswords(
      set(apk_keys.values()) | set(itertools.chain(*apex_keys.values())))
  platform_api_level, _ = GetApiLevelAndCodename(input_zip)
  codename_to_api_level_map = GetCodenameToApiLevelMap(input_zip)

  ProcessTargetFiles(input_zip, output_zip, misc_info,
                     apk_key_map, key_passwords,
                     platform_api_level,
                     codename_to_api_level_map,
                     apk_keys, apex_keys, key_passwords,
                     platform_api_level, codename_to_api_level_map,
                     compressed_extension)

  common.ZipClose(input_zip)
+87 −0

File added.

Preview size limit exceeded, changes collapsed.

+72 −8

File changed.

Preview size limit exceeded, changes collapsed.

Loading