668 lines
30 KiB
Python
668 lines
30 KiB
Python
|
#!/usr/bin/env python3
|
|||
|
# coding=utf-8
|
|||
|
# Copyright (c) HiSilicon (Shanghai) Technologies Co., Ltd. 2021-2022. All rights reserved.
|
|||
|
|
|||
|
"""
|
|||
|
* Description: NV binary create.
|
|||
|
* Create: 2021-11-30
|
|||
|
"""
|
|||
|
|
|||
|
import os
|
|||
|
import re
|
|||
|
import sys
|
|||
|
import stat
|
|||
|
import json
|
|||
|
import hashlib
|
|||
|
import struct
|
|||
|
import shutil
|
|||
|
import zlib
|
|||
|
import binascii
|
|||
|
|
|||
|
g_root = os.path.realpath(__file__)
|
|||
|
g_root = os.path.dirname(g_root)
|
|||
|
g_root = os.path.realpath(os.path.join(g_root, "..", "..", ".."))
|
|||
|
sys.path.append(os.path.join(g_root, 'build'))
|
|||
|
sys.path.append(os.path.join(g_root, 'build', 'script'))
|
|||
|
# print(g_root)
|
|||
|
|
|||
|
from conf_parser import BuildConfParser, ParserError
|
|||
|
from build_utils import fn_str_to_int
|
|||
|
from generate_data_stream import generate_data_stream
|
|||
|
|
|||
|
from ctypes import c_char, c_byte, c_ubyte, c_short, c_ushort, c_int, c_uint, Structure, sizeof
|
|||
|
|
|||
|
TD_CHAR = c_char
|
|||
|
TD_S8 = c_byte
|
|||
|
TD_U8 = c_ubyte
|
|||
|
TD_S16 = c_short
|
|||
|
TD_U16 = c_ushort
|
|||
|
TD_S32 = c_int
|
|||
|
TD_U32 = c_uint
|
|||
|
g_u8_max = 0xFF
|
|||
|
g_u16_max = 0xFFFF
|
|||
|
class KeyHead(Structure):
|
|||
|
_fields_ = [
|
|||
|
("magic", TD_U8), # Magic number to indicate the start of the item
|
|||
|
("valid", TD_U8), # flag to indicate whether the value is valid
|
|||
|
("length", TD_U16), # Length of the key_data field in bytes
|
|||
|
("type", TD_U8), # Normal (0xFF) or permanent (0x00) or keep (0x04 )
|
|||
|
("upgrade", TD_U8),
|
|||
|
|
|||
|
("key_id", TD_U16), # The Key ID
|
|||
|
("enc_key", TD_U16), # Allows some customisation of the data AES key used, 0x0 - key_data is plaintext, Others - key_data is encrypted
|
|||
|
("version", TD_U16), # Version of the key
|
|||
|
("rnd", TD_U32) # Key header crc calculated from length
|
|||
|
]
|
|||
|
|
|||
|
class NvPageHead(Structure):
|
|||
|
_fields_ = [
|
|||
|
("id", TD_U16),
|
|||
|
("reserved", TD_U8),
|
|||
|
("num_pages", TD_U8),
|
|||
|
("inverted_details_word", TD_U32),
|
|||
|
("last_write", TD_U32), # last_write
|
|||
|
("unused", TD_U32), # We want this header to be 4 words long - this allows us to alias anything after it
|
|||
|
]
|
|||
|
|
|||
|
# 1.根据指定的alias,合并所有配置文件
|
|||
|
# 1.1 解析alias中所有type=Nv的配置。
|
|||
|
|
|||
|
# 2.如果需要,从合并的配置文件中提取NV项ID,生成ID枚举头文件提供给源码使用(构建过程不允许修改源码,因此该头文件只有需要的时候调用脚本生成)
|
|||
|
# 3.预编译所有nv结构
|
|||
|
# 4.解析配置文件中各个nv项的数据内容,与结构结合,生成bin
|
|||
|
|
|||
|
|
|||
|
|
|||
|
class BuildNv:
|
|||
|
def __init__(self, alias, root=None, targets=None, backup=False, use_crc16=False):
|
|||
|
self.alias = alias
|
|||
|
self.root = root if root is not None else g_root
|
|||
|
self.targets = targets
|
|||
|
self.is_backup = backup
|
|||
|
self.use_crc16 = use_crc16
|
|||
|
self.tmp_path = os.path.join(self.root, json_conf["BUILD_TEMP_PATH"])
|
|||
|
self.nv_relative_path = os.path.join(self.root, json_conf["NV_RELATIVE_PATH"])
|
|||
|
self.nv_root = os.path.join(self.root, json_conf["NV_DEFAULT_CFG_DIR"])
|
|||
|
self.nv_output_dir = os.path.join(self.root, json_conf["OUT_BIN_DIR"])
|
|||
|
if not backup:
|
|||
|
self.nv_output_name = json_conf["OUT_BIN_NAME"]
|
|||
|
else:
|
|||
|
self.nv_output_name = json_conf["OUT_BACKUP_BIN_NAME"]
|
|||
|
self.nv_ver_src_dict = dict()
|
|||
|
self.nv_ver_dict = dict()
|
|||
|
self.nv_flash_cfg = None
|
|||
|
self.nv_cores_ver_bin = dict()
|
|||
|
self.nv_chip_ver_bin = dict()
|
|||
|
self.nv_flash_page_index = dict()
|
|||
|
|
|||
|
def set_nv_output_dir(self, path):
|
|||
|
self.nv_output_dir = path
|
|||
|
|
|||
|
def start_work(self):
|
|||
|
self._merge_cfgs()
|
|||
|
self._load_nv_flash_cfg()
|
|||
|
self._parse_etypes()
|
|||
|
self._gen_binary()
|
|||
|
self._create_header()
|
|||
|
|
|||
|
|
|||
|
def _merge_cfgs(self):
|
|||
|
'''
|
|||
|
Merge config sources in self.nv_ver_src_dict.
|
|||
|
This will build the self.nv_ver_dict like the following tree:
|
|||
|
|--- ver1 : {
|
|||
|
| "merged_cfg" : json file after merge all nv configuration with the same product type.
|
|||
|
| "prod_type" : "XXXX" }
|
|||
|
|
|
|||
|
chip|---target1---|--- ver2 :
|
|||
|
| |
|
|||
|
| |--- ver3 :
|
|||
|
| |
|
|||
|
| |--- core : "Each target corresponds to one core."
|
|||
|
|---targetN...
|
|||
|
'''
|
|||
|
for target in self.alias:
|
|||
|
if self._nv_ver_prepare(target) is False:
|
|||
|
continue
|
|||
|
# print('nv_ver_src_dict: ', self.nv_ver_src_dict)
|
|||
|
|
|||
|
for chip in self.nv_ver_src_dict:
|
|||
|
src_chip_dict = self.nv_ver_src_dict[chip]
|
|||
|
# print("src_chip_dict =",src_chip_dict)
|
|||
|
self.nv_ver_dict[chip] = {}
|
|||
|
chip_dict = self.nv_ver_dict[chip]
|
|||
|
for target in src_chip_dict:
|
|||
|
if chip_dict.get(target) is None:
|
|||
|
chip_dict[target] = {'core':src_chip_dict[target]['core']}
|
|||
|
nv_tmp_dir = os.path.join(self.nv_relative_path)
|
|||
|
for ver_name in src_chip_dict[target]:
|
|||
|
if ver_name == 'core':
|
|||
|
continue
|
|||
|
cfg_file_prefix = os.path.join(nv_tmp_dir, 'cfg', '%s_nv' % (target)) # 生成中间文件路径
|
|||
|
# print("cfg_file_prefix = ", cfg_file_prefix)
|
|||
|
chip_dict[target][ver_name] = self._merge_ver_cfg(cfg_file_prefix, src_chip_dict[target][ver_name])
|
|||
|
|
|||
|
def _parse_etypes(self):
|
|||
|
for chip in self.nv_ver_dict:
|
|||
|
chip_dict = self.nv_ver_dict[chip]
|
|||
|
for target in chip_dict:
|
|||
|
# TODO: scons in chip dir or nv_config dir? etypes path depends on scons path
|
|||
|
nv_tmp_dir = os.path.join(self.tmp_path, target)
|
|||
|
etypes_path = os.path.join(nv_tmp_dir, "%s.etypes" % target) # 中间文件xxx.etypes路径
|
|||
|
if os.path.exists(etypes_path) is not True: # 判断中间文件是否存在,如果不在说明该模块没有被编译,需要加入到编译链接中
|
|||
|
etypes_path = os.path.join(self.tmp_path, "etypes", "%s.etypes" % target)
|
|||
|
if os.path.exists(etypes_path) is not True:
|
|||
|
msg = "[error] [%s] need add nv_config module in alias! %s" % (target, etypes_path)
|
|||
|
raise ParserError(msg)
|
|||
|
stream_gen = generate_data_stream()
|
|||
|
stream_gen.phase_etypes(etypes_path)
|
|||
|
chip_dict[target]["stream_gen"] = stream_gen
|
|||
|
dtabase_txt = os.path.join(self.root, json_conf['DATABASE_TXT_FILE'])
|
|||
|
shutil.copy(etypes_path, dtabase_txt)
|
|||
|
|
|||
|
|
|||
|
def _gen_binary(self):
|
|||
|
'''
|
|||
|
|--- ver1 : binary file of ver1.
|
|||
|
| (version name : product_type + version name)
|
|||
|
chip|---core1---|--- ver2 :
|
|||
|
| |
|
|||
|
| |--- ver3 :
|
|||
|
|---coreN...
|
|||
|
'''
|
|||
|
self._gen_binary_prepare()
|
|||
|
self._gen_target_version_binary()
|
|||
|
self._gen_chip_nv_binary()
|
|||
|
|
|||
|
def _gen_target_version_binary(self):
|
|||
|
for chip in self.nv_cores_ver_bin:
|
|||
|
cores = self.nv_cores_ver_bin[chip]
|
|||
|
# print("cores =", cores)
|
|||
|
for core in cores:
|
|||
|
cores[core] = self._gen_version_binary(self.nv_ver_dict[chip], chip, core)
|
|||
|
|
|||
|
def _gen_version_binary(self, chip_ver_dict, chip, core):
|
|||
|
ver_binary_dict = dict()
|
|||
|
# print("chip_ver_dict = ", chip_ver_dict) # 字典信息包含核名字,配置文件路径
|
|||
|
if chip_ver_dict is None:
|
|||
|
return ver_binary_dict
|
|||
|
for target in chip_ver_dict:
|
|||
|
if chip_ver_dict[target].get('core') != core: # 判断字典里面的target读出来的core是否和core相同
|
|||
|
print("chip_ver_dict[target].get('core') = ", chip_ver_dict[target].get('core'))
|
|||
|
continue
|
|||
|
stream_gen = chip_ver_dict[target].get('stream_gen')
|
|||
|
for ver in chip_ver_dict[target]:
|
|||
|
# print("ver =", ver)
|
|||
|
if ver == 'core' or ver == 'stream_gen':
|
|||
|
continue
|
|||
|
stream_gen = chip_ver_dict[target].get('stream_gen')
|
|||
|
cfg_file = chip_ver_dict[target][ver]["merged_cfg"]
|
|||
|
# print("cfg_file =",cfg_file)
|
|||
|
stream = self._gen_nv_stream(cfg_file, stream_gen, chip, core)
|
|||
|
nv_ver_bin = \
|
|||
|
os.path.join(self.nv_relative_path, 'bin', '%s_nv.bin' % (core))
|
|||
|
# print("nv_ver_bin = ", nv_ver_bin) # 生成nvbin文件的路径
|
|||
|
prod_type = chip_ver_dict[target][ver]["prod_type"]
|
|||
|
prod_type = "all" if prod_type is None else prod_type
|
|||
|
ver_binary_dict["%s_%s" % (chip, prod_type)] = self._write_binary_to_file(nv_ver_bin, stream)
|
|||
|
return ver_binary_dict
|
|||
|
|
|||
|
def _write_binary_to_file(self, file_path, stream):
|
|||
|
if os.path.exists(os.path.dirname(file_path)) is False:
|
|||
|
os.makedirs(os.path.dirname(file_path))
|
|||
|
if os.path.exists(file_path) is True:
|
|||
|
os.remove(file_path)
|
|||
|
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
|
|||
|
modes = stat.S_IWUSR | stat.S_IRUSR
|
|||
|
with os.fdopen(os.open(file_path, flags, modes), 'wb') as fout:
|
|||
|
fout.write(stream)
|
|||
|
return file_path
|
|||
|
|
|||
|
def _gen_binary_prepare(self):
|
|||
|
for chip in self.nv_ver_dict:
|
|||
|
self.nv_chip_ver_bin[chip] = dict()
|
|||
|
self.nv_cores_ver_bin[chip] = dict()
|
|||
|
self.nv_flash_page_index[chip] = dict()
|
|||
|
nv_flash_chip_cfg = self.nv_flash_cfg[chip]
|
|||
|
cores = nv_flash_chip_cfg["cores"].keys()
|
|||
|
chip_nv_ver = self.nv_ver_dict[chip]
|
|||
|
for target in chip_nv_ver:
|
|||
|
core = chip_nv_ver[target].get('core')
|
|||
|
for core in cores:
|
|||
|
if core not in cores:
|
|||
|
msg = "[error] [%s] [%s] not a core cfg in nv_storage_cfg.json!" % (target, core)
|
|||
|
raise ParserError(msg)
|
|||
|
if self.nv_cores_ver_bin[chip].get(core) is None:
|
|||
|
self.nv_cores_ver_bin[chip][core] = dict()
|
|||
|
page_size = fn_str_to_int(nv_flash_chip_cfg["size"]["page_size"])
|
|||
|
total_size = 0
|
|||
|
for core in cores:
|
|||
|
core_page_nums = nv_flash_chip_cfg['cores'][core]['page_nums']
|
|||
|
total_size += core_page_nums * page_size
|
|||
|
self.nv_flash_page_index[chip][core] = [(sizeof(NvPageHead) + num * page_size, (num + 1) * page_size) \
|
|||
|
for num in range(0, core_page_nums)]
|
|||
|
if total_size > fn_str_to_int(nv_flash_chip_cfg["size"]['flash_size']):
|
|||
|
msg = "[error] cores size: %s, over total flash size: %s!" % \
|
|||
|
(total_size, nv_flash_chip_cfg["size"]['flash_size'])
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
def _gen_nv_stream(self, cfg_file, stream_gen, chip, core):
|
|||
|
core_nv_bin = self._init_nv_page_head(chip, core)
|
|||
|
|
|||
|
cfg_data = BuildConfParser(cfg_file).get_conf_data()
|
|||
|
key_id_list = []
|
|||
|
last_key_item_start_addr = 0
|
|||
|
for module in cfg_data:
|
|||
|
for key_item in cfg_data[module]:
|
|||
|
if key_item == 'module_id':
|
|||
|
continue
|
|||
|
key_struct_name = cfg_data[module][key_item].get("structure_type")
|
|||
|
key_value = cfg_data[module][key_item].get("value")
|
|||
|
key_attr = cfg_data[module][key_item].get("attributions")
|
|||
|
key_id = cfg_data[module][key_item].get("key_id")
|
|||
|
key_id = fn_str_to_int(key_id) if type(key_id) is not int else key_id
|
|||
|
if key_struct_name is None or key_value is None or key_attr is None or key_value == []:
|
|||
|
msg = "[error] 'structure_type' 'value' 'attributions' must be configured!"
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
if key_id in key_id_list:
|
|||
|
msg = "[error] key id:%d repeated, please check!" % key_id
|
|||
|
raise ParserError(msg)
|
|||
|
key_id_list.append(key_id)
|
|||
|
|
|||
|
key_data, key_data_len = stream_gen.generate(key_struct_name, key_value)
|
|||
|
page_index, key_item_start_addr = self._find_usable_addr(chip, core, key_data, key_data_len)
|
|||
|
core_nv_bin, key_item_start_addr = \
|
|||
|
self._init_key_head(core_nv_bin, key_item_start_addr, key_data_len, key_id, key_attr)
|
|||
|
|
|||
|
core_nv_bin, key_item_start_addr = \
|
|||
|
self._set_key_data(core_nv_bin, key_item_start_addr, key_data, key_data_len)
|
|||
|
core_nv_bin, key_item_start_addr = \
|
|||
|
self._set_key_hash(core_nv_bin, key_item_start_addr, key_data_len)
|
|||
|
self._update_core_index(chip, core, page_index, key_item_start_addr)
|
|||
|
last_key_item_start_addr = max(last_key_item_start_addr, key_item_start_addr)
|
|||
|
core_nv_bin = self._set_unused_page(chip, core_nv_bin, last_key_item_start_addr)
|
|||
|
self._reset_core_index(chip, core)
|
|||
|
return core_nv_bin
|
|||
|
|
|||
|
def _set_unused_page(self, chip, core_nv_bin, key_item_start_addr):
|
|||
|
page_size = fn_str_to_int(self.nv_flash_cfg[chip]['size']['page_size'])
|
|||
|
core_nv_end_addr = (key_item_start_addr + (page_size - 1)) & ~(page_size - 1)
|
|||
|
for i in range(key_item_start_addr, core_nv_end_addr):
|
|||
|
core_nv_bin[i] = 0xFF
|
|||
|
return core_nv_bin[0 : core_nv_end_addr]
|
|||
|
|
|||
|
def _gen_chip_nv_binary(self):
|
|||
|
for chip in self.nv_cores_ver_bin:
|
|||
|
chip_bins = self.nv_cores_ver_bin[chip]
|
|||
|
ver_list = []
|
|||
|
for core in chip_bins:
|
|||
|
ver_list.extend(chip_bins[core].keys())
|
|||
|
|
|||
|
ver_list = set(ver_list)
|
|||
|
for ver in ver_list:
|
|||
|
self.nv_chip_ver_bin[chip][ver] = self._assemble_ver_bins(chip, ver)
|
|||
|
|
|||
|
def _assemble_ver_bins(self, chip, ver):
|
|||
|
flash_bin = bytearray(fn_str_to_int(self.nv_flash_cfg[chip]['size']['flash_size']))
|
|||
|
for i in range(0, len(flash_bin)):
|
|||
|
flash_bin[i] = 0xFF
|
|||
|
|
|||
|
start_addr = 0
|
|||
|
chip_bins = self.nv_cores_ver_bin[chip]
|
|||
|
cores = self.nv_cores_ver_bin[chip]
|
|||
|
for core in chip_bins:
|
|||
|
ver_bin = chip_bins[core].get(ver)
|
|||
|
flash_bin, start_addr = self._append_file_to_stream(flash_bin, start_addr, chip, core, ver_bin)
|
|||
|
chip_ver_bin_file = os.path.join(self.nv_output_dir, self.nv_output_name)
|
|||
|
# TODO: 目前只考虑单核场景去除 NV bin 末尾的无用 FF
|
|||
|
return self._write_binary_to_file(chip_ver_bin_file, flash_bin[0 : start_addr])
|
|||
|
|
|||
|
def _append_file_to_stream(self, flash_bin, start_addr, chip, core, ver_bin):
|
|||
|
core_bin_size = fn_str_to_int(self.nv_flash_cfg[chip]['size']["page_size"]) * \
|
|||
|
self.nv_flash_cfg[chip]["cores"][core]['page_nums']
|
|||
|
core_nv_bin = b''
|
|||
|
if ver_bin is None:
|
|||
|
core_nv_bin = self._init_nv_page_head(chip, core)
|
|||
|
else:
|
|||
|
with open(ver_bin, 'rb') as f:
|
|||
|
core_nv_bin = f.read()
|
|||
|
#print('core_nv_bin = ', core_nv_bin)
|
|||
|
tail_addr = start_addr + len(core_nv_bin)
|
|||
|
flash_bin[start_addr : tail_addr] = core_nv_bin
|
|||
|
|
|||
|
return flash_bin, tail_addr
|
|||
|
|
|||
|
def _reset_core_index(self, chip, core):
|
|||
|
nv_flash_chip_cfg = self.nv_flash_cfg[chip]
|
|||
|
page_size = fn_str_to_int(nv_flash_chip_cfg['size']["page_size"])
|
|||
|
core_page_nums = nv_flash_chip_cfg['cores'][core]['page_nums']
|
|||
|
self.nv_flash_page_index[chip][core] = [(sizeof(NvPageHead) + num * page_size, (num + 1) * page_size) \
|
|||
|
for num in range(0, core_page_nums)]
|
|||
|
|
|||
|
def _update_core_index(self, chip, core, index, addr):
|
|||
|
(start_addr, page_max_addr) = self.nv_flash_page_index[chip][core][index]
|
|||
|
if start_addr >= addr or addr > page_max_addr:
|
|||
|
msg = "[error] addr %s invalid!" % addr
|
|||
|
raise ParserError(msg)
|
|||
|
self.nv_flash_page_index[chip][core][index] = (addr, page_max_addr)
|
|||
|
#print("update page index: \n", self.nv_flash_page_index)
|
|||
|
|
|||
|
def _find_usable_addr(self, chip, core, key_data, key_data_len):
|
|||
|
page_size = fn_str_to_int(self.nv_flash_cfg[chip]['size'].get('page_size'))
|
|||
|
key_item_total_len = self._get_key_item_len(key_data_len)
|
|||
|
if key_item_total_len > page_size - sizeof(NvPageHead):
|
|||
|
msg = "[error] key over page size !" % key_id
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
index = 0
|
|||
|
for (start_addr, page_max_addr) in self.nv_flash_page_index[chip][core]:
|
|||
|
if start_addr + key_item_total_len > page_max_addr:
|
|||
|
index += 1
|
|||
|
continue
|
|||
|
return index, start_addr
|
|||
|
msg = "[error] no more enough space for [%s]!" % core
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
def _get_key_type_from_attr(self, key_id, attr):
|
|||
|
if (attr & 1) and (not (attr & ~1)):
|
|||
|
return 0xFF
|
|||
|
elif (attr & 2) and (not (attr & ~2)):
|
|||
|
return 0x00
|
|||
|
elif (attr & 4) and (not (attr & ~4)):
|
|||
|
return 0xFF
|
|||
|
else:
|
|||
|
msg = "[error] attribution config err: [id-%s] [attr-%s] !" % (key_id, attr)
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
def _get_key_upgrade_from_attr(self, key_id, attr):
|
|||
|
if (attr & 1) and (not (attr & ~1)):
|
|||
|
return 0xFF
|
|||
|
elif (attr & 2) and (not (attr & ~2)):
|
|||
|
return 0xFF
|
|||
|
elif (attr & 4) and (not (attr & ~4)):
|
|||
|
return 0x00
|
|||
|
else:
|
|||
|
msg = "[error] attribution config err: [id-%s] [attr-%s] !" % (key_id, attr)
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
def _get_key_item_len(self, key_data_len):
|
|||
|
if key_data_len % 4 != 0:
|
|||
|
key_data_len += 4 - key_data_len % 4
|
|||
|
return sizeof(KeyHead) + 4 + key_data_len
|
|||
|
|
|||
|
def _set_key_hash(self, nv_bin, key_item_start_addr, key_data_len):
|
|||
|
if key_data_len % 4 != 0:
|
|||
|
key_data_len += 4 - key_data_len % 4
|
|||
|
hash_start_addr = key_item_start_addr - key_data_len - sizeof(KeyHead)
|
|||
|
hash_end_addr = hash_start_addr + key_data_len + sizeof(KeyHead)
|
|||
|
if not self.use_crc16:
|
|||
|
crc32num = zlib.crc32(nv_bin[hash_start_addr : hash_end_addr])
|
|||
|
else:
|
|||
|
crc32num = binascii.crc_hqx(nv_bin[hash_start_addr : hash_end_addr], 0)
|
|||
|
crc32ret = '{:0>8X}'.format(crc32num)
|
|||
|
crc32ret = re.sub(r"(?<=\w)(?=(?:\w\w)+$)", " 0x", crc32ret)
|
|||
|
crc32ret = '0x' + crc32ret
|
|||
|
crc32list = [int(x,16) for x in crc32ret.split(" ")]
|
|||
|
sha256bytearray = bytes(crc32list)
|
|||
|
tail_addr = key_item_start_addr + len(sha256bytearray)
|
|||
|
nv_bin[key_item_start_addr : tail_addr] = sha256bytearray
|
|||
|
return nv_bin, tail_addr
|
|||
|
|
|||
|
def _set_key_data(self, nv_bin, key_item_start_addr, key_data, key_data_len):
|
|||
|
if key_data_len % 4 != 0:
|
|||
|
for i in range(0, 4 - key_data_len % 4) :
|
|||
|
key_data += b'\x00'
|
|||
|
key_data_len += 4 - key_data_len % 4
|
|||
|
tail_addr = key_item_start_addr + key_data_len
|
|||
|
nv_bin[key_item_start_addr : tail_addr] = key_data
|
|||
|
return nv_bin, tail_addr
|
|||
|
|
|||
|
def _init_key_head(self, nv_bin, key_item_start_addr, key_data_len, key_id, key_attr):
|
|||
|
nv_key_st = KeyHead.from_buffer(nv_bin[key_item_start_addr:])
|
|||
|
|
|||
|
nv_key_st.magic = 0xA9
|
|||
|
nv_key_st.length = key_data_len
|
|||
|
nv_key_st.type = self._get_key_type_from_attr(key_id, key_attr)
|
|||
|
nv_key_st.upgrade = self._get_key_upgrade_from_attr(key_id, key_attr)
|
|||
|
nv_key_st.key_id = key_id
|
|||
|
|
|||
|
nv_key_st.version = 65535
|
|||
|
nv_key_st.enc_key = 0 # TODO: 目前不支持加密,且nv加密部分不由nv脚本做,可能放到初始化?
|
|||
|
tail_addr = key_item_start_addr + sizeof(KeyHead)
|
|||
|
nv_bin[key_item_start_addr : tail_addr] = nv_key_st
|
|||
|
return nv_bin, tail_addr
|
|||
|
|
|||
|
def _init_nv_page_head(self, chip, core):
|
|||
|
nv_flash_chip_cfg = self.nv_flash_cfg[chip]
|
|||
|
default_page_nums = nv_flash_chip_cfg.get('default_page_nums')
|
|||
|
page_size = fn_str_to_int(nv_flash_chip_cfg['size']['page_size'])
|
|||
|
page_nums = nv_flash_chip_cfg['cores'][core]['page_nums']
|
|||
|
if not self.is_backup:
|
|||
|
page_id_start = nv_flash_chip_cfg['cores'][core]['page_id_start']
|
|||
|
else:
|
|||
|
page_id_start = '0x34B2'
|
|||
|
core_nv_size = page_nums * page_size
|
|||
|
core_nv_bin = bytearray(core_nv_size)
|
|||
|
|
|||
|
for i in range(0, core_nv_size):
|
|||
|
core_nv_bin[i] = 0xFF
|
|||
|
|
|||
|
for i in range(0, page_nums):
|
|||
|
start_addr = i * page_size
|
|||
|
nv_page_head = NvPageHead.from_buffer(core_nv_bin[start_addr:])
|
|||
|
nv_page_head.id = fn_str_to_int(page_id_start)
|
|||
|
nv_page_head.reserved = 1
|
|||
|
nv_page_head.num_pages = i
|
|||
|
nv_page_head.inverted_details_word = ~int.from_bytes(struct.pack('HBB', \
|
|||
|
nv_page_head.id, nv_page_head.reserved, nv_page_head.num_pages), 'little')
|
|||
|
nv_page_head.last_write = 0
|
|||
|
nv_page_head.unused = ~nv_page_head.last_write
|
|||
|
core_nv_bin[start_addr : start_addr + sizeof(NvPageHead)] = nv_page_head
|
|||
|
return core_nv_bin
|
|||
|
|
|||
|
def _load_nv_flash_cfg(self):
|
|||
|
self.nv_flash_cfg = dict()
|
|||
|
for chip in self.nv_ver_dict:
|
|||
|
cfg_file = os.path.join(self.root, json_conf["NV_TARGET_JSON_PATH"])
|
|||
|
self.nv_flash_cfg[chip] = BuildConfParser(cfg_file).get_conf_data()
|
|||
|
|
|||
|
def _add_nv_ver(self, chip, target, core, ver, common_cfg, ver_cfg, prod_type=None):
|
|||
|
'''
|
|||
|
Add version config into self.nv_ver_src_dict.
|
|||
|
There are three configuration scenarios.One target may correspond to multiple NV versions.
|
|||
|
|
|||
|
|--- ver1: { srcs:[default, common, cfg1], prod_type: }
|
|||
|
|
|
|||
|
chip|---target1---|--- ver2: { srcs:[default, cfg2], prod_type: }
|
|||
|
| |
|
|||
|
| |--- ver3: { srcs:[default, common], prod_type: }
|
|||
|
| |
|
|||
|
| |--- core : "Each target corresponds to one core."
|
|||
|
|---targetN...
|
|||
|
'''
|
|||
|
|
|||
|
ver_cfgs = []
|
|||
|
if os.path.exists(common_cfg) is True:
|
|||
|
ver_cfgs.append(common_cfg)
|
|||
|
if ver_cfg is not None and os.path.exists(ver_cfg):
|
|||
|
ver_cfgs.append(ver_cfg)
|
|||
|
|
|||
|
if self.nv_ver_src_dict.get(chip) is None:
|
|||
|
self.nv_ver_src_dict[chip] = dict()
|
|||
|
|
|||
|
chip_dict = self.nv_ver_src_dict[chip]
|
|||
|
if chip_dict.get(target) is not None and chip_dict[target].get(ver) is not None:
|
|||
|
msg = "[error] Ver config Repeate!"
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
if chip_dict.get(target) is None:
|
|||
|
chip_dict[target] = {ver:{"srcs":ver_cfgs, 'prod_type': prod_type}}
|
|||
|
else:
|
|||
|
chip_dict[target].update({ver:{"srcs":ver_cfgs, 'prod_type': prod_type}})
|
|||
|
|
|||
|
if chip_dict[target].get('core') is None:
|
|||
|
chip_dict[target]['core'] = core
|
|||
|
elif chip_dict[target].get('core') != core:
|
|||
|
msg = "[error] [%s] core not match!" % target
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
def _nv_ver_prepare(self, target):
|
|||
|
'''
|
|||
|
1. Check nv configurations.
|
|||
|
2. Add all correct config into self.nv_ver_src_dict..
|
|||
|
'''
|
|||
|
if type(self.alias[target]) is list:
|
|||
|
return False
|
|||
|
target_type = self.alias[target].get("TYPE")
|
|||
|
if target_type is None or target_type != 'nv':
|
|||
|
return False
|
|||
|
|
|||
|
core = self.alias[target].get("CORE")
|
|||
|
if core is None:
|
|||
|
msg = "[error] core name not exist!"
|
|||
|
raise ParserError(msg)
|
|||
|
chip = self.alias[target].get("CHIP")
|
|||
|
#default_cfg = os.path.join(self.nv_root, '%s_default.json' % core)
|
|||
|
'''
|
|||
|
if chip is None or os.path.exists(default_cfg) is False:
|
|||
|
msg = "[error] chip name OR %s not exist!" % default_cfg
|
|||
|
raise ParserError(msg)
|
|||
|
'''
|
|||
|
kernel_name = self.alias[target].get("KERNEL_BIN")
|
|||
|
if kernel_name is None:
|
|||
|
msg = "[error] KERNEL is null!"
|
|||
|
raise ParserError(msg)
|
|||
|
if self.targets is not None and kernel_name not in self.targets:
|
|||
|
return False
|
|||
|
cfgs = self.alias[target].get("COMPONENT")
|
|||
|
if cfgs is None:
|
|||
|
msg = "[error] COMPONENT is null!"
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
prod_type = self.alias[target].get("PRODUCT_TYPE")
|
|||
|
if prod_type == "":
|
|||
|
prod_type = None
|
|||
|
if prod_type is not None and type(prod_type) is not str:
|
|||
|
msg = "[error] PRODUCT_TYPE must be a string type, one kernel only suuport one product type!"
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
cfg_dir = os.path.join(self.nv_root, kernel_name)
|
|||
|
common_cfg = os.path.join(cfg_dir, 'common.json')
|
|||
|
|
|||
|
for cfg in cfgs:
|
|||
|
cfg_file = os.path.join(cfg_dir, '%s.json' % cfg) if cfg != 'common' else None
|
|||
|
self._add_nv_ver(chip, kernel_name, core, cfg, common_cfg, cfg_file, prod_type)
|
|||
|
|
|||
|
def _prod_type_filter(self, srcs, prod_type = None):
|
|||
|
combination = dict()
|
|||
|
module_dict = dict()
|
|||
|
for src in srcs:
|
|||
|
# print("[INFO] nv config src file: ", src)
|
|||
|
src_conf = BuildConfParser(src).get_conf_data()
|
|||
|
for module in src_conf:
|
|||
|
module_id = src_conf.get(module).get('module_id')
|
|||
|
if module_id is None:
|
|||
|
msg = "[error][file:%s][%s] module_id is null!" % (src, module)
|
|||
|
raise ParserError(msg)
|
|||
|
module_id = fn_str_to_int(module_id) if type(module_id) is not int else module_id
|
|||
|
if module_id > g_u8_max :
|
|||
|
msg = "[error][file:%s][%s] module_id is more than 0xFF!" % (src, module)
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
# diffrent module must config with diffrent module_id
|
|||
|
if module_id in module_dict:
|
|||
|
if module_dict[module_id] != module:
|
|||
|
msg = "[error][file:%s][%s] module_id is the same to [%s]!" % (src, module, module_dict[module_id])
|
|||
|
raise ParserError(msg)
|
|||
|
else:
|
|||
|
module_dict[module_id] = module
|
|||
|
|
|||
|
if module not in combination:
|
|||
|
combination[module] = {'module_id': module_id}
|
|||
|
elif combination.get(module).get('module_id') != module_id:
|
|||
|
msg = "[error][%s][%s] module_id is not same as other file!" % (src, module)
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
for item in src_conf.get(module):
|
|||
|
key_cfg = src_conf.get(module).get(item)
|
|||
|
if item == 'module_id':
|
|||
|
continue
|
|||
|
|
|||
|
key_id = key_cfg.get('key_id')
|
|||
|
key_id = None if key_id is None else (fn_str_to_int(key_id) if type(key_id) is not int else key_id)
|
|||
|
# print("key id: %d, module id: %d, key>>8: %d"%( key_id, module_id, key_id>>8))
|
|||
|
if key_id is None or key_id > g_u16_max :
|
|||
|
msg = "[error][file:%s][%s][%s] key_id is null or more than unsighed 16 or not match with module_id!" % (src, module, item)
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
item_prod_type = key_cfg.get('product_type')
|
|||
|
key_status = key_cfg.get('key_status')
|
|||
|
#print('prodtype: %s, key prod: %s'%(prod_type, item_prod_type))
|
|||
|
if (prod_type == item_prod_type or (item_prod_type is not None and prod_type in item_prod_type)) \
|
|||
|
and key_status == 'alive':
|
|||
|
combination[module].update({item:key_cfg})
|
|||
|
return combination
|
|||
|
|
|||
|
def _nv_cfg_writer(self, dst_file, combination):
|
|||
|
if os.path.exists(os.path.dirname(dst_file)) is False:
|
|||
|
os.makedirs(os.path.dirname(dst_file))
|
|||
|
if os.path.exists(dst_file):
|
|||
|
os.remove(dst_file)
|
|||
|
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
|
|||
|
modes = stat.S_IWUSR | stat.S_IRUSR
|
|||
|
with os.fdopen(os.open(dst_file, flags, modes), 'w') as fout:
|
|||
|
fout.write(json.dumps(combination, indent=4))
|
|||
|
|
|||
|
def _merge_ver_cfg(self, file_prefix, ver_cfg):
|
|||
|
srcs = ver_cfg.get('srcs')
|
|||
|
if srcs is None:
|
|||
|
msg = "[error] ver cfg file is null!"
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
prod_type = ver_cfg.get('prod_type')
|
|||
|
combination = self._prod_type_filter(srcs, prod_type)
|
|||
|
dst_file = '%s.json' % file_prefix
|
|||
|
self._nv_cfg_writer(dst_file, combination)
|
|||
|
return { "merged_cfg" : dst_file, "prod_type" : prod_type}
|
|||
|
|
|||
|
def _create_header(self):
|
|||
|
pass
|
|||
|
|
|||
|
# 检查配置文件中是否有必须配置项
|
|||
|
def check_key(json_conf):
|
|||
|
check_item = ['OUT_BIN_DIR', 'BUILD_TEMP_PATH', 'NV_TARGET_JSON_PATH', \
|
|||
|
'NV_RELATIVE_PATH', 'NV_DEFAULT_CFG_DIR', 'DATABASE_TXT_FILE']
|
|||
|
keys = dict.keys(json_conf)
|
|||
|
for check_key in check_item:
|
|||
|
if check_key not in keys:
|
|||
|
msg = "[error] [nv_binary] need add ConfigMap (%s) in json_conf!" % (check_key)
|
|||
|
raise ParserError(msg)
|
|||
|
|
|||
|
def test(targets, flag, backup, use_crc16):
|
|||
|
root = g_root
|
|||
|
nv_target_json_path = os.path.join(root, json_conf["NV_TARGET_JSON_PATH"])
|
|||
|
alias_conf = BuildConfParser(nv_target_json_path).get_conf_data()
|
|||
|
worker = BuildNv(alias_conf, root, targets, backup, use_crc16)
|
|||
|
if flag:
|
|||
|
worker.set_nv_output_dir(os.path.join(root, json_conf["OUT_BIN_DIR"]))
|
|||
|
worker.start_work()
|
|||
|
|
|||
|
def nv_begin(in_path, targets, flag, gen_backup=False, use_crc16=False):
|
|||
|
global json_conf
|
|||
|
with open(in_path, 'r') as i:
|
|||
|
json_conf = json.load(i)
|
|||
|
|
|||
|
check_key(json_conf)
|
|||
|
test(targets, flag, False, use_crc16)
|
|||
|
if gen_backup:
|
|||
|
test(targets, flag, True, use_crc16)
|
|||
|
print("build nv bin success!!")
|
|||
|
|
|||
|
if __name__ == "__main__":
|
|||
|
in_path = sys.argv[2]
|
|||
|
targets = sys.argv[3].split()
|
|||
|
flag = len(sys.argv) == 3
|
|||
|
|
|||
|
nv_begin(in_path, targets, flag)
|