LPT26x-HSF-4MB-Hilink_14.2..../build/script/build_upg_pkg.py
2025-05-13 22:00:58 +08:00

925 lines
39 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# coding=utf-8
'''
* Copyright (c) HiSilicon (Shanghai) Technologies Co., Ltd. 2022-2023. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Description: part of hupg build scripts
'''
import argparse
import configparser
import os
import re
import hashlib
import sys
import subprocess
import copy
import struct
import platform
from ctypes import *
###############################Defining Basic Types###############################################
td_char = c_char
td_s8 = c_byte
td_u8 = c_ubyte
td_s16 = c_short
td_u16 = c_ushort
td_s32 = c_int
td_u32 = c_uint
file_dir = os.path.dirname(os.path.realpath(__file__))
g_root = os.path.realpath(os.path.join(file_dir, "..", ".."))
class fota_key_area_data(Structure):
pass
class fota_info_area_data(Structure):
pass
class upg_sha256_sign(Structure):
_fields_ = [
("check_sum", td_u8 * 32),
("padding", td_u8 * 224),
]
class fota_image_hash_node(Structure):
_fields_ = [
("image_id", td_u32),
("image_addr", td_u32),
("image_length", td_u32),
("image_hash", td_u8 * 32),
]
class fota_image_head(Structure):
_fields_ = [
("header_magic", td_u32),
("image_id", td_u32),
("image_offset", td_u32),
("image_len", td_u32),
("image_hash", td_u8 * 32),
("old_image_len", td_u32),
("old_image_hash", td_u8 * 32),
("new_image_len", td_u32),
("version_ext", td_u32),
("version_mask", td_u32),
("decompress_flag", td_u32),
("re_enc_flag", td_u32),
("root_key_type", td_u32),
("enc_pk_l1", td_u8 * 16),
("enc_pk_l2", td_u8 * 16),
("iv", td_u8 * 16),
("padding", td_u8 * 4),
]
class image_key_area(Structure):
_fields_ = [
("image_id", td_u32),
("structure_version", td_u32),
("structure_length", td_u32),
("signature_length", td_u32),
("key_owner_id", td_u32),
("key_id", td_u32),
("key_alg", td_u32),
("ecc_curve_type", td_u32),
("key_length", td_u32),
("key_version_ext", td_u32),
("mask_key_version_ext", td_u32),
("msid_ext", td_u32),
("mask_msid_ext", td_u32),
("maintenance_mode", td_u32),
("die_id", td_u8 * 16),
("code_info_addr", td_u32),
("reserved", td_u8 * 52),
("ext_pulic_key_area", td_u8 * 64),
("sig_key_area", td_u8 * 64),
]
class image_code_area(Structure):
_fields_ = [
("image_id", td_u32),
("structure_version", td_u32),
("structure_length", td_u32),
("signature_length", td_u32),
("version_ext", td_u32),
("mask_version_ext", td_u32),
("msid_ext", td_u32),
("mask_msid_ext", td_u32),
("code_area_addr", td_u32),
("code_area_len", td_u32),
("code_area_hash", td_u8 * 32),
("code_enc_flag", td_u32),
("protection_key_l1", td_u8 * 16),
("protection_key_l2", td_u8 * 16),
("iv", td_u8 * 16),
("code_compress_flag", td_u32),
("code_uncompress_len", td_u32),
("text_segment_size", td_u32),
("reserved", td_u8 * 248),
("sig_code_info", td_u8 * 16),
("sig_code_info_ext", td_u8 * 16),
]
class imageInfo:
def __init__(self, path):
with open(path, 'rb') as fp:
image_bin = fp.read()
key_ver = image_bin[36:40]
key_mask = image_bin[40:44]
key_area_len = sizeof(image_key_area)
code_ver = image_bin[key_area_len + 16:key_area_len + 20]
code_mask = image_bin[key_area_len + 20:key_area_len + 24]
self.key_ver = struct.unpack("I", key_ver)
self.key_mask = struct.unpack("I", key_mask)
self.code_ver = struct.unpack("I", code_ver)
self.code_mask = struct.unpack("I", code_mask)
def fota_format_init(upg_format_path):
sys.path.append(upg_format_path)
from fota_format_st import fota_key_area_data_fields, fota_info_area_data_fields
fota_key_area_data._fields_ = fota_key_area_data_fields
fota_info_area_data._fields_ = fota_info_area_data_fields
def hex2dec(string_num):
return int(string_num.upper(), 16)
def fill_fota_key_area(cf, key_ver, key_mask):
key_area_len = sizeof(fota_key_area_data)
temp_offset = [0]
temp_offset[0] = temp_offset[0] + key_area_len
fota_key_bin_tmp = bytearray(temp_offset[0])
fota_key_bin = fota_key_bin_tmp[0:key_area_len]
fota_key_head = fota_key_area_data.from_buffer(fota_key_bin)
fota_key_head.image_id = hex2dec(cf.get('FOTA_KEY_AREA', 'ImageId'))
fota_key_head.struct_version = hex2dec(cf.get('FOTA_KEY_AREA', 'StructVersion'))
fota_key_head.struct_length = key_area_len
fota_key_head.key_owner_id = int(cf.get('FOTA_KEY_AREA', 'KeyOwnerId'))
fota_key_head.key_id = int(cf.get('FOTA_KEY_AREA', 'KeyId'))
fota_key_head.key_alg = hex2dec(cf.get('FOTA_KEY_AREA', 'KeyAlg'))
if (fota_key_head.key_alg == 0x2A13C856 or fota_key_head.key_alg == 0x2A13C867):
fota_key_head.signature_length = 32
else:
fota_key_head.signature_length = sizeof(fota_key_head.sig_fota_key_area)
fota_key_head.ecc_curve_type = 0
fota_key_head.key_length = 0
# fota_key_head.fota_key_version_ext = key_ver # Version of FOTA_External_Public_Key
# fota_key_head.mask_fota_key_version_ext = key_mask
fota_key_head.fota_key_version_ext = hex2dec(cf.get('FOTA_KEY_AREA', 'KeyVersion'))
fota_key_head.mask_fota_key_version_ext = hex2dec(cf.get('FOTA_KEY_AREA', 'KeyVersionMask'))
fota_key_head.msid_ext = hex2dec(cf.get('FOTA_KEY_AREA', 'Msid'))
fota_key_head.mask_msid_ext = hex2dec(cf.get('FOTA_KEY_AREA', 'MsidMask'))
fota_key_head.maintenance_mode = 0
# fota_key_head.die_id = 0
fota_key_head.fota_info_addr = 0 # 相对fota key区末尾的地址0表示key area后接fota info区
# 剩下3个预留区、公钥、签名由签名工具填充
return key_area_len, fota_key_bin
def fill_fota_info_area(cf, image_num, fota_obj):
area_len = sizeof(fota_info_area_data)
key_alg = hex2dec(cf.get('FOTA_KEY_AREA', 'KeyAlg'))
temp_offset = [0]
temp_offset[0] = temp_offset[0] + area_len
upg_bin = bytearray(temp_offset[0])
upg_bin = upg_bin[0:area_len]
fota_info_head = fota_info_area_data.from_buffer(upg_bin)
fota_info_head.image_id = hex2dec(cf.get('FOTA_INFO_AREA', 'ImageId'))
fota_info_head.struct_version = hex2dec(cf.get('FOTA_INFO_AREA', 'StructVersion'))
fota_info_head.struct_length = area_len
if (key_alg == 0x2A13C856 or key_alg == 0x2A13C867):
fota_info_head.signature_length = 32
else:
fota_info_head.signature_length = sizeof(fota_info_head.sign_fota_info)
fota_info_head.fota_version_ext = hex2dec(cf.get('FOTA_INFO_AREA', 'Version'))
fota_info_head.mask_fota_version_ext = hex2dec(cf.get('FOTA_INFO_AREA', 'VersionMask'))
fota_info_head.msid_ext = hex2dec(cf.get('FOTA_INFO_AREA', 'Msid'))
fota_info_head.mask_msid_ext = hex2dec(cf.get('FOTA_INFO_AREA', 'MsidMask'))
# fota_info_head.image_hash_table_addr = 0 # 所有镜像处理完后再计算hash
# fota_info_head.image_hash_table_length = 0
# fota_info_head.image_hash_table_hash = 0
fota_info_head.image_num = image_num
fota_info_head.hardware_id = int(cf.get('FOTA_INFO_AREA', 'HardwareID'))
if hasattr(fota_obj, 'hardware_id') and fota_info_head.hardware_id == 0:
fota_info_head.hardware_id = fota_obj.hardware_id
#fota_info_head.reserved = 0 # 预留给用户
#fota_info_head.sign_fota_info = 0 # 签名
return area_len, upg_bin
def set_fota_info_area(fota_info_len, upg_bin, hash_table_addr, hash_table_length, hash_table_hash):
tmp_bin = upg_bin[0:fota_info_len]
fota_info_head = fota_info_area_data.from_buffer(tmp_bin)
fota_info_head.image_hash_table_addr = hash_table_addr
fota_info_head.image_hash_table_length = hash_table_length
tmp_bin[40:72] = hash_table_hash[0:32] # 填充fota_info_head.image_hash_table_hash
upg_bin[0:fota_info_len] = tmp_bin
return
def make_sha256_unsecure_signature(content):
# common段非安全签名
# Non-secure signature of common field
signature_bin = bytearray(sizeof(upg_sha256_sign))
signature = upg_sha256_sign.from_buffer(signature_bin)
common_head_sh = hashlib.sha256()
common_head_sh.update(content)
common_head_hash = common_head_sh.digest()
signature_bin[0:sizeof(signature.check_sum)] = common_head_hash
return signature_bin
def fill_fota_image_hash_node(hash_bin, image_id, image_index, tmp_hash, addr_offset, image_length):
hash_node_size = sizeof(fota_image_hash_node)
start_offset = image_index * hash_node_size
tmp_hash_bin = bytearray(hash_node_size)
fota_hash= fota_image_hash_node.from_buffer(tmp_hash_bin)
fota_hash.image_id = image_id
fota_hash.image_addr = addr_offset
fota_hash.image_length = image_length
tmp_hash_bin[12:44] = tmp_hash[0:32]
hash_bin[start_offset:start_offset + hash_node_size] = tmp_hash_bin
return
def lzma_compress_bin(src_file, dst_file, lzma_tool):
# print('lzma compress tool :', lzma_tool)
# print('lzma src file :', src_file)
# print('lzma out file :', dst_file)
cmd_list0 = []
if platform.system().lower() == "linux":
cmd_list0.append('chmod')
cmd_list0.append('755')
cmd_list0.append(lzma_tool)
elif platform.system().lower() == "windows":
cmd_list0.append('cacls')
cmd_list0.append(lzma_tool)
cmd_list0.append('/p everyone:f /e')
str_cmd=' '.join(cmd_list0)
ret = subprocess.run(str_cmd, shell=True)
if ret.returncode != 0:
sys.exit("chmod lzma tool failed: %s" % ret)
cmd_list = []
cmd_list.append(lzma_tool)
cmd_list.append('-d12 -lc0 -lp0 e')
cmd_list.append(src_file)
cmd_list.append(dst_file)
str_cmd=' '.join(cmd_list)
ret = subprocess.run(str_cmd, shell=True)
if ret.returncode != 0:
sys.exit("run lzma tool failed: %s" % ret)
def build_diff_cfg_file(fota_obj, basename, image_pairs, image_bin_file):
diff_cfg_file = os.path.join(fota_obj.temp_dir, '%s_diff.cfg'%basename)
diff_cfg_content = [
'OldVersionFile="%s"' % image_pairs[0],
'NewVersionFile="%s"' % image_pairs[1],
'DiffFile="%s"' % image_bin_file
]
remove_file(diff_cfg_file)
create_dirs(os.path.dirname(diff_cfg_file))
with open(diff_cfg_file, 'w') as fp:
fp.write('\n'.join(diff_cfg_content))
return diff_cfg_file
# 解密要求16byte对齐将lzma压缩头填充至16字节对齐[head(13byte) + body(N byte)] -> [head(16byte) + body(N byte)]
def fill_lzma_head(upg_lzma_file, filepath, filename):
temp_offset3 = [0]
dst_bin = bytearray(temp_offset3[0])
file_sta = os.stat(upg_lzma_file)
image_len = file_sta.st_size
with open(upg_lzma_file, 'rb') as fp:
src_bin= fp.read()
head_len = 13; # lzma压缩头长度
align_len = 16 # 16byte对齐后的lzma压缩头长度
dst_bin[0:head_len] = src_bin[0:head_len]
dst_bin[head_len:align_len] = bytearray(align_len - head_len)
dst_bin[align_len:align_len + image_len - head_len] = src_bin[head_len:image_len]
remove_file(upg_lzma_file)
dst_lzma_file = upg_lzma_file
with open(dst_lzma_file, 'wb') as fp:
fp.write(dst_bin)
return dst_lzma_file
def get_image_bin_data(fota_image, image_confg_key, fota_obj):
image_bin_file = fota_obj.input_dict.get(image_confg_key)
fota_image.new_image_len = os.stat(image_bin_file).st_size
(filepath, tempfilename) = os.path.split(image_bin_file)
(filename, extension) = os.path.splitext(tempfilename)
if fota_image.image_id == 0xCB9E063C: # NV镜像不做处理保持原数据
pass
elif fota_image.decompress_flag == 0x3C7896E1: # 压缩ID
# 制作压缩升级文件
upg_lzma_file = os.path.join(filepath, '%s.lzma'%filename)
lzma_compress_bin('"%s"' % image_bin_file, '"%s"' % upg_lzma_file, fota_obj.lzma_tool)
image_bin_file = fill_lzma_head(upg_lzma_file, filepath, filename)
elif fota_image.decompress_flag == 0x44494646: # 差分ID
# 制作差分升级文件
if fota_obj.diff_image_info == None:
sys.exit("[ERROR] diff image dict is None!!!")
image_pairs = fota_obj.diff_image_info.get(image_confg_key)
if image_pairs == None:
sys.exit("[ERROR] diff bin is None!!!")
# 镜像头记录新老镜像长度
fota_image.old_image_len = os.stat(image_pairs[0]).st_size
fota_image.new_image_len = os.stat(image_pairs[1]).st_size
# 制作旧镜像hash
with open(image_pairs[0], 'rb') as fp: # file为旧镜像
old_image_bin = fp.read()
old_image_hash = make_sha256_unsecure_signature(old_image_bin)
fota_image.old_image_hash[:] = old_image_hash[0:32]
# 差分后的镜像
image_bin_file = os.path.join(os.path.dirname(image_pairs[1]), '%s_diff.bin'%filename)
# 差分配置文件
diff_cfg_file = build_diff_cfg_file(fota_obj, filename, image_pairs, image_bin_file)
cmd = ' '.join([fota_obj.upg_tool, '4', '"%s"' % diff_cfg_file])
ret = subprocess.run(cmd, shell=True)
if ret.returncode != 0:
sys.exit("run diff tool failed: %d" % ret)
with open(image_bin_file, 'rb') as fp: # 不压缩时新镜像和原镜像一致, file为原镜像
image_bin= fp.read()
file_stats = os.stat(image_bin_file)
return image_bin, file_stats.st_size
def fill_fota_image_head(fota_image, cf, image_confg_key, start_offset, image_offset, fota_image_head_size):
fota_image.header_magic = hex2dec(cf.get(image_confg_key, 'HeaderMagic'))
fota_image.image_id = hex2dec(cf.get(image_confg_key, 'ImageId'))
fota_image.image_offset = start_offset + image_offset + fota_image_head_size
fota_image.old_image_len = 0
fota_image.new_image_len = 0
# fota_image.version_ext = int(''.join(map(str, info.code_ver)))
# fota_image.version_mask = int(''.join(map(str, info.code_mask)))
fota_image.version_ext = hex2dec(cf.get(image_confg_key, 'version_ext'))
fota_image.version_mask = hex2dec(cf.get(image_confg_key, 'version_mask'))
fota_image.decompress_flag = hex2dec(cf.get(image_confg_key, 'DecompressFlag'))
fota_image.re_enc_flag = hex2dec(cf.get(image_confg_key, 'ReRncFlag'))
fota_image.root_key_type = hex2dec(cf.get(image_confg_key, 'RootKeyType'))
def get_image_bin_final(image_bin, image_len):
temp_offset1 = [0]
image_bin_1 = bytearray(temp_offset1[0])
image_bin_1[0:image_len] = image_bin
tmp = image_len % 16
if tmp != 0: # 对镜像长度按照16B对齐
tmp_hash_bin = bytearray(16 - tmp)
image_bin_1[image_len:image_len + 16 - tmp] = tmp_hash_bin
image_len = image_len + 16 - tmp
return image_bin_1, image_len
def fill_fota_image_secure_head(upg_bin, fota_image, cf, image_confg_key, image_bin):
tmp_hash = make_sha256_unsecure_signature(image_bin)
fota_image.image_hash[0:32] = tmp_hash[0:32] # 填充fota_image.image_hash
# 填充加密字段
ReRncFlag = cf.get(image_confg_key, 'ReRncFlag')
if ReRncFlag == "0x3C7896E1":
fota_image_head_size = sizeof(fota_image_head)
if cf.get(image_confg_key, 'ProtectionKeyL1') != "":
data_byte1 = bytes.fromhex(cf.get(image_confg_key, 'ProtectionKeyL1'))
fota_image.enc_pk_l1[0:16] = data_byte1
if cf.get(image_confg_key, 'ProtectionKeyL2') != "":
data_byte2 = bytes.fromhex(cf.get(image_confg_key, 'ProtectionKeyL2'))
fota_image.enc_pk_l2[0:16] = data_byte2
if cf.get(image_confg_key, 'Iv') != "":
data_byte3 = bytes.fromhex(cf.get(image_confg_key, 'Iv'))
fota_image.iv[0:16] = data_byte3
def create_encry_cfg(fota_obj, upg_align_file, upg_encry_file):
remove_file(fota_obj.encry_conf_file)
create_dirs(os.path.dirname(fota_obj.encry_conf_file))
# 从基础文件中读取SIGN_CFG段并写入到加密配置文件
with open(fota_obj.encry_conf_file, 'w') as pro_file:
for item in fota_obj.base_cfg.items("SIGN_CFG"):
if item[0] == 'UpgImagePath':
pro_file.write('SrcFile="%s"\n'%upg_align_file)
elif item[0] == 'UpgSignedImagePath':
pro_file.write('DstFile="%s"\n'%upg_encry_file)
elif item[0] == 'SignSuite':
pro_file.write('SignSuite=1\n')
for item in fota_obj.base_cfg.items("application"):
if item[0] == 'PlainKey':
pro_file.write('PlainKey=%s\n'%item[1])
if item[0] == 'Iv':
pro_file.write('Iv=%s\n'%item[1])
def encry_ota_image(cf, image_confg_key, image_bin_1, fota_obj):
image_bin_file = fota_obj.input_dict.get(image_confg_key)
(filepath, tempfilename) = os.path.split(image_bin_file)
(filename, extension) = os.path.splitext(tempfilename)
upg_align_file = os.path.join(filepath, '%s.lzma.align'%filename) # 压缩并16字节对齐的文件
upg_encry_file = os.path.join(filepath, '%s.lzma.encry'%filename) # 加密文件
with open(upg_align_file, 'wb+') as fp:
fp.write(image_bin_1)
create_encry_cfg(fota_obj, upg_align_file, upg_encry_file)
cmd = ' '.join([fota_obj.upg_tool, '5', '"%s"' % fota_obj.encry_conf_file])
subprocess.run(cmd, shell=True)
with open(upg_encry_file, 'rb') as fp:
image_bin_1= fp.read()
return image_bin_1
def fill_fota_image(cf, image_confg_key, all_image_bin, fota_obj, start_offset, image_offset):
# 根据对应核的配置填充升级镜像头
fota_image_head_size = sizeof(fota_image_head)
upg_bin = bytearray(fota_image_head_size)
fota_image = fota_image_head.from_buffer(upg_bin)
fill_fota_image_head(fota_image, cf, image_confg_key, start_offset, image_offset, fota_image_head_size)
# 根据升级文件方式标志,将当前镜像处理获得新镜像(压缩、差分、原镜像三者之一)
image_bin, image_len= get_image_bin_data(fota_image, image_confg_key, fota_obj)
print("old_image_len: ", fota_image.old_image_len,
"new_image_len: ", fota_image.new_image_len,
"recovery image_len: ", image_len)
# 最终的镜像本体, 将镜像数据做对齐处理
image_bin_1, image_bin_1_len = get_image_bin_final(image_bin, image_len)
fota_image.image_len = image_bin_1_len
# 加密
ReRncFlag = cf.get(image_confg_key, 'ReRncFlag')
if ReRncFlag == "0x3C7896E1":
image_bin_1 = encry_ota_image(cf, image_confg_key, image_bin_1, fota_obj)
# 头部安全填充
fill_fota_image_secure_head(upg_bin, fota_image, cf, image_confg_key, image_bin_1)
# 组合镜像
# 1. 头部填充
head_end = start_offset + fota_image_head_size
all_image_bin[start_offset:head_end] = upg_bin
# 2. 镜像本体填充
img_end = head_end + image_bin_1_len
all_image_bin[head_end:img_end] = image_bin_1
return fota_image_head_size + image_bin_1_len
def make_upg(fota_obj):
cf = fota_obj.base_cfg
temp_offset = [0]
upg_bin = bytearray(temp_offset[0])
image_num = len(fota_obj.input_dict)
# FOTA Key Area 填充
# version字段暂来自配置文件因此key area填充放到最前面
fota_key_len, fota_key_bin_buff = fill_fota_key_area(cf, 0, 0)
# fota_info区填充除hash以外hash数据最后再填充
fota_info_len, fota_info_bin_buff = fill_fota_info_area(cf, image_num, fota_obj)
hash_node_offset = fota_key_len + fota_info_len
all_hash_node_len = image_num * sizeof(fota_image_hash_node)
tmp = all_hash_node_len % 16
if tmp != 0:
all_hash_node_len = all_hash_node_len + 16 - tmp
# hash node 尾部多余长度
tmp_hash_bin = bytearray(16 - tmp)
image_offset = hash_node_offset + all_hash_node_len
all_image_len = 0
temp_offset1 = [0]
all_image_bin = bytearray(temp_offset1[0])
temp_offset2 = [0]
all_hash_bin_buff = bytearray(temp_offset2[0])
image_index = 0
for image_confg_key in fota_obj.input_dict:
#image_confg_key = 'IMAGE_' + str(i + 1)
image_id = hex2dec(cf.get(image_confg_key, 'ImageId'))
# 填充升级镜像头部结构返回镜像总长度压缩后的实际长度不包含对齐的长度和头结构的总长度、头部的非安全hash
image_length = fill_fota_image(cf, image_confg_key, all_image_bin, fota_obj, all_image_len, image_offset)
head_hash = make_sha256_unsecure_signature(all_image_bin[all_image_len : all_image_len + sizeof(fota_image_head)])
# 计算下一个镜像的起始位置和偏移位置
addr_offset = all_image_len + image_offset
all_image_len = all_image_len + image_length
print(image_confg_key, ':', image_length, ': addr_offset:', addr_offset)
# 将计算出来的镜像头hash放到hash段的bin中并初始化hash的结构
fill_fota_image_hash_node(all_hash_bin_buff, image_id, image_index, head_hash, addr_offset, image_length)
image_index += 1 # 镜像索引递增
if tmp != 0:
all_hash_bin_buff[all_hash_node_len - len(tmp_hash_bin):all_hash_node_len] = tmp_hash_bin
# 计算hash table表hash值填到fota_info_area
hash_table_hash = make_sha256_unsecure_signature(all_hash_bin_buff)
set_fota_info_area(fota_info_len, fota_info_bin_buff, hash_node_offset, all_hash_node_len, hash_table_hash)
upg_bin[0:fota_key_len] = fota_key_bin_buff
upg_bin[fota_key_len:fota_key_len + fota_info_len] = fota_info_bin_buff
upg_bin[hash_node_offset:hash_node_offset + all_hash_node_len] = all_hash_bin_buff
upg_bin[image_offset:image_offset + all_image_len] = all_image_bin
remove_file(fota_obj.upg_image)
create_dirs(os.path.dirname(fota_obj.upg_image))
with open(fota_obj.upg_image, 'wb+') as fp:
fp.write(upg_bin)
# print_upg(upg_bin, fota_obj.upg_image)
return
def print_upg_byte_list(pname, byte_in):
length = len(byte_in)
print("[%s=0x"%pname, end="")
for i in range(0, length):
print("%02x"%byte_in[i], end="")
print("]")
def print_upg_byte(pname, byte_in):
length = len(byte_in)
print("[%s ="%pname)
for i in range(0, length):
if (i + 1) % 16 != 0:
print("0x%02x, " % byte_in[i], end="")
elif (i + 1) == length:
print("0x%02x]" % byte_in[i])
else:
print("0x%02x, " % byte_in[i])
def print_fota_key_area(fota_key_head):
print("[image_id=0x%x]" % (fota_key_head.image_id))
print("[struct_version=0x%x]" % (fota_key_head.struct_version))
print("[struct_length=0x%x]" % (fota_key_head.struct_length))
print("[signature_length=0x%x]" % (fota_key_head.signature_length))
print("[key_owner_id=0x%x]" % (fota_key_head.key_owner_id))
print("[key_id=0x%x]" % (fota_key_head.key_id))
print("[key_alg=0x%x]]" % (fota_key_head.key_alg))
print("[ecc_curve_type=0x%x]" % (fota_key_head.ecc_curve_type))
print("[key_length=0x%x]]" % (fota_key_head.key_length))
print("[fota_key_version_ext=0x%x]" % (fota_key_head.fota_key_version_ext))
print("[mask_fota_key_version_ext=0x%x]" % (fota_key_head.mask_fota_key_version_ext))
print("[msid_ext=0x%x]" % (fota_key_head.msid_ext))
print("[mask_msid_ext=0x%x]" % (fota_key_head.mask_msid_ext))
print("[maintenance_mode=0x%x]" % (fota_key_head.maintenance_mode))
print_upg_byte("die_id", fota_key_head.die_id)
print("[fota_info_addr=0x%x]]" % (fota_key_head.fota_info_addr))
#print_upg_byte("reserved", fota_key_head.reserved)
print_upg_byte("fota_external_public_key", fota_key_head.fota_external_public_key)
print_upg_byte("sig_fota_key_area", fota_key_head.sig_fota_key_area)
def print_fota_info_area(fota_info_head):
print("[image_id=0x%x]" % (fota_info_head.image_id))
print("[struct_version=0x%x]" % (fota_info_head.struct_version))
print("[struct_length=0x%x]" % (fota_info_head.struct_length))
print("[signature_length=0x%x]" % (fota_info_head.signature_length))
print("[fota_version_ext=0x%x]" % (fota_info_head.fota_version_ext))
print("[mask_fota_version_ext=0x%x]" % (fota_info_head.mask_fota_version_ext))
print("[msid_ext=0x%x]" % (fota_info_head.msid_ext))
print("[mask_msid_ext=0x%x]" % (fota_info_head.mask_msid_ext))
print("[image_hash_table_addr=0x%x]" % (fota_info_head.image_hash_table_addr))
print("[image_hash_table_length=0x%x]" % (fota_info_head.image_hash_table_length))
#print_upg_byte("image_hash_table_hash", fota_info_head.image_hash_table_hash)
print_upg_byte_list("image_hash_table_hash", fota_info_head.image_hash_table_hash)
print("[image_num=0x%x]" % (fota_info_head.image_num))
print("[hardware_id=0x%x]" % (fota_info_head.hardware_id))
#print_upg_byte("reserved", fota_info_head.reserved)
print_upg_byte("sign_fota_info", fota_info_head.sign_fota_info)
def print_fota_image_hash_table(fota_image_hash_table):
print("[image_id=0x%x]" % (fota_image_hash_table.image_id))
print("[image_addr=0x%x]" % (fota_image_hash_table.image_addr))
print("[image_length=0x%x]" % (fota_image_hash_table.image_length))
#print_upg_byte("image_hash", fota_image_hash_table.image_hash)
print_upg_byte_list("image_hash", fota_image_hash_table.image_hash)
def print_fota_image_head(image_head):
print("[header_magic=0x%x]" % (image_head.header_magic))
print("[image_id=0x%x]" % (image_head.image_id))
print("[image_offset=0x%x]" % (image_head.image_offset))
print("[image_len=0x%x]" % (image_head.image_len))
print("[new_image_len=0x%x]" % (image_head.new_image_len))
#print_upg_byte("new_image_hash", image_head.new_image_hash)
print_upg_byte_list("image_hash", image_head.image_hash)
print("[old_image_len=0x%x]" % (image_head.old_image_len))
print_upg_byte_list("old_image_hash", image_head.old_image_hash)
#print_upg_byte("old_image_hash", image_head.old_image_hash)
print("[version_ext=0x%x]" % (image_head.version_ext))
print("[version_mask=0x%x]" % (image_head.version_mask))
print("[decompress_flag=0x%x]" % (image_head.decompress_flag))
print("[root_key_type=0x%x]" % (image_head.root_key_type))
print("[re_enc_flag=0x%x]" % (image_head.re_enc_flag))
print_upg_byte("enc_pk_l1", image_head.enc_pk_l1)
print_upg_byte("enc_pk_l2", image_head.enc_pk_l2)
print_upg_byte("iv", image_head.iv)
#print_upg_byte("padding", image_head.padding)
def print_upg(upg_bin, image_file):
print("upg_bin_len %s"%len(upg_bin))
print("-------------%s fota_key_area start-------------" % (image_file))
fota_key_head = fota_key_area_data.from_buffer(upg_bin)
key_area_len = fota_key_head.struct_length
print_fota_key_area(fota_key_head)
print("-------------%s fota_info_area start-------------" % (image_file))
if fota_key_head.fota_info_addr == 0:
fota_info_offset = key_area_len
else:
fota_info_offset = fota_key_head.fota_info_addr
fota_info_head = fota_info_area_data.from_buffer(upg_bin[fota_info_offset:])
print_fota_info_area(fota_info_head)
image_num = fota_info_head.image_num
hash_node_size = sizeof(fota_image_hash_node)
if fota_info_head.image_hash_table_addr == 0:
hash_table_offset = fota_info_offset + key_area_len
else:
hash_table_offset = fota_info_head.image_hash_table_addr
for i in range(0, image_num):
print("-------------fota hash table start NO.%02d-------------" % (i + 1))
hash_table_info = fota_image_hash_node.from_buffer(upg_bin[hash_table_offset + i * hash_node_size:])
print_fota_image_hash_table(hash_table_info)
print("-------------fota image head start NO.%02d-------------" % (i + 1))
image_head_offset = hash_table_info.image_addr
image_head = fota_image_head.from_buffer(upg_bin[image_head_offset:])
print_fota_image_head(image_head)
def get_parameters():
parser = argparse.ArgumentParser()
parser.add_argument('-app_name', type=str, default='',
help='生成镜像的应用名')
parser.add_argument('-upg_format_path', type=str, default='',
help='升级包结构配置文件')
parser.add_argument('-base', type=str, default='',
help='基础配置文件路径')
parser.add_argument('-temp_dir', type=str, default='',
help='临时配置文件路径')
parser.add_argument('-old_images', type=str, default='',
help='老版本镜像文件路径')
parser.add_argument('-new_images', type=str, default='',
help='新版本镜像文件路径')
parser.add_argument('-output_dir', type=str, default='',
help='输出文件路径')
parser.add_argument('-type', type=int, default=0,
help='执行类型')
parser.add_argument('-sign_tool', type=int, default=0,
help='签名工具选择 0-本地签名工具, 1-在线签名工具')
config = parser.parse_args()
return config
config_item = ['SignSuite', 'RootKeyFile', 'SubKeyFile', 'SrcFile', 'DstFile']
class Properties:
def getProperties(self, dir):
# 处理升级配置文件,获取有效配置,将有效配置保存到字典中
# 各个镜像的配置不一样,这部分也被过滤掉了,后面要怎么区分?
#########################################################################
properties = {}
with open(dir, 'r', encoding='utf-8') as pro_file:
for line in pro_file:
if line.find('=') > 0 and '#' not in line:
strs = line.replace('\n', '').split('=')
properties[strs[0]] = strs[1]
return properties
# 获取签名相关的配置
def setProperties(self, dir, properties):
with open(dir, 'w') as pro_file:
for item in properties:
if item in config_item:
pro_file.write("{}={}\n".format(item, properties[item]))
def remove_file(path):
if os.path.exists(path): # 如果文件存在
# 删除文件,可使用以下两种方法。
os.remove(path)
def create_dirs(path):
os.makedirs(path, exist_ok=True)
def print_fota_file(image_file):
with open(image_file, 'rb') as fp:
upg_bin = fp.read()
print_upg_byte(image_file, upg_bin)
class myconf(configparser.ConfigParser):
def __init__(self, defaults=None):
configparser.ConfigParser.__init__(self, defaults=defaults)
# 这里重写了optionxform方法直接返回选项名
def optionxform(self, optionstr):
return optionstr
# def create_upg_cfg(base, input, conf_dir):
def create_upg_cfg(fota_obj):
img_cfg = copy.deepcopy(fota_obj.base_cfg)
cf_list = img_cfg.sections() # 获取所有section返回值为list
input_cf_key_list = []
input_cf_key_list.append('FOTA_KEY_AREA')
input_cf_key_list.append('FOTA_INFO_AREA')
# 将输入的段加入到列表
for image_confg_key in fota_obj.input_dict:
input_cf_key_list.append(image_confg_key)
for item in cf_list:
# 将基础配置文件中,和输入无关的段删除
if item not in input_cf_key_list:
img_cfg.remove_section(item)
remove_file(fota_obj.image_conf_file)
print(fota_obj.image_conf_file)
print(os.path.dirname(fota_obj.image_conf_file))
create_dirs(os.path.dirname(fota_obj.image_conf_file))
with open(fota_obj.image_conf_file, 'w') as configfile:
img_cfg.write(configfile)
def create_sign_cfg(fota_obj):
remove_file(fota_obj.sign_conf_file)
create_dirs(os.path.dirname(fota_obj.sign_conf_file))
# 从基础文件中读取SIGN_CFG段并写入到签名配置文件
with open(fota_obj.sign_conf_file, 'w') as pro_file:
for item in fota_obj.base_cfg.items("SIGN_CFG"):
if item[0] == 'UpgImagePath':
pro_file.write('SrcFile="%s"\n'%fota_obj.upg_image)
elif item[0] == 'UpgSignedImagePath':
pro_file.write('DstFile="%s"\n'%fota_obj.upg_signed_image)
elif 'KeyFile' in item[0]:
pro_file.write('%s="%s"\n'% (item[0], os.path.join(fota_obj.root_path, item[1])))
else:
pro_file.write('%s=%s\n'%item)
for item in fota_obj.base_cfg.items("FOTA_KEY_AREA"):
if item[0] == 'ImageId':
pro_file.write('%s=%s\n'%item)
def check_fota_image_id(fota_file):
# 各个镜像的image_id需保证不一致
cf = myconf()
cf.read(fota_file, encoding='utf-8')
cf_list = cf.sections()
image_id_dict = {}
no_sections = ['FOTA_KEY_AREA', 'FOTA_INFO_AREA']
if cf.get('FOTA_KEY_AREA', 'ImageId') != cf.get('FOTA_INFO_AREA', 'ImageId'):
msg = "FOTA_KEY_AREA and FOTA_INFO_AREA have different ImageId.\n"
sys.exit(msg)
for section in cf_list:
if section in no_sections:
continue
image_id = cf.get(section, 'ImageId')
if image_id in image_id_dict:
msg = "The ImageId of %s and %s are the same.\n" % (section, image_id_dict.get(image_id))
sys.exit(msg)
else:
image_id_dict[image_id] = section
def check_image_key(input_dict):
# 检查nv是否与主程序一起打包
pkg_bin_list = input_dict.keys()
if 'nv' in pkg_bin_list and 'application' not in pkg_bin_list:
msg = "The NV cannot be packed separately.It has to be with application!\n"
sys.exit(msg)
# 检查索引文件与资源文件是否在一起打包
if 'res_data' in pkg_bin_list and 'res_index' not in pkg_bin_list:
msg = "The res_index file and res_data should be together.\n"
sys.exit(msg)
def sign_upg_file(fota_obj):
if fota_obj.sign_tool == 1:
# 在线签名将不会在fota_obj.output_dir下生成最终升级包文件
# 需usr自行对fota_obj.temp_dir下的未签名文件update_temp.fwpkg进行签名
return
cmd = ' '.join([fota_obj.upg_tool, '3', '"%s"' % fota_obj.sign_conf_file])
subprocess.run(cmd, shell=True)
class fota_info:
def __init__(self, user_input):
self.root_path = g_root
self.app_name = user_input.app_name
self.temp_dir = user_input.temp_dir
self.output_dir = user_input.output_dir
self.image_conf_file = os.path.join(self.temp_dir, '%s.cfg' % user_input.app_name) # 升级镜像的配置文件
self.sign_conf_file = os.path.join(self.temp_dir, '%s_sign.cfg' % user_input.app_name) # 升级镜像的签名配置文件
self.encry_conf_file = os.path.join(self.temp_dir, '%s_encry.cfg' % user_input.app_name) # 升级镜像的加密配置文件
if hasattr(user_input, 'sign_tool'):
self.sign_tool = user_input.sign_tool
else:
self.sign_tool = 0
if hasattr(user_input, 'hardware_id'):
self.hardware_id = user_input.hardware_id
self.base_cfg = myconf()
self.base_cfg.read(user_input.base, encoding='utf-8')
self._set_upg_image_path()
self._set_tool_path()
self.input_dict = self._split_input_info(user_input.new_images)
self.original_dict = self._split_input_info(user_input.old_images)
print("input_dict:", self.input_dict)
print("original_dict:", self.original_dict)
self.diff_image_info = self._get_image_path_for_diff_image(self.input_dict, self.original_dict) # 差分镜像信息
def _set_tool_path(self):
if platform.system().lower() == "linux":
self.upg_tool = self.base_cfg.get('TOOLS', 'UpgToolPath')
self.lzma_tool = self.base_cfg.get('TOOLS', 'LzmaToolPath')
elif platform.system().lower() == "windows":
self.upg_tool = self.base_cfg.get('TOOLS', 'UpgToolWinPath')
self.lzma_tool = self.base_cfg.get('TOOLS', 'LzmaToolWinPath')
self.upg_tool = '"%s"' % os.path.join(self.root_path, self.upg_tool)
self.lzma_tool = '"%s"' % os.path.join(self.root_path, self.lzma_tool)
print(self.upg_tool)
print(self.lzma_tool)
def _set_upg_image_path(self):
self.upg_image = self.base_cfg.get('SIGN_CFG', 'UpgImagePath')
self.upg_signed_image = self.base_cfg.get('SIGN_CFG', 'UpgSignedImagePath')
if self.upg_image == '':
self.upg_image = os.path.join(self.temp_dir, '%s_temp.fwpkg' % self.app_name)
if self.upg_signed_image == '':
self.upg_signed_image = os.path.join(self.output_dir, '%s.fwpkg' % self.app_name)
def _split_input_info(self, input):
input = input.replace('\n','').replace('\r','').replace('\t','')
if input == '':
return None
bin_list = input.split('|')
image_num = len(bin_list)
bin_dict = {}
res_data = ""
for i in range(0, image_num):
bin_file, image_confg_key = bin_list[i].split('=')
if image_confg_key == "res_data":
res_data = bin_file
else:
bin_dict[image_confg_key] = bin_file
if res_data != "":
bin_dict["res_data"] = res_data
return bin_dict
def _get_image_path_for_diff_image(self, new_version_dict, org_version_dict):
# 解析各个镜像的老版本镜像路径、新版本路径,并一一对应
# 制作差分升级文件的镜像需要提供老版本文件路径,提供老版本的同时也必须提供对应的新版本路径
if org_version_dict == None:
return None
image_ref = {}
for image_confg_key in org_version_dict:
if image_confg_key in image_ref:
sys.exit("[ERROR] old_images bin param repeated!!!")
image_ref[image_confg_key] = [
org_version_dict[image_confg_key], # 存放老镜像路径
"", # 存放新镜像路径
"" # 存放镜像路径的前缀,取新路径前缀
]
for image_confg_key in new_version_dict:
if image_confg_key in image_ref:
image_ref[image_confg_key][1] = new_version_dict[image_confg_key] # 后存放新版本镜像到1
image_ref[image_confg_key][2] = os.path.splitext(os.path.basename(image_ref[image_confg_key][0]))[0]
for key in image_ref:
if image_ref[key][1] == "":
sys.exit("[ERROR] not find a matched image bin!!!")
return image_ref
def create_fota_file(user_input):
fota_format_init(user_input.upg_format_path)
fota_obj = fota_info(user_input)
print("root_path:", fota_obj.root_path)
# 检查打包镜像限制
check_image_key(fota_obj.input_dict)
# 生成镜像配置文件
create_upg_cfg(fota_obj)
# 镜像ID校验
check_fota_image_id(fota_obj.image_conf_file)
# 升级文件制作
make_upg(fota_obj)
# 生成签名配置文件
create_sign_cfg(fota_obj)
# 升级文件签名
sign_upg_file(fota_obj)
def begin(user_input):
if user_input.type == 0:
create_fota_file(user_input)
else:
print_fota_file(user_input.new_images)
if __name__ == "__main__":
user_input = get_parameters()
begin(user_input)