Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1f93ce20 authored by Satoshi Futenma's avatar Satoshi Futenma Committed by Håkan Kvist
Browse files

Support custom payload signer in merge_ota

Add options to handle custom payload signer as it is required in
merge_ota.py as well if the original OTA packages are signed by
the signer.

If input is only one OTA, clone apex_info.pb to the target.

Use common.ZipWriteStr instead of zipfile.writestr, this ensures
that the same permission for the entries as done by
ota_from_target_files.

Bug: 282189563
Test: manual. pass single OTA to merge_ota, with same signing
    parameters as originally used. Confirm that output zip is
    binary identical to input.
Change-Id: I3b926b8cd69bc74dff6ccf8b5ccc72bedffcac6f
parent 7f1171e4
Loading
Loading
Loading
Loading
+48 −2
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@

import argparse
import logging
import shlex
import struct
import sys
import update_payload
@@ -34,6 +35,7 @@ from ota_utils import PayloadGenerator, METADATA_PROTO_NAME, FinalizeMetadata
logger = logging.getLogger(__name__)

CARE_MAP_ENTRY = "care_map.pb"
APEX_INFO_ENTRY = "apex_info.pb"


def WriteDataBlob(payload: Payload, outfp: BinaryIO, read_size=1024*64):
@@ -188,6 +190,22 @@ def CheckDuplicatePartitions(payloads: List[Payload]):
              f"OTA {partition_to_ota[part].name} and {payload.name} have duplicating partition {part}")
        partition_to_ota[part] = payload

def ApexInfo(file_paths):
  if len(file_paths) > 1:
    logger.info("More than one target file specified, will ignore "
                "apex_info.pb (if any)")
    return None
  with zipfile.ZipFile(file_paths[0], "r", allowZip64=True) as zfp:
    if APEX_INFO_ENTRY in zfp.namelist():
      apex_info_bytes = zfp.read(APEX_INFO_ENTRY)
      return apex_info_bytes
  return None

def ParseSignerArgs(args):
  if args is None:
    return None
  return shlex.split(args)

def main(argv):
  parser = argparse.ArgumentParser(description='Merge multiple partial OTAs')
  parser.add_argument('packages', type=str, nargs='+',
@@ -196,6 +214,13 @@ def main(argv):
                      help='Paths to private key for signing payload')
  parser.add_argument('--search_path', type=str,
                      help='Search path for framework/signapk.jar')
  parser.add_argument('--payload_signer', type=str,
                      help='Path to custom payload signer')
  parser.add_argument('--payload_signer_args', type=ParseSignerArgs,
                      help='Arguments for payload signer if necessary')
  parser.add_argument('--payload_signer_maximum_signature_size', type=str,
                      help='Maximum signature size (in bytes) that would be '
                      'generated by the given payload signer')
  parser.add_argument('--output', type=str,
                      help='Paths to output merged ota', required=True)
  parser.add_argument('--metadata_ota', type=str,
@@ -203,6 +228,9 @@ def main(argv):
  parser.add_argument('--private_key_suffix', type=str,
                      help='Suffix to be appended to package_key path', default=".pk8")
  parser.add_argument('-v', action="store_true", help="Enable verbose logging", dest="verbose")
  parser.epilog = ('This tool can also be used to resign a regular OTA. For a single regular OTA, '
                   'apex_info.pb will be written to output. When merging multiple OTAs, '
                   'apex_info.pb will not be written.')
  args = parser.parse_args(argv[1:])
  file_paths = args.packages

@@ -225,6 +253,13 @@ def main(argv):

  merged_manifest = MergeManifests(payloads)

  # Get signing keys
  key_passwords = common.GetKeyPasswords([args.package_key])

  generator = PayloadGenerator()

  apex_info_bytes = ApexInfo(file_paths)

  with tempfile.NamedTemporaryFile() as unsigned_payload:
    WriteHeaderAndManifest(merged_manifest, unsigned_payload)
    ConcatBlobs(payloads, unsigned_payload)
@@ -236,20 +271,31 @@ def main(argv):

    if args.package_key:
      logger.info("Signing payload...")
      signer = PayloadSigner(args.package_key, args.private_key_suffix)
      # TODO: remove OPTIONS when no longer used as fallback in payload_signer
      common.OPTIONS.payload_signer_args = None
      common.OPTIONS.payload_signer_maximum_signature_size = None
      signer = PayloadSigner(args.package_key, args.private_key_suffix,
                             key_passwords[args.package_key],
                             payload_signer=args.payload_signer,
                             payload_signer_args=args.payload_signer_args,
                             payload_signer_maximum_signature_size=args.payload_signer_maximum_signature_size)
      generator.payload_file = unsigned_payload.name
      generator.Sign(signer)

    logger.info("Payload size: %d", os.path.getsize(generator.payload_file))

    logger.info("Writing to %s", args.output)

    key_passwords = common.GetKeyPasswords([args.package_key])
    with tempfile.NamedTemporaryFile(prefix="signed_ota", suffix=".zip") as signed_ota:
      with zipfile.ZipFile(signed_ota, "w") as zfp:
        generator.WriteToZip(zfp)
        care_map_bytes = MergeCareMap(args.packages)
        if care_map_bytes:
          zfp.writestr(CARE_MAP_ENTRY, care_map_bytes)
          common.ZipWriteStr(zfp, CARE_MAP_ENTRY, care_map_bytes)
        if apex_info_bytes:
          logger.info("Writing %s", APEX_INFO_ENTRY)
          common.ZipWriteStr(zfp, APEX_INFO_ENTRY, apex_info_bytes)
      AddOtaMetadata(signed_ota.name, metadata_ota,
                     args.output, args.package_key, key_passwords[args.package_key])
  return 0
+9 −4
Original line number Diff line number Diff line
@@ -36,11 +36,16 @@ class PayloadSigner(object):
  (OPTIONS.package_key) and calls openssl for the signing works.
  """

  def __init__(self, package_key=None, private_key_suffix=None, pw=None, payload_signer=None):
  def __init__(self, package_key=None, private_key_suffix=None, pw=None, payload_signer=None,
               payload_signer_args=None, payload_signer_maximum_signature_size=None):
    if package_key is None:
      package_key = OPTIONS.package_key
    if private_key_suffix is None:
      private_key_suffix = OPTIONS.private_key_suffix
    if payload_signer_args is None:
      payload_signer_args = OPTIONS.payload_signer_args
    if payload_signer_maximum_signature_size is None:
      payload_signer_maximum_signature_size = OPTIONS.payload_signer_maximum_signature_size

    if payload_signer is None:
      # Prepare the payload signing key.
@@ -59,10 +64,10 @@ class PayloadSigner(object):
          signing_key)
    else:
      self.signer = payload_signer
      self.signer_args = OPTIONS.payload_signer_args
      if OPTIONS.payload_signer_maximum_signature_size:
      self.signer_args = payload_signer_args
      if payload_signer_maximum_signature_size:
        self.maximum_signature_size = int(
            OPTIONS.payload_signer_maximum_signature_size)
            payload_signer_maximum_signature_size)
      else:
        # The legacy config uses RSA2048 keys.
        logger.warning("The maximum signature size for payload signer is not"