Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0c28d2d0 authored by Tao Bao's avatar Tao Bao
Browse files

releasetools: Clean up sign_target_files_apks.py.

Mostly cosmetic changes, such as replacing print statement with print
function. Also change 'import cStringIO' to optionally look for the one
in io module, to allow Python 2/3 compatibility.

Test: pylint --rcfile=pylintrc sign_target_files_apks.py
Test: Run sign_target_files_apks.py on marlin target_files.zip.
Change-Id: I4dc98b01da6f89e624114bbca5522f659901c1f2
parent a52691b1
Loading
Loading
Loading
Loading
+115 −94
Original line number Diff line number Diff line
@@ -90,14 +90,9 @@ Usage: sign_target_files_apks [flags] input_target_files output_target_files
      the existing ones in info dict.
"""

import sys

if sys.hexversion < 0x02070000:
  print >> sys.stderr, "Python 2.7 or newer is required."
  sys.exit(1)
from __future__ import print_function

import base64
import cStringIO
import copy
import errno
import gzip
@@ -106,12 +101,19 @@ import re
import shutil
import stat
import subprocess
import sys
import tempfile
import zipfile

import add_img_to_target_files
import common


if sys.hexversion < 0x02070000:
  print("Python 2.7 or newer is required.", file=sys.stderr)
  sys.exit(1)


OPTIONS = common.OPTIONS

OPTIONS.extra_apks = {}
@@ -126,6 +128,7 @@ OPTIONS.avb_keys = {}
OPTIONS.avb_algorithms = {}
OPTIONS.avb_extra_args = {}


def GetApkCerts(certmap):
  # apply the key remapping to the contents of the file
  for apk, cert in certmap.iteritems():
@@ -149,17 +152,18 @@ def CheckAllApksSigned(input_tf_zip, apk_key_map, compressed_extension):
    compressed_apk_extension = ".apk" + compressed_extension
  for info in input_tf_zip.infolist():
    if (info.filename.endswith(".apk") or
        (compressed_apk_extension and info.filename.endswith(compressed_apk_extension))):
        (compressed_apk_extension and
         info.filename.endswith(compressed_apk_extension))):
      name = os.path.basename(info.filename)
      if compressed_apk_extension and name.endswith(compressed_apk_extension):
        name = name[:-len(compressed_extension)]
      if name not in apk_key_map:
        unknown_apks.append(name)
  if unknown_apks:
    print "ERROR: no key specified for:\n\n ",
    print "\n  ".join(unknown_apks)
    print "\nUse '-e <apkname>=' to specify a key (which may be an"
    print "empty string to not sign this apk)."
    print("ERROR: no key specified for:\n")
    print("  " + "\n  ".join(unknown_apks))
    print("\nUse '-e <apkname>=' to specify a key (which may be an empty "
          "string to not sign this apk).")
    sys.exit(1)


@@ -171,7 +175,8 @@ def SignApk(data, keyname, pw, platform_api_level, codename_to_api_level_map,

  if is_compressed:
    uncompressed = tempfile.NamedTemporaryFile()
    with gzip.open(unsigned.name, "rb") as in_file, open(uncompressed.name, "wb") as out_file:
    with gzip.open(unsigned.name, "rb") as in_file, \
         open(uncompressed.name, "wb") as out_file:
      shutil.copyfileobj(in_file, out_file)

    # Finally, close the "unsigned" file (which is gzip compressed), and then
@@ -206,11 +211,12 @@ def SignApk(data, keyname, pw, platform_api_level, codename_to_api_level_map,
                  min_api_level=min_api_level,
                  codename_to_api_level_map=codename_to_api_level_map)

  data = None;
  data = None
  if is_compressed:
    # Recompress the file after it has been signed.
    compressed = tempfile.NamedTemporaryFile()
    with open(signed.name, "rb") as in_file, gzip.open(compressed.name, "wb") as out_file:
    with open(signed.name, "rb") as in_file, \
         gzip.open(compressed.name, "wb") as out_file:
      shutil.copyfileobj(in_file, out_file)

    data = compressed.read()
@@ -233,10 +239,11 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
  if compressed_extension:
    compressed_apk_extension = ".apk" + compressed_extension

  maxsize = max([len(os.path.basename(i.filename))
                 for i in input_tf_zip.infolist()
                 if i.filename.endswith('.apk') or
                 (compressed_apk_extension and i.filename.endswith(compressed_apk_extension))])
  maxsize = max(
      [len(os.path.basename(i.filename)) for i in input_tf_zip.infolist()
       if (i.filename.endswith('.apk') or
           (compressed_apk_extension and
            i.filename.endswith(compressed_apk_extension)))])
  system_root_image = misc_info.get("system_root_image") == "true"

  for info in input_tf_zip.infolist():
@@ -248,21 +255,23 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,

    # Sign APKs.
    if (info.filename.endswith(".apk") or
        (compressed_apk_extension and info.filename.endswith(compressed_apk_extension))):
      is_compressed = compressed_extension and info.filename.endswith(compressed_apk_extension)
        (compressed_apk_extension and
         info.filename.endswith(compressed_apk_extension))):
      is_compressed = (compressed_extension and
                       info.filename.endswith(compressed_apk_extension))
      name = os.path.basename(info.filename)
      if is_compressed:
        name = name[:-len(compressed_extension)]

      key = apk_key_map[name]
      if key not in common.SPECIAL_CERT_STRINGS:
        print "    signing: %-*s (%s)" % (maxsize, name, key)
        print("    signing: %-*s (%s)" % (maxsize, name, key))
        signed_data = SignApk(data, key, key_passwords[key], platform_api_level,
                              codename_to_api_level_map, is_compressed)
        common.ZipWriteStr(output_tf_zip, out_info, signed_data)
      else:
        # an APK we're not supposed to sign.
        print "NOT signing: %s" % (name,)
        print("NOT signing: %s" % (name,))
        common.ZipWriteStr(output_tf_zip, out_info, data)

    # System properties.
@@ -274,7 +283,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
                           "ROOT/default.prop",  # legacy
                           "RECOVERY/RAMDISK/prop.default",
                           "RECOVERY/RAMDISK/default.prop"):  # legacy
      print "rewriting %s:" % (info.filename,)
      print("Rewriting %s:" % (info.filename,))
      if stat.S_ISLNK(info.external_attr >> 16):
        new_data = data
      else:
@@ -282,7 +291,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
      common.ZipWriteStr(output_tf_zip, out_info, new_data)

    elif info.filename.endswith("mac_permissions.xml"):
      print "rewriting %s with new keys." % (info.filename,)
      print("Rewriting %s with new keys." % (info.filename,))
      new_data = ReplaceCerts(data)
      common.ZipWriteStr(output_tf_zip, out_info, new_data)

@@ -333,10 +342,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
    ReplaceVerityPrivateKey(misc_info, OPTIONS.replace_verity_private_key[1])

  if OPTIONS.replace_verity_public_key:
    if system_root_image:
      dest = "ROOT/verity_key"
    else:
      dest = "BOOT/RAMDISK/verity_key"
    dest = "ROOT/verity_key" if system_root_image else "BOOT/RAMDISK/verity_key"
    # We are replacing the one in boot image only, since the one under
    # recovery won't ever be needed.
    ReplaceVerityPublicKey(
@@ -361,7 +367,7 @@ def ReplaceCerts(data):
  for old, new in OPTIONS.key_map.iteritems():
    try:
      if OPTIONS.verbose:
        print "    Replacing %s.x509.pem with %s.x509.pem" % (old, new)
        print("    Replacing %s.x509.pem with %s.x509.pem" % (old, new))
      f = open(old + ".x509.pem")
      old_cert16 = base64.b16encode(common.ParseCertificate(f.read())).lower()
      f.close()
@@ -372,14 +378,14 @@ def ReplaceCerts(data):
      pattern = "\\b" + old_cert16 + "\\b"
      (data, num) = re.subn(pattern, new_cert16, data, flags=re.IGNORECASE)
      if OPTIONS.verbose:
        print "    Replaced %d occurence(s) of %s.x509.pem with " \
            "%s.x509.pem" % (num, old, new)
        print("    Replaced %d occurence(s) of %s.x509.pem with "
              "%s.x509.pem" % (num, old, new))
    except IOError as e:
      if e.errno == errno.ENOENT and not OPTIONS.verbose:
        continue

      print "    Error accessing %s. %s. Skip replacing %s.x509.pem " \
          "with %s.x509.pem." % (e.filename, e.strerror, old, new)
      print("    Error accessing %s. %s. Skip replacing %s.x509.pem with "
            "%s.x509.pem." % (e.filename, e.strerror, old, new))

  return data

@@ -445,8 +451,8 @@ def RewriteProps(data):
        value = " ".join(value)
      line = key + "=" + value
    if line != original_line:
      print "  replace: ", original_line
      print "     with: ", line
      print("  replace: ", original_line)
      print("     with: ", line)
    output.append(line)
  return "\n".join(output) + "\n"

@@ -462,7 +468,7 @@ def ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info):
    extra_recovery_keys = [OPTIONS.key_map.get(k, k) + ".x509.pem"
                           for k in extra_recovery_keys.split()]
    if extra_recovery_keys:
      print "extra recovery-only key(s): " + ", ".join(extra_recovery_keys)
      print("extra recovery-only key(s): " + ", ".join(extra_recovery_keys))
  else:
    extra_recovery_keys = []

@@ -476,8 +482,8 @@ def ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info):
    mapped_keys.append(OPTIONS.key_map.get(k, k) + ".x509.pem")

  if mapped_keys:
    print "using:\n   ", "\n   ".join(mapped_keys)
    print "for OTA package verification"
    print("using:\n   ", "\n   ".join(mapped_keys))
    print("for OTA package verification")
  else:
    devkey = misc_info.get("default_system_dev_certificate",
                           "build/target/product/security/testkey")
@@ -511,7 +517,11 @@ def ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info):
  # put into a zipfile system/etc/security/otacerts.zip.
  # We DO NOT include the extra_recovery_keys (if any) here.

  temp_file = cStringIO.StringIO()
  try:
    from StringIO import StringIO
  except ImportError:
    from io import StringIO
  temp_file = StringIO()
  certs_zip = zipfile.ZipFile(temp_file, "w")
  for k in mapped_keys:
    common.ZipWrite(certs_zip, k)
@@ -527,7 +537,7 @@ def ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info):
      print("\n  WARNING: Found more than one OTA keys; Using the first one"
            " as payload verification key.\n\n")

    print "Using %s for payload verification." % (mapped_keys[0],)
    print("Using %s for payload verification." % (mapped_keys[0],))
    cmd = common.Run(
        ["openssl", "x509", "-pubkey", "-noout", "-in", mapped_keys[0]],
        stdout=subprocess.PIPE)
@@ -544,40 +554,53 @@ def ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info):
  return new_recovery_keys


def ReplaceVerityPublicKey(targetfile_zip, filename, key_path):
  print "Replacing verity public key with %s" % (key_path,)
  common.ZipWrite(targetfile_zip, key_path, arcname=filename)
def ReplaceVerityPublicKey(output_zip, filename, key_path):
  """Replaces the verity public key at the given path in the given zip.

  Args:
    output_zip: The output target_files zip.
    filename: The archive name in the output zip.
    key_path: The path to the public key.
  """
  print("Replacing verity public key with %s" % (key_path,))
  common.ZipWrite(output_zip, key_path, arcname=filename)


def ReplaceVerityPrivateKey(misc_info, key_path):
  print "Replacing verity private key with %s" % (key_path,)
  """Replaces the verity private key in misc_info dict.

  Args:
    misc_info: The info dict.
    key_path: The path to the private key in PKCS#8 format.
  """
  print("Replacing verity private key with %s" % (key_path,))
  misc_info["verity_key"] = key_path


def ReplaceVerityKeyId(targetfile_input_zip, targetfile_output_zip, keypath):
def ReplaceVerityKeyId(targetfile_input_zip, targetfile_output_zip, key_path):
  in_cmdline = targetfile_input_zip.read("BOOT/cmdline")
  # copy in_cmdline to output_zip if veritykeyid is not present in in_cmdline
  if "veritykeyid" not in in_cmdline:
    common.ZipWriteStr(targetfile_output_zip, "BOOT/cmdline", in_cmdline)
    return in_cmdline
  out_cmdline = []
  out_buffer = []
  for param in in_cmdline.split():
    if "veritykeyid" in param:
      # extract keyid using openssl command
      p = common.Run(
          ["openssl", "x509", "-in", keypath, "-text"],
          ["openssl", "x509", "-in", key_path, "-text"],
          stdout=subprocess.PIPE)
      keyid, stderr = p.communicate()
      keyid = re.search(
          r'keyid:([0-9a-fA-F:]*)', keyid).group(1).replace(':', '').lower()
      print "Replacing verity keyid with %s error=%s" % (keyid, stderr)
      out_cmdline.append("veritykeyid=id:%s" % (keyid,))
      print("Replacing verity keyid with %s error=%s" % (keyid, stderr))
      out_buffer.append("veritykeyid=id:%s" % (keyid,))
    else:
      out_cmdline.append(param)
      out_buffer.append(param)

  out_cmdline = ' '.join(out_cmdline)
  out_cmdline = ' '.join(out_buffer)
  out_cmdline = out_cmdline.strip()
  print "out_cmdline %s" % (out_cmdline)
  print("out_cmdline %s" % (out_cmdline))
  common.ZipWriteStr(targetfile_output_zip, "BOOT/cmdline", out_cmdline)


@@ -616,15 +639,15 @@ def ReplaceAvbSigningKeys(misc_info):
    algorithm = OPTIONS.avb_algorithms.get(partition)
    assert algorithm, 'Missing AVB signing algorithm for %s' % (partition,)

    print 'Replacing AVB signing key for %s with "%s" (%s)' % (
        partition, key, algorithm)
    print('Replacing AVB signing key for %s with "%s" (%s)' % (
        partition, key, algorithm))
    misc_info['avb_' + partition + '_algorithm'] = algorithm
    misc_info['avb_' + partition + '_key_path'] = key

    extra_args = OPTIONS.avb_extra_args.get(partition)
    if extra_args:
      print 'Setting extra AVB signing args for %s to "%s"' % (
          partition, extra_args)
      print('Setting extra AVB signing args for %s to "%s"' % (
          partition, extra_args))
      args_key = AVB_FOOTER_ARGS_BY_PARTITION[partition]
      misc_info[args_key] = (misc_info.get(args_key, '') + ' ' + extra_args)

@@ -832,16 +855,14 @@ def main(argv):
  new_args.append(args[1])
  add_img_to_target_files.main(new_args)

  print "done."
  print("done.")


if __name__ == '__main__':
  try:
    main(sys.argv[1:])
  except common.ExternalError, e:
    print
    print "   ERROR: %s" % (e,)
    print
  except common.ExternalError as e:
    print("\n   ERROR: %s\n" % (e,))
    sys.exit(1)
  finally:
    common.Cleanup()