# Paste metadata (not part of the program):
#   Untitled
#   unknown
#   plain_text
#   2 years ago
#   5.3 kB
#   13
#   Indexable
#!/usr/bin/env python3
import argparse
import bz2
import hashlib
import io
import os
import struct
import sys

try:
    import lzma
except ImportError:
    from backports import lzma

import bsdiff4

import update_metadata_pb2 as um
def flatten(l):
    """Return a single list holding the items of every sub-iterable of *l*, in order."""
    merged = []
    for sublist in l:
        merged.extend(sublist)
    return merged
def u32(x):
    """Decode *x* as one big-endian unsigned 32-bit integer.

    Args:
        x: exactly 4 bytes.

    Returns:
        The decoded integer.

    Raises:
        struct.error: if *x* is not exactly 4 bytes long (e.g. a short read).
    """
    return struct.unpack('>I', x)[0]
def u64(x):
    """Decode *x* as one big-endian unsigned 64-bit integer.

    Args:
        x: exactly 8 bytes.

    Returns:
        The decoded integer.

    Raises:
        struct.error: if *x* is not exactly 8 bytes long (e.g. a short read).
    """
    return struct.unpack('>Q', x)[0]
def verify_contiguous(exts):
    """Check that *exts* cover blocks 0..N back-to-back with no gap or overlap.

    Args:
        exts: iterable of extents exposing ``start_block`` and ``num_blocks``.

    Returns:
        True when the first extent starts at block 0 and every following
        extent starts exactly where the previous one ended (an empty
        sequence is trivially contiguous); False otherwise.
    """
    blocks = 0
    for ext in exts:
        if ext.start_block != blocks:
            return False
        # Next extent must begin right after this one.
        blocks += ext.num_blocks
    return True
def data_for_op(op, out_file, old_file):
    """Apply one install operation from the payload to the image being built.

    The operation's data blob is read from ``args.payloadfile`` at the
    module-level ``data_offset`` plus ``op.data_offset``; what happens next
    depends on ``op.type``:

    * REPLACE / REPLACE_XZ / REPLACE_BZ: the blob (decompressed for the XZ
      and BZ variants) is written starting at the first destination extent.
    * SOURCE_COPY: the source extents of ``old_file`` are copied, one after
      another, starting at the first destination extent.
    * SOURCE_BSDIFF: the concatenated source extents are patched with the
      blob via ``bsdiff4.patch`` and the result is spread over the
      destination extents.
    * ZERO: every destination extent is filled with zero bytes.

    Any other type, or a SOURCE_* operation without ``--diff``, prints a
    message and terminates the process via ``sys.exit``.

    Args:
        op: operation message; ``type``, ``data_offset``, ``data_length``,
            ``src_extents`` and ``dst_extents`` are read.
        out_file: seekable binary file receiving the new image.
        old_file: seekable binary file holding the original image; only
            used by SOURCE_COPY and SOURCE_BSDIFF.

    Returns:
        The bytes most recently handled: the (decompressed) blob for the
        REPLACE variants, the last chunk copied or written for the SOURCE_*
        variants, and the raw blob for ZERO.
    """
    args.payloadfile.seek(data_offset + op.data_offset)
    data = args.payloadfile.read(op.data_length)

    # Hash verification of the blob is currently disabled:
    # assert hashlib.sha256(data).digest() == op.data_sha256_hash, 'operation data hash mismatch'

    if op.type in (op.REPLACE_XZ, op.REPLACE_BZ, op.REPLACE):
        if op.type == op.REPLACE_XZ:
            data = lzma.LZMADecompressor().decompress(data)
        elif op.type == op.REPLACE_BZ:
            data = bz2.BZ2Decompressor().decompress(data)
        # The whole blob is written in one go from the first destination extent.
        out_file.seek(op.dst_extents[0].start_block * block_size)
        out_file.write(data)
    elif op.type == op.SOURCE_COPY:
        if not args.diff:
            print("SOURCE_COPY supported only for differential OTA")
            sys.exit(-2)
        out_file.seek(op.dst_extents[0].start_block * block_size)
        for ext in op.src_extents:
            old_file.seek(ext.start_block * block_size)
            data = old_file.read(ext.num_blocks * block_size)
            out_file.write(data)
    elif op.type == op.SOURCE_BSDIFF:
        if not args.diff:
            print("SOURCE_BSDIFF supported only for differential OTA")
            sys.exit(-3)
        # Gather the source extents into one contiguous byte string.
        source_chunks = []
        for ext in op.src_extents:
            old_file.seek(ext.start_block * block_size)
            source_chunks.append(old_file.read(ext.num_blocks * block_size))
        # Keep the patched bytes in their own buffer so that no stale source
        # data can follow them when the result is shorter than the source.
        patched = io.BytesIO(bsdiff4.patch(b''.join(source_chunks), data))
        # Hand the patched bytes out to the destination extents in order.
        for ext in op.dst_extents:
            data = patched.read(ext.num_blocks * block_size)
            out_file.seek(ext.start_block * block_size)
            out_file.write(data)
    elif op.type == op.ZERO:
        for ext in op.dst_extents:
            out_file.seek(ext.start_block * block_size)
            out_file.write(b'\x00' * (ext.num_blocks * block_size))
    else:
        print("Unsupported type = %d" % op.type)
        sys.exit(-1)

    return data
def dump_part(part):
    """Write one partition from the payload to ``<args.out>/<name>.img``.

    Every operation in ``part.operations`` is applied in order through
    ``data_for_op``; a dot is printed per operation as progress output.
    With ``--diff`` the original image ``<args.old>/<name>.img`` is opened
    for reading and passed along as the source of the SOURCE_* operations.

    Args:
        part: partition message; ``partition_name`` and ``operations`` are
            read.
    """
    sys.stdout.write("Processing %s partition" % part.partition_name)
    sys.stdout.flush()

    # The output image is opened (and thereby created) before the old image,
    # and both are closed even when an operation fails.
    with open('%s/%s.img' % (args.out, part.partition_name), 'wb') as out_file:
        if args.diff:
            old_file = open('%s/%s.img' % (args.old, part.partition_name), 'rb')
        else:
            old_file = None
        try:
            for op in part.operations:
                data_for_op(op, out_file, old_file)
                sys.stdout.write(".")
                sys.stdout.flush()
        finally:
            if old_file is not None:
                old_file.close()

    print("Done")
# Command-line interface.
parser = argparse.ArgumentParser(description='OTA payload dumper')
parser.add_argument('payloadfile', type=argparse.FileType('rb'),
                    help='payload file name')
parser.add_argument('--out', default='output',
                    help='output directory (defaul: output)')
parser.add_argument('--diff',action='store_true',
                    help='extract differential OTA, you need put original images to old dir')
parser.add_argument('--old', default='old',
                    help='directory with original images for differential OTA (defaul: old)')
parser.add_argument('--images', default="",
                    help='images to extract (default: empty)')
args = parser.parse_args()

# Make sure the --out directory exists.
if not os.path.exists(args.out):
    os.makedirs(args.out)

# Header layout as read below: 4-byte magic, big-endian u64 format version,
# u64 manifest size, u32 metadata signature size. Explicit checks are used
# instead of assert so they still run under "python -O".
magic = args.payloadfile.read(4)
if magic != b'CrAU':
    sys.exit("Invalid payload magic %r, expected b'CrAU'" % magic)

file_format_version = u64(args.payloadfile.read(8))
if file_format_version != 2:
    sys.exit("Unsupported payload format version %d, expected 2" % file_format_version)

manifest_size = u64(args.payloadfile.read(8))
# Only version 2 gets past the check above, so this field is always read.
metadata_signature_size = u32(args.payloadfile.read(4))

manifest = args.payloadfile.read(manifest_size)
metadata_signature = args.payloadfile.read(metadata_signature_size)

# Operation data offsets are relative to the end of the metadata signature.
data_offset = args.payloadfile.tell()

dam = um.DeltaArchiveManifest()
dam.ParseFromString(manifest)
block_size = dam.block_size

if args.images == "":
    # No selection given: extract every partition in the manifest.
    for part in dam.partitions:
        dump_part(part)
else:
    images = args.images.split(",")
    for image in images:
        partition = [part for part in dam.partitions if part.partition_name == image]
        if partition:
            dump_part(partition[0])
        else:
            sys.stderr.write("Partition %s not found in payload!\n" % image)