Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 46930d7a authored by Melisa Carranza Zuniga's avatar Melisa Carranza Zuniga
Browse files

Adding flags and logic to sign updateable SEPolicy in APEX

Verify with command:
sign_apex --container_key=testdata/testkey
--payload_key=testdata/testkey_RSA4096.key
--sepolicy_key=testdata/testkey_RSA4096.key
--sepolicy_cert=testdata/testkey.x509.pem
$OUT/system/apex/com.android.sepolicy.apex
$OUT/test/sepolicy.apex

Test: mma and run sign_apex
Change-Id: I8cc5bbc09058b57e463b1d40d4953d62e0438389
parent 27cbdd98
Loading
Loading
Loading
Loading
+85 −15
Original line number Diff line number Diff line
@@ -54,7 +54,7 @@ class ApexSigningError(Exception):
class ApexApkSigner(object):
  """Class to sign the apk files and other files in an apex payload image and repack the apex"""

  def __init__(self, apex_path, key_passwords, codename_to_api_level_map, avbtool=None, sign_tool=None):
  def __init__(self, apex_path, key_passwords, codename_to_api_level_map, avbtool=None, sign_tool=None, fsverity_tool=None):
    self.apex_path = apex_path
    if not key_passwords:
      self.key_passwords = dict()
@@ -65,8 +65,9 @@ class ApexApkSigner(object):
        OPTIONS.search_path, "bin", "debugfs_static")
    self.avbtool = avbtool if avbtool else "avbtool"
    self.sign_tool = sign_tool
    self.fsverity_tool = fsverity_tool if fsverity_tool else "fsverity"

  def ProcessApexFile(self, apk_keys, payload_key, signing_args=None):
  def ProcessApexFile(self, apk_keys, payload_key, signing_args=None, is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None):
    """Scans and signs the payload files and repack the apex

    Args:
@@ -84,10 +85,14 @@ class ApexApkSigner(object):
                self.debugfs_path, 'list', self.apex_path]
    entries_names = common.RunAndCheckOutput(list_cmd).split()
    apk_entries = [name for name in entries_names if name.endswith('.apk')]
    sepolicy_entries = []
    if is_sepolicy:
      sepolicy_entries = [name for name in entries_names if
          name.startswith('./etc/SEPolicy') and name.endswith('.zip')]

    # No need to sign and repack, return the original apex path.
    if not apk_entries and self.sign_tool is None:
      logger.info('No apk file to sign in %s', self.apex_path)
    if not apk_entries and not sepolicy_entries and self.sign_tool is None:
      logger.info('No payload (apk or zip) file to sign in %s', self.apex_path)
      return self.apex_path

    for entry in apk_entries:
@@ -101,15 +106,16 @@ class ApexApkSigner(object):
        logger.warning('Apk path does not contain the intended directory name:'
                       ' %s', entry)

    payload_dir, has_signed_content = self.ExtractApexPayloadAndSignContents(
        apk_entries, apk_keys, payload_key, signing_args)
    payload_dir, has_signed_content = self.ExtractApexPayloadAndSignContents(apk_entries,
        apk_keys, payload_key, sepolicy_entries, sepolicy_key, sepolicy_cert, signing_args)
    if not has_signed_content:
      logger.info('No contents has been signed in %s', self.apex_path)
      return self.apex_path

    return self.RepackApexPayload(payload_dir, payload_key, signing_args)

  def ExtractApexPayloadAndSignContents(self, apk_entries, apk_keys, payload_key, signing_args):
  def ExtractApexPayloadAndSignContents(self, apk_entries, apk_keys, payload_key,
  sepolicy_entries, sepolicy_key, sepolicy_cert, signing_args):
    """Extracts the payload image and signs the containing apk files."""
    if not os.path.exists(self.debugfs_path):
      raise ApexSigningError(
@@ -141,6 +147,11 @@ class ApexApkSigner(object):
          codename_to_api_level_map=self.codename_to_api_level_map)
      has_signed_content = True

    for entry in sepolicy_entries:
      sepolicy_key = sepolicy_key if sepolicy_key else payload_key
      self.SignSePolicy(payload_dir, entry, sepolicy_key, sepolicy_cert)
      has_signed_content = True

    if self.sign_tool:
      logger.info('Signing payload contents in apex %s with %s', self.apex_path, self.sign_tool)
      # Pass avbtool to the custom signing tool
@@ -154,6 +165,36 @@ class ApexApkSigner(object):

    return payload_dir, has_signed_content

  def SignSePolicy(self, payload_dir, sepolicy_zip, sepolicy_key, sepolicy_cert):
    sepolicy_sig = sepolicy_zip + '.sig'
    sepolicy_fsv_sig = sepolicy_zip + '.fsv_sig'

    policy_zip_path = os.path.join(payload_dir, sepolicy_zip)
    sig_out_path = os.path.join(payload_dir, sepolicy_sig)
    sig_old = sig_out_path + '.old'
    if os.path.exists(sig_out_path):
      os.rename(sig_out_path, sig_old)
    sign_cmd = ['openssl', 'dgst', '-sign', sepolicy_key, '-keyform', 'PEM', '-sha256',
        '-out', sig_out_path, '-binary', policy_zip_path]
    common.RunAndCheckOutput(sign_cmd)
    if os.path.exists(sig_old):
      os.remove(sig_old)

    if not sepolicy_cert:
      logger.info('No cert provided for SEPolicy, skipping fsverity sign')
      return

    fsv_sig_out_path = os.path.join(payload_dir, sepolicy_fsv_sig)
    fsv_sig_old = fsv_sig_out_path + '.old'
    if os.path.exists(fsv_sig_out_path):
      os.rename(fsv_sig_out_path, fsv_sig_old)

    fsverity_cmd = [self.fsverity_tool, 'sign', policy_zip_path, fsv_sig_out_path,
        '--key=' + sepolicy_key, '--cert=' + sepolicy_cert]
    common.RunAndCheckOutput(fsverity_cmd)
    if os.path.exists(fsv_sig_old):
      os.remove(fsv_sig_old)

  def RepackApexPayload(self, payload_dir, payload_key, signing_args=None):
    """Rebuilds the apex file with the updated payload directory."""
    apex_dir = common.MakeTempDir()
@@ -324,7 +365,9 @@ def ParseApexPayloadInfo(avbtool, payload_path):

def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
                         container_pw, apk_keys, codename_to_api_level_map,
                         no_hashtree, signing_args=None, sign_tool=None):
                         no_hashtree, signing_args=None, sign_tool=None,
                         is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None,
                         fsverity_tool=None):
  """Signs the current uncompressed APEX with the given payload/container keys.

  Args:
@@ -337,6 +380,10 @@ def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
    no_hashtree: Don't include hashtree in the signed APEX.
    signing_args: Additional args to be passed to the payload signer.
    sign_tool: A tool to sign the contents of the APEX.
    is_sepolicy: Indicates if the apex is a sepolicy.apex
    sepolicy_key: Key to sign a sepolicy zip.
    sepolicy_cert: Cert to sign a sepolicy zip.
    fsverity_tool: fsverity path to sign sepolicy zip.

  Returns:
    The path to the signed APEX file.
@@ -345,8 +392,9 @@ def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
  # the apex file after signing.
  apk_signer = ApexApkSigner(apex_file, container_pw,
                             codename_to_api_level_map,
                             avbtool, sign_tool)
  apex_file = apk_signer.ProcessApexFile(apk_keys, payload_key, signing_args)
                             avbtool, sign_tool, fsverity_tool)
  apex_file = apk_signer.ProcessApexFile(
      apk_keys, payload_key, signing_args, is_sepolicy, sepolicy_key, sepolicy_cert)

  # 2a. Extract and sign the APEX_PAYLOAD_IMAGE entry with the given
  # payload_key.
@@ -400,7 +448,9 @@ def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,

def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
                       container_pw, apk_keys, codename_to_api_level_map,
                       no_hashtree, signing_args=None, sign_tool=None):
                       no_hashtree, signing_args=None, sign_tool=None,
                       is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None,
                       fsverity_tool=None):
  """Signs the current compressed APEX with the given payload/container keys.

  Args:
@@ -412,6 +462,10 @@ def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
    codename_to_api_level_map: A dict that maps from codename to API level.
    no_hashtree: Don't include hashtree in the signed APEX.
    signing_args: Additional args to be passed to the payload signer.
    is_sepolicy: Indicates if the apex is a sepolicy.apex
    sepolicy_key: Key to sign a sepolicy zip.
    sepolicy_cert: Cert to sign a sepolicy zip.
    fsverity_tool: fsverity path to sign sepolicy zip.

  Returns:
    The path to the signed APEX file.
@@ -438,7 +492,11 @@ def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
      codename_to_api_level_map,
      no_hashtree,
      signing_args,
      sign_tool)
      sign_tool,
      is_sepolicy,
      sepolicy_key,
      sepolicy_cert,
      fsverity_tool)

  # 3. Compress signed original apex.
  compressed_apex_file = common.MakeTempFile(prefix='apex-container-',
@@ -466,7 +524,8 @@ def SignCompressedApex(avbtool, apex_file, payload_key, container_key,

def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
             apk_keys, codename_to_api_level_map,
             no_hashtree, signing_args=None, sign_tool=None):
             no_hashtree, signing_args=None, sign_tool=None,
             is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None, fsverity_tool=None):
  """Signs the current APEX with the given payload/container keys.

  Args:
@@ -478,6 +537,9 @@ def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
    codename_to_api_level_map: A dict that maps from codename to API level.
    no_hashtree: Don't include hashtree in the signed APEX.
    signing_args: Additional args to be passed to the payload signer.
    sepolicy_key: Key to sign a sepolicy zip.
    sepolicy_cert: Cert to sign a sepolicy zip.
    fsverity_tool: fsverity path to sign sepolicy zip.

  Returns:
    The path to the signed APEX file.
@@ -503,7 +565,11 @@ def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
          no_hashtree=no_hashtree,
          apk_keys=apk_keys,
          signing_args=signing_args,
          sign_tool=sign_tool)
          sign_tool=sign_tool,
          is_sepolicy=is_sepolicy,
          sepolicy_key=sepolicy_key,
          sepolicy_cert=sepolicy_cert,
          fsverity_tool=fsverity_tool)
    elif apex_type == 'COMPRESSED':
      return SignCompressedApex(
          avbtool,
@@ -515,7 +581,11 @@ def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
          no_hashtree=no_hashtree,
          apk_keys=apk_keys,
          signing_args=signing_args,
          sign_tool=sign_tool)
          sign_tool=sign_tool,
          is_sepolicy=is_sepolicy,
          sepolicy_key=sepolicy_key,
          sepolicy_cert=sepolicy_cert,
          fsverity_tool=fsverity_tool)
    else:
      # TODO(b/172912232): support signing compressed apex
      raise ApexInfoError('Unsupported apex type {}'.format(apex_type))
+29 −3
Original line number Diff line number Diff line
@@ -42,6 +42,15 @@ Usage: sign_apex [flags] input_apex_file output_apex_file

  --sign_tool <sign_tool>
      Optional flag that specifies a custom signing tool for the contents of the apex.

  --sepolicy_key <key>
      Optional flag that specifies the sepolicy signing key, defaults to payload_key.

  --sepolicy_cert <cert>
      Optional flag that specifies the sepolicy signing cert.

  --fsverity_tool <path>
      Optional flag that specifies the path to fsverity tool to sign SEPolicy, defaults to fsverity.
"""

import logging
@@ -55,7 +64,8 @@ logger = logging.getLogger(__name__)


def SignApexFile(avbtool, apex_file, payload_key, container_key, no_hashtree,
                 apk_keys=None, signing_args=None, codename_to_api_level_map=None, sign_tool=None):
                 apk_keys=None, signing_args=None, codename_to_api_level_map=None, sign_tool=None,
                 sepolicy_key=None, sepolicy_cert=None, fsverity_tool=None):
  """Signs the given apex file."""
  with open(apex_file, 'rb') as input_fp:
    apex_data = input_fp.read()
@@ -70,7 +80,11 @@ def SignApexFile(avbtool, apex_file, payload_key, container_key, no_hashtree,
      no_hashtree=no_hashtree,
      apk_keys=apk_keys,
      signing_args=signing_args,
      sign_tool=sign_tool)
      sign_tool=sign_tool,
      is_sepolicy=apex_file.endswith("sepolicy.apex"),
      sepolicy_key=sepolicy_key,
      sepolicy_cert=sepolicy_cert,
      fsverity_tool=fsverity_tool)


def main(argv):
@@ -106,6 +120,12 @@ def main(argv):
        options['extra_apks'].update({n: key})
    elif o == '--sign_tool':
      options['sign_tool'] = a
    elif o == '--sepolicy_key':
      options['sepolicy_key'] = a
    elif o == '--sepolicy_cert':
      options['sepolicy_cert'] = a
    elif o == '--fsverity_tool':
      options['fsverity_tool'] = a
    else:
      return False
    return True
@@ -121,6 +141,9 @@ def main(argv):
          'payload_key=',
          'extra_apks=',
          'sign_tool=',
          'sepolicy_key=',
          'sepolicy_cert=',
          'fsverity_tool='
      ],
      extra_option_handler=option_handler)

@@ -141,7 +164,10 @@ def main(argv):
      signing_args=options.get('payload_extra_args'),
      codename_to_api_level_map=options.get(
          'codename_to_api_level_map', {}),
      sign_tool=options.get('sign_tool', None))
      sign_tool=options.get('sign_tool', None),
      sepolicy_key=options.get('sepolicy_key', None),
      sepolicy_cert=options.get('sepolicy_cert', None),
      fsverity_tool=options.get('fsverity_tool', None))
  shutil.copyfile(signed_apex, args[1])
  logger.info("done.")

+18 −0
Original line number Diff line number Diff line
@@ -71,3 +71,21 @@ class SignApexTest(test_utils.ReleaseToolsTestCase):
        False,
        codename_to_api_level_map={'S': 31, 'Tiramisu' : 32})
    self.assertTrue(os.path.exists(signed_apex))

  @test_utils.SkipIfExternalToolsUnavailable()
  def test_SignApexWithSepolicy(self):
    test_apex = os.path.join(self.testdata_dir, 'sepolicy.apex')
    payload_key = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
    container_key = os.path.join(self.testdata_dir, 'testkey')
    sepolicy_key = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
    sepolicy_cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    signed_test_apex = sign_apex.SignApexFile(
        'avbtool',
        test_apex,
        payload_key,
        container_key,
        False,
        None,
        sepolicy_key=sepolicy_key,
        sepolicy_cert=sepolicy_cert)
    self.assertTrue(os.path.exists(signed_test_apex))
+296 KiB

File added.

No diff preview for this file type.