Loading tools/releasetools/common.py +30 −9 Original line number Diff line number Diff line Loading @@ -717,26 +717,46 @@ class BuildInfo(object): script.AssertOemProperty(prop, values, oem_no_mount) def ReadFromInputFile(input_file, fn): """Reads the contents of fn from input zipfile or directory.""" def DoesInputFileContain(input_file, fn): """Check whether the input target_files.zip contain an entry `fn`""" if isinstance(input_file, zipfile.ZipFile): return fn in input_file.namelist() elif zipfile.is_zipfile(input_file): with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp: return fn in zfp.namelist() else: if not os.path.isdir(input_file): raise ValueError( "Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file) path = os.path.join(input_file, *fn.split("/")) return os.path.exists(path) def ReadBytesFromInputFile(input_file, fn): """Reads the bytes of fn from input zipfile or directory.""" if isinstance(input_file, zipfile.ZipFile): return input_file.read(fn).decode() return input_file.read(fn) elif zipfile.is_zipfile(input_file): with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp: return zfp.read(fn).decode() return zfp.read(fn) else: if not os.path.isdir(input_file): raise ValueError( "Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file) path = os.path.join(input_file, *fn.split("/")) try: with open(path) as f: with open(path, "rb") as f: return f.read() except IOError as e: if e.errno == errno.ENOENT: raise KeyError(fn) def ReadFromInputFile(input_file, fn): """Reads the str contents of fn from input zipfile or directory.""" return ReadBytesFromInputFile(input_file, fn).decode() def ExtractFromInputFile(input_file, fn): """Extracts the contents of fn from input zipfile or directory into a file.""" if isinstance(input_file, zipfile.ZipFile): Loading Loading @@ -1540,7 +1560,8 @@ def BuildVBMeta(image_path, partitions, name, needed_partitions): custom_partitions = OPTIONS.info_dict.get( "avb_custom_images_partition_list", "").strip().split() custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get("avb_custom_vbmeta_images_partition_list", "").strip().split()] custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get( "avb_custom_vbmeta_images_partition_list", "").strip().split()] for partition, path in partitions.items(): if partition not in needed_partitions: Loading Loading @@ -2966,7 +2987,6 @@ def ZipDelete(zip_filename, entries, force=False): cmd.append(entry) RunAndCheckOutput(cmd) os.replace(new_zipfile, zip_filename) Loading Loading @@ -4071,6 +4091,7 @@ def IsSparseImage(filepath): # https://source.android.com/devices/bootloader/images return fp.read(4) == b'\x3A\xFF\x26\xED' def ParseUpdateEngineConfig(path: str): """Parse the update_engine config stored in file `path` Args Loading tools/releasetools/ota_from_target_files.py +18 −16 Original line number Diff line number Diff line Loading @@ -267,8 +267,8 @@ import care_map_pb2 import common import ota_utils from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata, PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME) from common import IsSparseImage PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, CopyTargetFilesDir) from common import DoesInputFileContain, IsSparseImage import target_files_diff from check_target_files_vintf import CheckVintfIfTrebleEnabled from non_ab_ota import GenerateNonAbOtaPackage Loading Loading @@ -830,6 +830,12 @@ def SupportsMainlineGkiUpdates(target_file): def GenerateAbOtaPackage(target_file, output_file, source_file=None): """Generates an Android OTA package that has A/B update payload.""" # If input target_files are directories, create a copy so that we can modify # them directly if os.path.isdir(target_file): target_file = CopyTargetFilesDir(target_file) if source_file is not None and os.path.isdir(source_file): source_file = CopyTargetFilesDir(source_file) # Stage the output zip package for package signing. if not OPTIONS.no_signing: staging_file = common.MakeTempFile(suffix='.zip') Loading @@ -840,6 +846,7 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): allowZip64=True) if source_file is not None: source_file = ota_utils.ExtractTargetFiles(source_file) assert "ab_partitions" in OPTIONS.source_info_dict, \ "META/ab_partitions.txt is required for ab_update." assert "ab_partitions" in OPTIONS.target_info_dict, \ Loading Loading @@ -942,9 +949,8 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): elif OPTIONS.skip_postinstall: target_file = GetTargetFilesZipWithoutPostinstallConfig(target_file) # Target_file may have been modified, reparse ab_partitions with zipfile.ZipFile(target_file, allowZip64=True) as zfp: target_info.info_dict['ab_partitions'] = zfp.read( AB_PARTITIONS).decode().strip().split("\n") target_info.info_dict['ab_partitions'] = common.ReadFromInputFile(target_file, AB_PARTITIONS).strip().split("\n") CheckVintfIfTrebleEnabled(target_file, target_info) Loading Loading @@ -1042,15 +1048,13 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): # If dm-verity is supported for the device, copy contents of care_map # into A/B OTA package. target_zip = zipfile.ZipFile(target_file, "r", allowZip64=True) if target_info.get("avb_enable") == "true": care_map_list = [x for x in ["care_map.pb", "care_map.txt"] if "META/" + x in target_zip.namelist()] # Adds care_map if either the protobuf format or the plain text one exists. if care_map_list: care_map_name = care_map_list[0] care_map_data = target_zip.read("META/" + care_map_name) for care_map_name in ["care_map.pb", "care_map.txt"]: if not DoesInputFileContain(target_file, "META/" + care_map_name): continue care_map_data = common.ReadBytesFromInputFile( target_file, "META/" + care_map_name) # In order to support streaming, care_map needs to be packed as # ZIP_STORED. common.ZipWriteStr(output_zip, care_map_name, care_map_data, Loading @@ -1060,13 +1064,11 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): # Add the source apex version for incremental ota updates, and write the # result apex info to the ota package. ota_apex_info = ota_utils.ConstructOtaApexInfo(target_zip, source_file) ota_apex_info = ota_utils.ConstructOtaApexInfo(target_file, source_file) if ota_apex_info is not None: common.ZipWriteStr(output_zip, "apex_info.pb", ota_apex_info, compress_type=zipfile.ZIP_STORED) common.ZipClose(target_zip) # We haven't written the metadata entry yet, which will be handled in # FinalizeMetadata(). common.ZipClose(output_zip) Loading Loading @@ -1257,7 +1259,7 @@ def main(argv): if OPTIONS.extracted_input is not None: OPTIONS.info_dict = common.LoadInfoDict(OPTIONS.extracted_input) else: OPTIONS.info_dict = ParseInfoDict(args[0]) OPTIONS.info_dict = common.LoadInfoDict(args[0]) if OPTIONS.wipe_user_data: if not OPTIONS.vabc_downgrade: Loading tools/releasetools/ota_utils.py +31 −7 Original line number Diff line number Diff line Loading @@ -22,7 +22,8 @@ import zipfile import ota_metadata_pb2 import common from common import (ZipDelete, ZipClose, OPTIONS, MakeTempFile, import fnmatch from common import (ZipDelete, DoesInputFileContain, ReadBytesFromInputFile, OPTIONS, MakeTempFile, ZipWriteStr, BuildInfo, LoadDictionaryFromFile, SignFile, PARTITIONS_WITH_BUILD_PROP, PartitionBuildProps, GetRamdiskFormat, ParseUpdateEngineConfig) Loading @@ -44,7 +45,8 @@ OPTIONS.boot_variable_file = None METADATA_NAME = 'META-INF/com/android/metadata' METADATA_PROTO_NAME = 'META-INF/com/android/metadata.pb' UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*'] UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*', '*/build.prop', '*/default.prop', '*/build.default', "*/etc/vintf/*"] SECURITY_PATCH_LEVEL_PROP_NAME = "ro.build.version.security_patch" Loading Loading @@ -626,12 +628,10 @@ def ConstructOtaApexInfo(target_zip, source_file=None): """If applicable, add the source version to the apex info.""" def _ReadApexInfo(input_zip): if "META/apex_info.pb" not in input_zip.namelist(): if not DoesInputFileContain(input_zip, "META/apex_info.pb"): logger.warning("target_file doesn't contain apex_info.pb %s", input_zip) return None with input_zip.open("META/apex_info.pb", "r") as zfp: return zfp.read() return ReadBytesFromInputFile(input_zip, "META/apex_info.pb") target_apex_string = _ReadApexInfo(target_zip) # Return early if the target apex info doesn't exist or is empty. Loading Loading @@ -727,7 +727,7 @@ def ExtractTargetFiles(path: str): logger.info("target files %s is already extracted", path) return path extracted_dir = common.MakeTempDir("target_files") common.UnzipToDir(path, extracted_dir, UNZIP_PATTERN) common.UnzipToDir(path, extracted_dir, UNZIP_PATTERN + [""]) return extracted_dir Loading Loading @@ -1040,3 +1040,27 @@ class AbOtaPropertyFiles(StreamingPropertyFiles): assert metadata_total <= payload_size return (payload_offset, metadata_total) def Fnmatch(filename, pattersn): return any([fnmatch.fnmatch(filename, pat) for pat in pattersn]) def CopyTargetFilesDir(input_dir): output_dir = common.MakeTempDir("target_files") shutil.copytree(os.path.join(input_dir, "IMAGES"), os.path.join( output_dir, "IMAGES"), dirs_exist_ok=True) shutil.copytree(os.path.join(input_dir, "META"), os.path.join( output_dir, "META"), dirs_exist_ok=True) for (dirpath, _, filenames) in os.walk(input_dir): for filename in filenames: path = os.path.join(dirpath, filename) relative_path = path.removeprefix(input_dir).removeprefix("/") if not Fnmatch(relative_path, UNZIP_PATTERN): continue if filename.endswith(".prop") or filename == "prop.default" or "/etc/vintf/" in relative_path: target_path = os.path.join( output_dir, relative_path) os.makedirs(os.path.dirname(target_path), exist_ok=True) shutil.copy(path, target_path) return output_dir Loading
tools/releasetools/common.py +30 −9 Original line number Diff line number Diff line Loading @@ -717,26 +717,46 @@ class BuildInfo(object): script.AssertOemProperty(prop, values, oem_no_mount) def ReadFromInputFile(input_file, fn): """Reads the contents of fn from input zipfile or directory.""" def DoesInputFileContain(input_file, fn): """Check whether the input target_files.zip contain an entry `fn`""" if isinstance(input_file, zipfile.ZipFile): return fn in input_file.namelist() elif zipfile.is_zipfile(input_file): with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp: return fn in zfp.namelist() else: if not os.path.isdir(input_file): raise ValueError( "Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file) path = os.path.join(input_file, *fn.split("/")) return os.path.exists(path) def ReadBytesFromInputFile(input_file, fn): """Reads the bytes of fn from input zipfile or directory.""" if isinstance(input_file, zipfile.ZipFile): return input_file.read(fn).decode() return input_file.read(fn) elif zipfile.is_zipfile(input_file): with zipfile.ZipFile(input_file, "r", allowZip64=True) as zfp: return zfp.read(fn).decode() return zfp.read(fn) else: if not os.path.isdir(input_file): raise ValueError( "Invalid input_file, accepted inputs are ZipFile object, path to .zip file on disk, or path to extracted directory. Actual: " + input_file) path = os.path.join(input_file, *fn.split("/")) try: with open(path) as f: with open(path, "rb") as f: return f.read() except IOError as e: if e.errno == errno.ENOENT: raise KeyError(fn) def ReadFromInputFile(input_file, fn): """Reads the str contents of fn from input zipfile or directory.""" return ReadBytesFromInputFile(input_file, fn).decode() def ExtractFromInputFile(input_file, fn): """Extracts the contents of fn from input zipfile or directory into a file.""" if isinstance(input_file, zipfile.ZipFile): Loading Loading @@ -1540,7 +1560,8 @@ def BuildVBMeta(image_path, partitions, name, needed_partitions): custom_partitions = OPTIONS.info_dict.get( "avb_custom_images_partition_list", "").strip().split() custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get("avb_custom_vbmeta_images_partition_list", "").strip().split()] custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get( "avb_custom_vbmeta_images_partition_list", "").strip().split()] for partition, path in partitions.items(): if partition not in needed_partitions: Loading Loading @@ -2966,7 +2987,6 @@ def ZipDelete(zip_filename, entries, force=False): cmd.append(entry) RunAndCheckOutput(cmd) os.replace(new_zipfile, zip_filename) Loading Loading @@ -4071,6 +4091,7 @@ def IsSparseImage(filepath): # https://source.android.com/devices/bootloader/images return fp.read(4) == b'\x3A\xFF\x26\xED' def ParseUpdateEngineConfig(path: str): """Parse the update_engine config stored in file `path` Args Loading
tools/releasetools/ota_from_target_files.py +18 −16 Original line number Diff line number Diff line Loading @@ -267,8 +267,8 @@ import care_map_pb2 import common import ota_utils from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata, PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME) from common import IsSparseImage PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, CopyTargetFilesDir) from common import DoesInputFileContain, IsSparseImage import target_files_diff from check_target_files_vintf import CheckVintfIfTrebleEnabled from non_ab_ota import GenerateNonAbOtaPackage Loading Loading @@ -830,6 +830,12 @@ def SupportsMainlineGkiUpdates(target_file): def GenerateAbOtaPackage(target_file, output_file, source_file=None): """Generates an Android OTA package that has A/B update payload.""" # If input target_files are directories, create a copy so that we can modify # them directly if os.path.isdir(target_file): target_file = CopyTargetFilesDir(target_file) if source_file is not None and os.path.isdir(source_file): source_file = CopyTargetFilesDir(source_file) # Stage the output zip package for package signing. if not OPTIONS.no_signing: staging_file = common.MakeTempFile(suffix='.zip') Loading @@ -840,6 +846,7 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): allowZip64=True) if source_file is not None: source_file = ota_utils.ExtractTargetFiles(source_file) assert "ab_partitions" in OPTIONS.source_info_dict, \ "META/ab_partitions.txt is required for ab_update." assert "ab_partitions" in OPTIONS.target_info_dict, \ Loading Loading @@ -942,9 +949,8 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): elif OPTIONS.skip_postinstall: target_file = GetTargetFilesZipWithoutPostinstallConfig(target_file) # Target_file may have been modified, reparse ab_partitions with zipfile.ZipFile(target_file, allowZip64=True) as zfp: target_info.info_dict['ab_partitions'] = zfp.read( AB_PARTITIONS).decode().strip().split("\n") target_info.info_dict['ab_partitions'] = common.ReadFromInputFile(target_file, AB_PARTITIONS).strip().split("\n") CheckVintfIfTrebleEnabled(target_file, target_info) Loading Loading @@ -1042,15 +1048,13 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): # If dm-verity is supported for the device, copy contents of care_map # into A/B OTA package. target_zip = zipfile.ZipFile(target_file, "r", allowZip64=True) if target_info.get("avb_enable") == "true": care_map_list = [x for x in ["care_map.pb", "care_map.txt"] if "META/" + x in target_zip.namelist()] # Adds care_map if either the protobuf format or the plain text one exists. if care_map_list: care_map_name = care_map_list[0] care_map_data = target_zip.read("META/" + care_map_name) for care_map_name in ["care_map.pb", "care_map.txt"]: if not DoesInputFileContain(target_file, "META/" + care_map_name): continue care_map_data = common.ReadBytesFromInputFile( target_file, "META/" + care_map_name) # In order to support streaming, care_map needs to be packed as # ZIP_STORED. common.ZipWriteStr(output_zip, care_map_name, care_map_data, Loading @@ -1060,13 +1064,11 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None): # Add the source apex version for incremental ota updates, and write the # result apex info to the ota package. ota_apex_info = ota_utils.ConstructOtaApexInfo(target_zip, source_file) ota_apex_info = ota_utils.ConstructOtaApexInfo(target_file, source_file) if ota_apex_info is not None: common.ZipWriteStr(output_zip, "apex_info.pb", ota_apex_info, compress_type=zipfile.ZIP_STORED) common.ZipClose(target_zip) # We haven't written the metadata entry yet, which will be handled in # FinalizeMetadata(). common.ZipClose(output_zip) Loading Loading @@ -1257,7 +1259,7 @@ def main(argv): if OPTIONS.extracted_input is not None: OPTIONS.info_dict = common.LoadInfoDict(OPTIONS.extracted_input) else: OPTIONS.info_dict = ParseInfoDict(args[0]) OPTIONS.info_dict = common.LoadInfoDict(args[0]) if OPTIONS.wipe_user_data: if not OPTIONS.vabc_downgrade: Loading
tools/releasetools/ota_utils.py +31 −7 Original line number Diff line number Diff line Loading @@ -22,7 +22,8 @@ import zipfile import ota_metadata_pb2 import common from common import (ZipDelete, ZipClose, OPTIONS, MakeTempFile, import fnmatch from common import (ZipDelete, DoesInputFileContain, ReadBytesFromInputFile, OPTIONS, MakeTempFile, ZipWriteStr, BuildInfo, LoadDictionaryFromFile, SignFile, PARTITIONS_WITH_BUILD_PROP, PartitionBuildProps, GetRamdiskFormat, ParseUpdateEngineConfig) Loading @@ -44,7 +45,8 @@ OPTIONS.boot_variable_file = None METADATA_NAME = 'META-INF/com/android/metadata' METADATA_PROTO_NAME = 'META-INF/com/android/metadata.pb' UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*'] UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*', '*/build.prop', '*/default.prop', '*/build.default', "*/etc/vintf/*"] SECURITY_PATCH_LEVEL_PROP_NAME = "ro.build.version.security_patch" Loading Loading @@ -626,12 +628,10 @@ def ConstructOtaApexInfo(target_zip, source_file=None): """If applicable, add the source version to the apex info.""" def _ReadApexInfo(input_zip): if "META/apex_info.pb" not in input_zip.namelist(): if not DoesInputFileContain(input_zip, "META/apex_info.pb"): logger.warning("target_file doesn't contain apex_info.pb %s", input_zip) return None with input_zip.open("META/apex_info.pb", "r") as zfp: return zfp.read() return ReadBytesFromInputFile(input_zip, "META/apex_info.pb") target_apex_string = _ReadApexInfo(target_zip) # Return early if the target apex info doesn't exist or is empty. Loading Loading @@ -727,7 +727,7 @@ def ExtractTargetFiles(path: str): logger.info("target files %s is already extracted", path) return path extracted_dir = common.MakeTempDir("target_files") common.UnzipToDir(path, extracted_dir, UNZIP_PATTERN) common.UnzipToDir(path, extracted_dir, UNZIP_PATTERN + [""]) return extracted_dir Loading Loading @@ -1040,3 +1040,27 @@ class AbOtaPropertyFiles(StreamingPropertyFiles): assert metadata_total <= payload_size return (payload_offset, metadata_total) def Fnmatch(filename, pattersn): return any([fnmatch.fnmatch(filename, pat) for pat in pattersn]) def CopyTargetFilesDir(input_dir): output_dir = common.MakeTempDir("target_files") shutil.copytree(os.path.join(input_dir, "IMAGES"), os.path.join( output_dir, "IMAGES"), dirs_exist_ok=True) shutil.copytree(os.path.join(input_dir, "META"), os.path.join( output_dir, "META"), dirs_exist_ok=True) for (dirpath, _, filenames) in os.walk(input_dir): for filename in filenames: path = os.path.join(dirpath, filename) relative_path = path.removeprefix(input_dir).removeprefix("/") if not Fnmatch(relative_path, UNZIP_PATTERN): continue if filename.endswith(".prop") or filename == "prop.default" or "/etc/vintf/" in relative_path: target_path = os.path.join( output_dir, relative_path) os.makedirs(os.path.dirname(target_path), exist_ok=True) shutil.copy(path, target_path) return output_dir