Loading tools/releasetools/ota_from_target_files.py +126 −71 Original line number Diff line number Diff line Loading @@ -360,6 +360,122 @@ class PayloadSigner(object): return out_file class Payload(object): """Manages the creation and the signing of an A/B OTA Payload.""" PAYLOAD_BIN = 'payload.bin' PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt' def __init__(self): # The place where the output from the subprocess should go. self._log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE self.payload_file = None self.payload_properties = None def Generate(self, target_file, source_file=None, additional_args=None): """Generates a payload from the given target-files zip(s). Args: target_file: The filename of the target build target-files zip. source_file: The filename of the source build target-files zip; or None if generating a full OTA. additional_args: A list of additional args that should be passed to brillo_update_payload script; or None. """ if additional_args is None: additional_args = [] payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin") cmd = ["brillo_update_payload", "generate", "--payload", payload_file, "--target_image", target_file] if source_file is not None: cmd.extend(["--source_image", source_file]) cmd.extend(additional_args) p = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) stdoutdata, _ = p.communicate() assert p.returncode == 0, \ "brillo_update_payload generate failed: {}".format(stdoutdata) self.payload_file = payload_file self.payload_properties = None def Sign(self, payload_signer): """Generates and signs the hashes of the payload and metadata. Args: payload_signer: A PayloadSigner() instance that serves the signing work. Raises: AssertionError: On any failure when calling brillo_update_payload script. """ assert isinstance(payload_signer, PayloadSigner) # 1. Generate hashes of the payload and metadata files. payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") cmd = ["brillo_update_payload", "hash", "--unsigned_payload", self.payload_file, "--signature_size", "256", "--metadata_hash_file", metadata_sig_file, "--payload_hash_file", payload_sig_file] p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload hash failed" # 2. Sign the hashes. signed_payload_sig_file = payload_signer.Sign(payload_sig_file) signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file) # 3. Insert the signatures back into the payload file. signed_payload_file = common.MakeTempFile(prefix="signed-payload-", suffix=".bin") cmd = ["brillo_update_payload", "sign", "--unsigned_payload", self.payload_file, "--payload", signed_payload_file, "--signature_size", "256", "--metadata_signature_file", signed_metadata_sig_file, "--payload_signature_file", signed_payload_sig_file] p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload sign failed" # 4. Dump the signed payload properties. properties_file = common.MakeTempFile(prefix="payload-properties-", suffix=".txt") cmd = ["brillo_update_payload", "properties", "--payload", signed_payload_file, "--properties_file", properties_file] p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload properties failed" if OPTIONS.wipe_user_data: with open(properties_file, "a") as f: f.write("POWERWASH=1\n") self.payload_file = signed_payload_file self.payload_properties = properties_file def WriteToZip(self, output_zip): """Writes the payload to the given zip. Args: output_zip: The output ZipFile instance. """ assert self.payload_file is not None assert self.payload_properties is not None # Add the signed payload file and properties into the zip. In order to # support streaming, we pack them as ZIP_STORED. So these entries can be # read directly with the offset and length pairs. common.ZipWrite(output_zip, self.payload_file, arcname=Payload.PAYLOAD_BIN, compress_type=zipfile.ZIP_STORED) common.ZipWrite(output_zip, self.payload_properties, arcname=Payload.PAYLOAD_PROPERTIES_TXT, compress_type=zipfile.ZIP_STORED) def SignOutput(temp_zip_name, output_zip_name): pw = OPTIONS.key_passwords[OPTIONS.package_key] Loading Loading @@ -1122,12 +1238,6 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, value += ' ' * (expected_length - len(value)) return value # The place where the output from the subprocess should go. log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE # Get the PayloadSigner to be used in step 3. payload_signer = PayloadSigner() # Stage the output zip package for package signing. temp_zip_file = tempfile.NamedTemporaryFile() output_zip = zipfile.ZipFile(temp_zip_file, "w", Loading @@ -1143,72 +1253,15 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, # Metadata to comply with Android OTA package format. metadata = GetPackageMetadata(target_info, source_info) # 1. Generate payload. payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin") cmd = ["brillo_update_payload", "generate", "--payload", payload_file, "--target_image", target_file] if source_file is not None: cmd.extend(["--source_image", source_file]) p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload generate failed" # 2. Generate hashes of the payload and metadata files. payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") cmd = ["brillo_update_payload", "hash", "--unsigned_payload", payload_file, "--signature_size", "256", "--metadata_hash_file", metadata_sig_file, "--payload_hash_file", payload_sig_file] p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload hash failed" # Generate payload. payload = Payload() payload.Generate(target_file, source_file) # 3. Sign the hashes and insert them back into the payload file. # 3a. Sign the payload hash. signed_payload_sig_file = payload_signer.Sign(payload_sig_file) # Sign the payload. payload.Sign(PayloadSigner()) # 3b. Sign the metadata hash. signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file) # 3c. Insert the signatures back into the payload file. signed_payload_file = common.MakeTempFile(prefix="signed-payload-", suffix=".bin") cmd = ["brillo_update_payload", "sign", "--unsigned_payload", payload_file, "--payload", signed_payload_file, "--signature_size", "256", "--metadata_signature_file", signed_metadata_sig_file, "--payload_signature_file", signed_payload_sig_file] p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload sign failed" # 4. Dump the signed payload properties. properties_file = common.MakeTempFile(prefix="payload-properties-", suffix=".txt") cmd = ["brillo_update_payload", "properties", "--payload", signed_payload_file, "--properties_file", properties_file] p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload properties failed" if OPTIONS.wipe_user_data: with open(properties_file, "a") as f: f.write("POWERWASH=1\n") # Add the signed payload file and properties into the zip. In order to # support streaming, we pack payload.bin, payload_properties.txt and # care_map.txt as ZIP_STORED. So these entries can be read directly with # the offset and length pairs. common.ZipWrite(output_zip, signed_payload_file, arcname="payload.bin", compress_type=zipfile.ZIP_STORED) common.ZipWrite(output_zip, properties_file, arcname="payload_properties.txt", compress_type=zipfile.ZIP_STORED) # Write the payload into output zip. payload.WriteToZip(output_zip) # If dm-verity is supported for the device, copy contents of care_map # into A/B OTA package. Loading @@ -1219,6 +1272,8 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, namelist = target_zip.namelist() if care_map_path in namelist: care_map_data = target_zip.read(care_map_path) # In order to support streaming, care_map.txt needs to be packed as # ZIP_STORED. common.ZipWriteStr(output_zip, "care_map.txt", care_map_data, compress_type=zipfile.ZIP_STORED) else: Loading tools/releasetools/test_ota_from_target_files.py +157 −1 Original line number Diff line number Diff line Loading @@ -15,12 +15,14 @@ # import copy import os import os.path import unittest import zipfile import common from ota_from_target_files import ( _LoadOemDicts, BuildInfo, GetPackageMetadata, PayloadSigner, _LoadOemDicts, BuildInfo, GetPackageMetadata, Payload, PayloadSigner, WriteFingerprintAssertion) Loading Loading @@ -564,3 +566,157 @@ class PayloadSignerTest(unittest.TestCase): verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE) self._assertFilesEqual(verify_file, signed_file) class PayloadTest(unittest.TestCase): def setUp(self): self.testdata_dir = get_testdata_dir() self.assertTrue(os.path.exists(self.testdata_dir)) common.OPTIONS.wipe_user_data = False common.OPTIONS.payload_signer = None common.OPTIONS.payload_signer_args = None common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey') common.OPTIONS.key_passwords = { common.OPTIONS.package_key : None, } def tearDown(self): common.Cleanup() @staticmethod def _construct_target_files(): target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') with zipfile.ZipFile(target_files, 'w') as target_files_zip: # META/update_engine_config.txt target_files_zip.writestr( 'META/update_engine_config.txt', "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n") # META/ab_partitions.txt ab_partitions = ['boot', 'system', 'vendor'] target_files_zip.writestr( 'META/ab_partitions.txt', '\n'.join(ab_partitions)) # Create dummy images for each of them. for partition in ab_partitions: target_files_zip.writestr('IMAGES/' + partition + '.img', os.urandom(len(partition))) return target_files def _create_payload_full(self): target_file = self._construct_target_files() payload = Payload() payload.Generate(target_file) return payload def _create_payload_incremental(self): target_file = self._construct_target_files() source_file = self._construct_target_files() payload = Payload() payload.Generate(target_file, source_file) return payload def test_Generate_full(self): payload = self._create_payload_full() self.assertTrue(os.path.exists(payload.payload_file)) def test_Generate_incremental(self): payload = self._create_payload_incremental() self.assertTrue(os.path.exists(payload.payload_file)) def test_Generate_additionalArgs(self): target_file = self._construct_target_files() source_file = self._construct_target_files() payload = Payload() # This should work the same as calling payload.Generate(target_file, # source_file). payload.Generate( target_file, additional_args=["--source_image", source_file]) self.assertTrue(os.path.exists(payload.payload_file)) def test_Generate_invalidInput(self): target_file = self._construct_target_files() common.ZipDelete(target_file, 'IMAGES/vendor.img') payload = Payload() self.assertRaises(AssertionError, payload.Generate, target_file) def test_Sign_full(self): payload = self._create_payload_full() payload.Sign(PayloadSigner()) output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: payload.WriteToZip(output_zip) import check_ota_package_signature check_ota_package_signature.VerifyAbOtaPayload( os.path.join(self.testdata_dir, 'testkey.x509.pem'), output_file) def test_Sign_incremental(self): payload = self._create_payload_incremental() payload.Sign(PayloadSigner()) output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: payload.WriteToZip(output_zip) import check_ota_package_signature check_ota_package_signature.VerifyAbOtaPayload( os.path.join(self.testdata_dir, 'testkey.x509.pem'), output_file) def test_Sign_withDataWipe(self): common.OPTIONS.wipe_user_data = True payload = self._create_payload_full() payload.Sign(PayloadSigner()) with open(payload.payload_properties) as properties_fp: self.assertIn("POWERWASH=1", properties_fp.read()) def test_Sign_badSigner(self): """Tests that signing failure can be captured.""" payload = self._create_payload_full() payload_signer = PayloadSigner() payload_signer.signer_args.append('bad-option') self.assertRaises(AssertionError, payload.Sign, payload_signer) def test_WriteToZip(self): payload = self._create_payload_full() payload.Sign(PayloadSigner()) output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: payload.WriteToZip(output_zip) with zipfile.ZipFile(output_file) as verify_zip: # First make sure we have the essential entries. namelist = verify_zip.namelist() self.assertIn(Payload.PAYLOAD_BIN, namelist) self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist) # Then assert these entries are stored. for entry_info in verify_zip.infolist(): if entry_info.filename not in (Payload.PAYLOAD_BIN, Payload.PAYLOAD_PROPERTIES_TXT): continue self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type) def test_WriteToZip_unsignedPayload(self): """Unsigned payloads should not be allowed to be written to zip.""" payload = self._create_payload_full() output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: self.assertRaises(AssertionError, payload.WriteToZip, output_zip) # Also test with incremental payload. payload = self._create_payload_incremental() output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: self.assertRaises(AssertionError, payload.WriteToZip, output_zip) Loading
tools/releasetools/ota_from_target_files.py +126 −71 Original line number Diff line number Diff line Loading @@ -360,6 +360,122 @@ class PayloadSigner(object): return out_file class Payload(object): """Manages the creation and the signing of an A/B OTA Payload.""" PAYLOAD_BIN = 'payload.bin' PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt' def __init__(self): # The place where the output from the subprocess should go. self._log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE self.payload_file = None self.payload_properties = None def Generate(self, target_file, source_file=None, additional_args=None): """Generates a payload from the given target-files zip(s). Args: target_file: The filename of the target build target-files zip. source_file: The filename of the source build target-files zip; or None if generating a full OTA. additional_args: A list of additional args that should be passed to brillo_update_payload script; or None. """ if additional_args is None: additional_args = [] payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin") cmd = ["brillo_update_payload", "generate", "--payload", payload_file, "--target_image", target_file] if source_file is not None: cmd.extend(["--source_image", source_file]) cmd.extend(additional_args) p = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) stdoutdata, _ = p.communicate() assert p.returncode == 0, \ "brillo_update_payload generate failed: {}".format(stdoutdata) self.payload_file = payload_file self.payload_properties = None def Sign(self, payload_signer): """Generates and signs the hashes of the payload and metadata. Args: payload_signer: A PayloadSigner() instance that serves the signing work. Raises: AssertionError: On any failure when calling brillo_update_payload script. """ assert isinstance(payload_signer, PayloadSigner) # 1. Generate hashes of the payload and metadata files. payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") cmd = ["brillo_update_payload", "hash", "--unsigned_payload", self.payload_file, "--signature_size", "256", "--metadata_hash_file", metadata_sig_file, "--payload_hash_file", payload_sig_file] p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload hash failed" # 2. Sign the hashes. signed_payload_sig_file = payload_signer.Sign(payload_sig_file) signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file) # 3. Insert the signatures back into the payload file. signed_payload_file = common.MakeTempFile(prefix="signed-payload-", suffix=".bin") cmd = ["brillo_update_payload", "sign", "--unsigned_payload", self.payload_file, "--payload", signed_payload_file, "--signature_size", "256", "--metadata_signature_file", signed_metadata_sig_file, "--payload_signature_file", signed_payload_sig_file] p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload sign failed" # 4. Dump the signed payload properties. properties_file = common.MakeTempFile(prefix="payload-properties-", suffix=".txt") cmd = ["brillo_update_payload", "properties", "--payload", signed_payload_file, "--properties_file", properties_file] p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload properties failed" if OPTIONS.wipe_user_data: with open(properties_file, "a") as f: f.write("POWERWASH=1\n") self.payload_file = signed_payload_file self.payload_properties = properties_file def WriteToZip(self, output_zip): """Writes the payload to the given zip. Args: output_zip: The output ZipFile instance. """ assert self.payload_file is not None assert self.payload_properties is not None # Add the signed payload file and properties into the zip. In order to # support streaming, we pack them as ZIP_STORED. So these entries can be # read directly with the offset and length pairs. common.ZipWrite(output_zip, self.payload_file, arcname=Payload.PAYLOAD_BIN, compress_type=zipfile.ZIP_STORED) common.ZipWrite(output_zip, self.payload_properties, arcname=Payload.PAYLOAD_PROPERTIES_TXT, compress_type=zipfile.ZIP_STORED) def SignOutput(temp_zip_name, output_zip_name): pw = OPTIONS.key_passwords[OPTIONS.package_key] Loading Loading @@ -1122,12 +1238,6 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, value += ' ' * (expected_length - len(value)) return value # The place where the output from the subprocess should go. log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE # Get the PayloadSigner to be used in step 3. payload_signer = PayloadSigner() # Stage the output zip package for package signing. temp_zip_file = tempfile.NamedTemporaryFile() output_zip = zipfile.ZipFile(temp_zip_file, "w", Loading @@ -1143,72 +1253,15 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, # Metadata to comply with Android OTA package format. metadata = GetPackageMetadata(target_info, source_info) # 1. Generate payload. payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin") cmd = ["brillo_update_payload", "generate", "--payload", payload_file, "--target_image", target_file] if source_file is not None: cmd.extend(["--source_image", source_file]) p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload generate failed" # 2. Generate hashes of the payload and metadata files. payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") cmd = ["brillo_update_payload", "hash", "--unsigned_payload", payload_file, "--signature_size", "256", "--metadata_hash_file", metadata_sig_file, "--payload_hash_file", payload_sig_file] p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload hash failed" # Generate payload. payload = Payload() payload.Generate(target_file, source_file) # 3. Sign the hashes and insert them back into the payload file. # 3a. Sign the payload hash. signed_payload_sig_file = payload_signer.Sign(payload_sig_file) # Sign the payload. payload.Sign(PayloadSigner()) # 3b. Sign the metadata hash. signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file) # 3c. Insert the signatures back into the payload file. signed_payload_file = common.MakeTempFile(prefix="signed-payload-", suffix=".bin") cmd = ["brillo_update_payload", "sign", "--unsigned_payload", payload_file, "--payload", signed_payload_file, "--signature_size", "256", "--metadata_signature_file", signed_metadata_sig_file, "--payload_signature_file", signed_payload_sig_file] p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload sign failed" # 4. Dump the signed payload properties. properties_file = common.MakeTempFile(prefix="payload-properties-", suffix=".txt") cmd = ["brillo_update_payload", "properties", "--payload", signed_payload_file, "--properties_file", properties_file] p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) p1.communicate() assert p1.returncode == 0, "brillo_update_payload properties failed" if OPTIONS.wipe_user_data: with open(properties_file, "a") as f: f.write("POWERWASH=1\n") # Add the signed payload file and properties into the zip. In order to # support streaming, we pack payload.bin, payload_properties.txt and # care_map.txt as ZIP_STORED. So these entries can be read directly with # the offset and length pairs. common.ZipWrite(output_zip, signed_payload_file, arcname="payload.bin", compress_type=zipfile.ZIP_STORED) common.ZipWrite(output_zip, properties_file, arcname="payload_properties.txt", compress_type=zipfile.ZIP_STORED) # Write the payload into output zip. payload.WriteToZip(output_zip) # If dm-verity is supported for the device, copy contents of care_map # into A/B OTA package. Loading @@ -1219,6 +1272,8 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, namelist = target_zip.namelist() if care_map_path in namelist: care_map_data = target_zip.read(care_map_path) # In order to support streaming, care_map.txt needs to be packed as # ZIP_STORED. common.ZipWriteStr(output_zip, "care_map.txt", care_map_data, compress_type=zipfile.ZIP_STORED) else: Loading
tools/releasetools/test_ota_from_target_files.py +157 −1 Original line number Diff line number Diff line Loading @@ -15,12 +15,14 @@ # import copy import os import os.path import unittest import zipfile import common from ota_from_target_files import ( _LoadOemDicts, BuildInfo, GetPackageMetadata, PayloadSigner, _LoadOemDicts, BuildInfo, GetPackageMetadata, Payload, PayloadSigner, WriteFingerprintAssertion) Loading Loading @@ -564,3 +566,157 @@ class PayloadSignerTest(unittest.TestCase): verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE) self._assertFilesEqual(verify_file, signed_file) class PayloadTest(unittest.TestCase): def setUp(self): self.testdata_dir = get_testdata_dir() self.assertTrue(os.path.exists(self.testdata_dir)) common.OPTIONS.wipe_user_data = False common.OPTIONS.payload_signer = None common.OPTIONS.payload_signer_args = None common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey') common.OPTIONS.key_passwords = { common.OPTIONS.package_key : None, } def tearDown(self): common.Cleanup() @staticmethod def _construct_target_files(): target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') with zipfile.ZipFile(target_files, 'w') as target_files_zip: # META/update_engine_config.txt target_files_zip.writestr( 'META/update_engine_config.txt', "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n") # META/ab_partitions.txt ab_partitions = ['boot', 'system', 'vendor'] target_files_zip.writestr( 'META/ab_partitions.txt', '\n'.join(ab_partitions)) # Create dummy images for each of them. for partition in ab_partitions: target_files_zip.writestr('IMAGES/' + partition + '.img', os.urandom(len(partition))) return target_files def _create_payload_full(self): target_file = self._construct_target_files() payload = Payload() payload.Generate(target_file) return payload def _create_payload_incremental(self): target_file = self._construct_target_files() source_file = self._construct_target_files() payload = Payload() payload.Generate(target_file, source_file) return payload def test_Generate_full(self): payload = self._create_payload_full() self.assertTrue(os.path.exists(payload.payload_file)) def test_Generate_incremental(self): payload = self._create_payload_incremental() self.assertTrue(os.path.exists(payload.payload_file)) def test_Generate_additionalArgs(self): target_file = self._construct_target_files() source_file = self._construct_target_files() payload = Payload() # This should work the same as calling payload.Generate(target_file, # source_file). payload.Generate( target_file, additional_args=["--source_image", source_file]) self.assertTrue(os.path.exists(payload.payload_file)) def test_Generate_invalidInput(self): target_file = self._construct_target_files() common.ZipDelete(target_file, 'IMAGES/vendor.img') payload = Payload() self.assertRaises(AssertionError, payload.Generate, target_file) def test_Sign_full(self): payload = self._create_payload_full() payload.Sign(PayloadSigner()) output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: payload.WriteToZip(output_zip) import check_ota_package_signature check_ota_package_signature.VerifyAbOtaPayload( os.path.join(self.testdata_dir, 'testkey.x509.pem'), output_file) def test_Sign_incremental(self): payload = self._create_payload_incremental() payload.Sign(PayloadSigner()) output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: payload.WriteToZip(output_zip) import check_ota_package_signature check_ota_package_signature.VerifyAbOtaPayload( os.path.join(self.testdata_dir, 'testkey.x509.pem'), output_file) def test_Sign_withDataWipe(self): common.OPTIONS.wipe_user_data = True payload = self._create_payload_full() payload.Sign(PayloadSigner()) with open(payload.payload_properties) as properties_fp: self.assertIn("POWERWASH=1", properties_fp.read()) def test_Sign_badSigner(self): """Tests that signing failure can be captured.""" payload = self._create_payload_full() payload_signer = PayloadSigner() payload_signer.signer_args.append('bad-option') self.assertRaises(AssertionError, payload.Sign, payload_signer) def test_WriteToZip(self): payload = self._create_payload_full() payload.Sign(PayloadSigner()) output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: payload.WriteToZip(output_zip) with zipfile.ZipFile(output_file) as verify_zip: # First make sure we have the essential entries. namelist = verify_zip.namelist() self.assertIn(Payload.PAYLOAD_BIN, namelist) self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist) # Then assert these entries are stored. for entry_info in verify_zip.infolist(): if entry_info.filename not in (Payload.PAYLOAD_BIN, Payload.PAYLOAD_PROPERTIES_TXT): continue self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type) def test_WriteToZip_unsignedPayload(self): """Unsigned payloads should not be allowed to be written to zip.""" payload = self._create_payload_full() output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: self.assertRaises(AssertionError, payload.WriteToZip, output_zip) # Also test with incremental payload. payload = self._create_payload_incremental() output_file = common.MakeTempFile(suffix='.zip') with zipfile.ZipFile(output_file, 'w') as output_zip: self.assertRaises(AssertionError, payload.WriteToZip, output_zip)