LPT26x-HSF-4MB-Hilink_14.2..../build/script/nv/nv_binary.py
2025-05-13 22:00:58 +08:00

668 lines
30 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# coding=utf-8
# Copyright (c) HiSilicon (Shanghai) Technologies Co., Ltd. 2021-2022. All rights reserved.
"""
* Description: NV binary create.
* Create: 2021-11-30
"""
import os
import re
import sys
import stat
import json
import hashlib
import struct
import shutil
import zlib
import binascii
g_root = os.path.realpath(__file__)
g_root = os.path.dirname(g_root)
g_root = os.path.realpath(os.path.join(g_root, "..", "..", ".."))
sys.path.append(os.path.join(g_root, 'build'))
sys.path.append(os.path.join(g_root, 'build', 'script'))
# print(g_root)
from conf_parser import BuildConfParser, ParserError
from build_utils import fn_str_to_int
from generate_data_stream import generate_data_stream
from ctypes import c_char, c_byte, c_ubyte, c_short, c_ushort, c_int, c_uint, Structure, sizeof
TD_CHAR = c_char
TD_S8 = c_byte
TD_U8 = c_ubyte
TD_S16 = c_short
TD_U16 = c_ushort
TD_S32 = c_int
TD_U32 = c_uint
g_u8_max = 0xFF
g_u16_max = 0xFFFF
class KeyHead(Structure):
_fields_ = [
("magic", TD_U8), # Magic number to indicate the start of the item
("valid", TD_U8), # flag to indicate whether the value is valid
("length", TD_U16), # Length of the key_data field in bytes
("type", TD_U8), # Normal (0xFF) or permanent (0x00) or keep (0x04 )
("upgrade", TD_U8),
("key_id", TD_U16), # The Key ID
("enc_key", TD_U16), # Allows some customisation of the data AES key used, 0x0 - key_data is plaintext, Others - key_data is encrypted
("version", TD_U16), # Version of the key
("rnd", TD_U32) # Key header crc calculated from length
]
class NvPageHead(Structure):
_fields_ = [
("id", TD_U16),
("reserved", TD_U8),
("num_pages", TD_U8),
("inverted_details_word", TD_U32),
("last_write", TD_U32), # last_write
("unused", TD_U32), # We want this header to be 4 words long - this allows us to alias anything after it
]
# 1.根据指定的alias合并所有配置文件
# 1.1 解析alias中所有type=Nv的配置。
# 2.如果需要从合并的配置文件中提取NV项ID生成ID枚举头文件提供给源码使用构建过程不允许修改源码因此该头文件只有需要的时候调用脚本生成
# 3.预编译所有nv结构
# 4.解析配置文件中各个nv项的数据内容与结构结合生成bin
class BuildNv:
def __init__(self, alias, root=None, targets=None, backup=False, use_crc16=False):
self.alias = alias
self.root = root if root is not None else g_root
self.targets = targets
self.is_backup = backup
self.use_crc16 = use_crc16
self.tmp_path = os.path.join(self.root, json_conf["BUILD_TEMP_PATH"])
self.nv_relative_path = os.path.join(self.root, json_conf["NV_RELATIVE_PATH"])
self.nv_root = os.path.join(self.root, json_conf["NV_DEFAULT_CFG_DIR"])
self.nv_output_dir = os.path.join(self.root, json_conf["OUT_BIN_DIR"])
if not backup:
self.nv_output_name = json_conf["OUT_BIN_NAME"]
else:
self.nv_output_name = json_conf["OUT_BACKUP_BIN_NAME"]
self.nv_ver_src_dict = dict()
self.nv_ver_dict = dict()
self.nv_flash_cfg = None
self.nv_cores_ver_bin = dict()
self.nv_chip_ver_bin = dict()
self.nv_flash_page_index = dict()
def set_nv_output_dir(self, path):
self.nv_output_dir = path
def start_work(self):
self._merge_cfgs()
self._load_nv_flash_cfg()
self._parse_etypes()
self._gen_binary()
self._create_header()
def _merge_cfgs(self):
'''
Merge config sources in self.nv_ver_src_dict.
This will build the self.nv_ver_dict like the following tree:
|--- ver1 : {
| "merged_cfg" : json file after merge all nv configuration with the same product type.
| "prod_type" : "XXXX" }
|
chip|---target1---|--- ver2 :
| |
| |--- ver3 :
| |
| |--- core : "Each target corresponds to one core."
|---targetN...
'''
for target in self.alias:
if self._nv_ver_prepare(target) is False:
continue
# print('nv_ver_src_dict: ', self.nv_ver_src_dict)
for chip in self.nv_ver_src_dict:
src_chip_dict = self.nv_ver_src_dict[chip]
# print("src_chip_dict =",src_chip_dict)
self.nv_ver_dict[chip] = {}
chip_dict = self.nv_ver_dict[chip]
for target in src_chip_dict:
if chip_dict.get(target) is None:
chip_dict[target] = {'core':src_chip_dict[target]['core']}
nv_tmp_dir = os.path.join(self.nv_relative_path)
for ver_name in src_chip_dict[target]:
if ver_name == 'core':
continue
cfg_file_prefix = os.path.join(nv_tmp_dir, 'cfg', '%s_nv' % (target)) # 生成中间文件路径
# print("cfg_file_prefix = ", cfg_file_prefix)
chip_dict[target][ver_name] = self._merge_ver_cfg(cfg_file_prefix, src_chip_dict[target][ver_name])
def _parse_etypes(self):
for chip in self.nv_ver_dict:
chip_dict = self.nv_ver_dict[chip]
for target in chip_dict:
# TODO: scons in chip dir or nv_config dir? etypes path depends on scons path
nv_tmp_dir = os.path.join(self.tmp_path, target)
etypes_path = os.path.join(nv_tmp_dir, "%s.etypes" % target) # 中间文件xxx.etypes路径
if os.path.exists(etypes_path) is not True: # 判断中间文件是否存在,如果不在说明该模块没有被编译,需要加入到编译链接中
etypes_path = os.path.join(self.tmp_path, "etypes", "%s.etypes" % target)
if os.path.exists(etypes_path) is not True:
msg = "[error] [%s] need add nv_config module in alias! %s" % (target, etypes_path)
raise ParserError(msg)
stream_gen = generate_data_stream()
stream_gen.phase_etypes(etypes_path)
chip_dict[target]["stream_gen"] = stream_gen
dtabase_txt = os.path.join(self.root, json_conf['DATABASE_TXT_FILE'])
shutil.copy(etypes_path, dtabase_txt)
def _gen_binary(self):
'''
|--- ver1 : binary file of ver1.
| (version name : product_type + version name)
chip|---core1---|--- ver2 :
| |
| |--- ver3 :
|---coreN...
'''
self._gen_binary_prepare()
self._gen_target_version_binary()
self._gen_chip_nv_binary()
def _gen_target_version_binary(self):
for chip in self.nv_cores_ver_bin:
cores = self.nv_cores_ver_bin[chip]
# print("cores =", cores)
for core in cores:
cores[core] = self._gen_version_binary(self.nv_ver_dict[chip], chip, core)
def _gen_version_binary(self, chip_ver_dict, chip, core):
ver_binary_dict = dict()
# print("chip_ver_dict = ", chip_ver_dict) # 字典信息包含核名字,配置文件路径
if chip_ver_dict is None:
return ver_binary_dict
for target in chip_ver_dict:
if chip_ver_dict[target].get('core') != core: # 判断字典里面的target读出来的core是否和core相同
print("chip_ver_dict[target].get('core') = ", chip_ver_dict[target].get('core'))
continue
stream_gen = chip_ver_dict[target].get('stream_gen')
for ver in chip_ver_dict[target]:
# print("ver =", ver)
if ver == 'core' or ver == 'stream_gen':
continue
stream_gen = chip_ver_dict[target].get('stream_gen')
cfg_file = chip_ver_dict[target][ver]["merged_cfg"]
# print("cfg_file =",cfg_file)
stream = self._gen_nv_stream(cfg_file, stream_gen, chip, core)
nv_ver_bin = \
os.path.join(self.nv_relative_path, 'bin', '%s_nv.bin' % (core))
# print("nv_ver_bin = ", nv_ver_bin) # 生成nvbin文件的路径
prod_type = chip_ver_dict[target][ver]["prod_type"]
prod_type = "all" if prod_type is None else prod_type
ver_binary_dict["%s_%s" % (chip, prod_type)] = self._write_binary_to_file(nv_ver_bin, stream)
return ver_binary_dict
def _write_binary_to_file(self, file_path, stream):
if os.path.exists(os.path.dirname(file_path)) is False:
os.makedirs(os.path.dirname(file_path))
if os.path.exists(file_path) is True:
os.remove(file_path)
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(file_path, flags, modes), 'wb') as fout:
fout.write(stream)
return file_path
def _gen_binary_prepare(self):
for chip in self.nv_ver_dict:
self.nv_chip_ver_bin[chip] = dict()
self.nv_cores_ver_bin[chip] = dict()
self.nv_flash_page_index[chip] = dict()
nv_flash_chip_cfg = self.nv_flash_cfg[chip]
cores = nv_flash_chip_cfg["cores"].keys()
chip_nv_ver = self.nv_ver_dict[chip]
for target in chip_nv_ver:
core = chip_nv_ver[target].get('core')
for core in cores:
if core not in cores:
msg = "[error] [%s] [%s] not a core cfg in nv_storage_cfg.json!" % (target, core)
raise ParserError(msg)
if self.nv_cores_ver_bin[chip].get(core) is None:
self.nv_cores_ver_bin[chip][core] = dict()
page_size = fn_str_to_int(nv_flash_chip_cfg["size"]["page_size"])
total_size = 0
for core in cores:
core_page_nums = nv_flash_chip_cfg['cores'][core]['page_nums']
total_size += core_page_nums * page_size
self.nv_flash_page_index[chip][core] = [(sizeof(NvPageHead) + num * page_size, (num + 1) * page_size) \
for num in range(0, core_page_nums)]
if total_size > fn_str_to_int(nv_flash_chip_cfg["size"]['flash_size']):
msg = "[error] cores size: %s, over total flash size: %s!" % \
(total_size, nv_flash_chip_cfg["size"]['flash_size'])
raise ParserError(msg)
def _gen_nv_stream(self, cfg_file, stream_gen, chip, core):
core_nv_bin = self._init_nv_page_head(chip, core)
cfg_data = BuildConfParser(cfg_file).get_conf_data()
key_id_list = []
last_key_item_start_addr = 0
for module in cfg_data:
for key_item in cfg_data[module]:
if key_item == 'module_id':
continue
key_struct_name = cfg_data[module][key_item].get("structure_type")
key_value = cfg_data[module][key_item].get("value")
key_attr = cfg_data[module][key_item].get("attributions")
key_id = cfg_data[module][key_item].get("key_id")
key_id = fn_str_to_int(key_id) if type(key_id) is not int else key_id
if key_struct_name is None or key_value is None or key_attr is None or key_value == []:
msg = "[error] 'structure_type' 'value' 'attributions' must be configured!"
raise ParserError(msg)
if key_id in key_id_list:
msg = "[error] key id:%d repeated, please check!" % key_id
raise ParserError(msg)
key_id_list.append(key_id)
key_data, key_data_len = stream_gen.generate(key_struct_name, key_value)
page_index, key_item_start_addr = self._find_usable_addr(chip, core, key_data, key_data_len)
core_nv_bin, key_item_start_addr = \
self._init_key_head(core_nv_bin, key_item_start_addr, key_data_len, key_id, key_attr)
core_nv_bin, key_item_start_addr = \
self._set_key_data(core_nv_bin, key_item_start_addr, key_data, key_data_len)
core_nv_bin, key_item_start_addr = \
self._set_key_hash(core_nv_bin, key_item_start_addr, key_data_len)
self._update_core_index(chip, core, page_index, key_item_start_addr)
last_key_item_start_addr = max(last_key_item_start_addr, key_item_start_addr)
core_nv_bin = self._set_unused_page(chip, core_nv_bin, last_key_item_start_addr)
self._reset_core_index(chip, core)
return core_nv_bin
def _set_unused_page(self, chip, core_nv_bin, key_item_start_addr):
page_size = fn_str_to_int(self.nv_flash_cfg[chip]['size']['page_size'])
core_nv_end_addr = (key_item_start_addr + (page_size - 1)) & ~(page_size - 1)
for i in range(key_item_start_addr, core_nv_end_addr):
core_nv_bin[i] = 0xFF
return core_nv_bin[0 : core_nv_end_addr]
def _gen_chip_nv_binary(self):
for chip in self.nv_cores_ver_bin:
chip_bins = self.nv_cores_ver_bin[chip]
ver_list = []
for core in chip_bins:
ver_list.extend(chip_bins[core].keys())
ver_list = set(ver_list)
for ver in ver_list:
self.nv_chip_ver_bin[chip][ver] = self._assemble_ver_bins(chip, ver)
def _assemble_ver_bins(self, chip, ver):
flash_bin = bytearray(fn_str_to_int(self.nv_flash_cfg[chip]['size']['flash_size']))
for i in range(0, len(flash_bin)):
flash_bin[i] = 0xFF
start_addr = 0
chip_bins = self.nv_cores_ver_bin[chip]
cores = self.nv_cores_ver_bin[chip]
for core in chip_bins:
ver_bin = chip_bins[core].get(ver)
flash_bin, start_addr = self._append_file_to_stream(flash_bin, start_addr, chip, core, ver_bin)
chip_ver_bin_file = os.path.join(self.nv_output_dir, self.nv_output_name)
# TODO: 目前只考虑单核场景去除 NV bin 末尾的无用 FF
return self._write_binary_to_file(chip_ver_bin_file, flash_bin[0 : start_addr])
def _append_file_to_stream(self, flash_bin, start_addr, chip, core, ver_bin):
core_bin_size = fn_str_to_int(self.nv_flash_cfg[chip]['size']["page_size"]) * \
self.nv_flash_cfg[chip]["cores"][core]['page_nums']
core_nv_bin = b''
if ver_bin is None:
core_nv_bin = self._init_nv_page_head(chip, core)
else:
with open(ver_bin, 'rb') as f:
core_nv_bin = f.read()
#print('core_nv_bin = ', core_nv_bin)
tail_addr = start_addr + len(core_nv_bin)
flash_bin[start_addr : tail_addr] = core_nv_bin
return flash_bin, tail_addr
def _reset_core_index(self, chip, core):
nv_flash_chip_cfg = self.nv_flash_cfg[chip]
page_size = fn_str_to_int(nv_flash_chip_cfg['size']["page_size"])
core_page_nums = nv_flash_chip_cfg['cores'][core]['page_nums']
self.nv_flash_page_index[chip][core] = [(sizeof(NvPageHead) + num * page_size, (num + 1) * page_size) \
for num in range(0, core_page_nums)]
def _update_core_index(self, chip, core, index, addr):
(start_addr, page_max_addr) = self.nv_flash_page_index[chip][core][index]
if start_addr >= addr or addr > page_max_addr:
msg = "[error] addr %s invalid!" % addr
raise ParserError(msg)
self.nv_flash_page_index[chip][core][index] = (addr, page_max_addr)
#print("update page index: \n", self.nv_flash_page_index)
def _find_usable_addr(self, chip, core, key_data, key_data_len):
page_size = fn_str_to_int(self.nv_flash_cfg[chip]['size'].get('page_size'))
key_item_total_len = self._get_key_item_len(key_data_len)
if key_item_total_len > page_size - sizeof(NvPageHead):
msg = "[error] key over page size !" % key_id
raise ParserError(msg)
index = 0
for (start_addr, page_max_addr) in self.nv_flash_page_index[chip][core]:
if start_addr + key_item_total_len > page_max_addr:
index += 1
continue
return index, start_addr
msg = "[error] no more enough space for [%s]!" % core
raise ParserError(msg)
def _get_key_type_from_attr(self, key_id, attr):
if (attr & 1) and (not (attr & ~1)):
return 0xFF
elif (attr & 2) and (not (attr & ~2)):
return 0x00
elif (attr & 4) and (not (attr & ~4)):
return 0xFF
else:
msg = "[error] attribution config err: [id-%s] [attr-%s] !" % (key_id, attr)
raise ParserError(msg)
def _get_key_upgrade_from_attr(self, key_id, attr):
if (attr & 1) and (not (attr & ~1)):
return 0xFF
elif (attr & 2) and (not (attr & ~2)):
return 0xFF
elif (attr & 4) and (not (attr & ~4)):
return 0x00
else:
msg = "[error] attribution config err: [id-%s] [attr-%s] !" % (key_id, attr)
raise ParserError(msg)
def _get_key_item_len(self, key_data_len):
if key_data_len % 4 != 0:
key_data_len += 4 - key_data_len % 4
return sizeof(KeyHead) + 4 + key_data_len
def _set_key_hash(self, nv_bin, key_item_start_addr, key_data_len):
if key_data_len % 4 != 0:
key_data_len += 4 - key_data_len % 4
hash_start_addr = key_item_start_addr - key_data_len - sizeof(KeyHead)
hash_end_addr = hash_start_addr + key_data_len + sizeof(KeyHead)
if not self.use_crc16:
crc32num = zlib.crc32(nv_bin[hash_start_addr : hash_end_addr])
else:
crc32num = binascii.crc_hqx(nv_bin[hash_start_addr : hash_end_addr], 0)
crc32ret = '{:0>8X}'.format(crc32num)
crc32ret = re.sub(r"(?<=\w)(?=(?:\w\w)+$)", " 0x", crc32ret)
crc32ret = '0x' + crc32ret
crc32list = [int(x,16) for x in crc32ret.split(" ")]
sha256bytearray = bytes(crc32list)
tail_addr = key_item_start_addr + len(sha256bytearray)
nv_bin[key_item_start_addr : tail_addr] = sha256bytearray
return nv_bin, tail_addr
def _set_key_data(self, nv_bin, key_item_start_addr, key_data, key_data_len):
if key_data_len % 4 != 0:
for i in range(0, 4 - key_data_len % 4) :
key_data += b'\x00'
key_data_len += 4 - key_data_len % 4
tail_addr = key_item_start_addr + key_data_len
nv_bin[key_item_start_addr : tail_addr] = key_data
return nv_bin, tail_addr
def _init_key_head(self, nv_bin, key_item_start_addr, key_data_len, key_id, key_attr):
nv_key_st = KeyHead.from_buffer(nv_bin[key_item_start_addr:])
nv_key_st.magic = 0xA9
nv_key_st.length = key_data_len
nv_key_st.type = self._get_key_type_from_attr(key_id, key_attr)
nv_key_st.upgrade = self._get_key_upgrade_from_attr(key_id, key_attr)
nv_key_st.key_id = key_id
nv_key_st.version = 65535
nv_key_st.enc_key = 0 # TODO: 目前不支持加密且nv加密部分不由nv脚本做可能放到初始化
tail_addr = key_item_start_addr + sizeof(KeyHead)
nv_bin[key_item_start_addr : tail_addr] = nv_key_st
return nv_bin, tail_addr
def _init_nv_page_head(self, chip, core):
nv_flash_chip_cfg = self.nv_flash_cfg[chip]
default_page_nums = nv_flash_chip_cfg.get('default_page_nums')
page_size = fn_str_to_int(nv_flash_chip_cfg['size']['page_size'])
page_nums = nv_flash_chip_cfg['cores'][core]['page_nums']
if not self.is_backup:
page_id_start = nv_flash_chip_cfg['cores'][core]['page_id_start']
else:
page_id_start = '0x34B2'
core_nv_size = page_nums * page_size
core_nv_bin = bytearray(core_nv_size)
for i in range(0, core_nv_size):
core_nv_bin[i] = 0xFF
for i in range(0, page_nums):
start_addr = i * page_size
nv_page_head = NvPageHead.from_buffer(core_nv_bin[start_addr:])
nv_page_head.id = fn_str_to_int(page_id_start)
nv_page_head.reserved = 1
nv_page_head.num_pages = i
nv_page_head.inverted_details_word = ~int.from_bytes(struct.pack('HBB', \
nv_page_head.id, nv_page_head.reserved, nv_page_head.num_pages), 'little')
nv_page_head.last_write = 0
nv_page_head.unused = ~nv_page_head.last_write
core_nv_bin[start_addr : start_addr + sizeof(NvPageHead)] = nv_page_head
return core_nv_bin
def _load_nv_flash_cfg(self):
self.nv_flash_cfg = dict()
for chip in self.nv_ver_dict:
cfg_file = os.path.join(self.root, json_conf["NV_TARGET_JSON_PATH"])
self.nv_flash_cfg[chip] = BuildConfParser(cfg_file).get_conf_data()
def _add_nv_ver(self, chip, target, core, ver, common_cfg, ver_cfg, prod_type=None):
'''
Add version config into self.nv_ver_src_dict.
There are three configuration scenarios.One target may correspond to multiple NV versions.
|--- ver1: { srcs:[default, common, cfg1], prod_type: }
|
chip|---target1---|--- ver2: { srcs:[default, cfg2], prod_type: }
| |
| |--- ver3: { srcs:[default, common], prod_type: }
| |
| |--- core : "Each target corresponds to one core."
|---targetN...
'''
ver_cfgs = []
if os.path.exists(common_cfg) is True:
ver_cfgs.append(common_cfg)
if ver_cfg is not None and os.path.exists(ver_cfg):
ver_cfgs.append(ver_cfg)
if self.nv_ver_src_dict.get(chip) is None:
self.nv_ver_src_dict[chip] = dict()
chip_dict = self.nv_ver_src_dict[chip]
if chip_dict.get(target) is not None and chip_dict[target].get(ver) is not None:
msg = "[error] Ver config Repeate!"
raise ParserError(msg)
if chip_dict.get(target) is None:
chip_dict[target] = {ver:{"srcs":ver_cfgs, 'prod_type': prod_type}}
else:
chip_dict[target].update({ver:{"srcs":ver_cfgs, 'prod_type': prod_type}})
if chip_dict[target].get('core') is None:
chip_dict[target]['core'] = core
elif chip_dict[target].get('core') != core:
msg = "[error] [%s] core not match!" % target
raise ParserError(msg)
def _nv_ver_prepare(self, target):
'''
1. Check nv configurations.
2. Add all correct config into self.nv_ver_src_dict..
'''
if type(self.alias[target]) is list:
return False
target_type = self.alias[target].get("TYPE")
if target_type is None or target_type != 'nv':
return False
core = self.alias[target].get("CORE")
if core is None:
msg = "[error] core name not exist!"
raise ParserError(msg)
chip = self.alias[target].get("CHIP")
#default_cfg = os.path.join(self.nv_root, '%s_default.json' % core)
'''
if chip is None or os.path.exists(default_cfg) is False:
msg = "[error] chip name OR %s not exist!" % default_cfg
raise ParserError(msg)
'''
kernel_name = self.alias[target].get("KERNEL_BIN")
if kernel_name is None:
msg = "[error] KERNEL is null!"
raise ParserError(msg)
if self.targets is not None and kernel_name not in self.targets:
return False
cfgs = self.alias[target].get("COMPONENT")
if cfgs is None:
msg = "[error] COMPONENT is null!"
raise ParserError(msg)
prod_type = self.alias[target].get("PRODUCT_TYPE")
if prod_type == "":
prod_type = None
if prod_type is not None and type(prod_type) is not str:
msg = "[error] PRODUCT_TYPE must be a string type, one kernel only suuport one product type!"
raise ParserError(msg)
cfg_dir = os.path.join(self.nv_root, kernel_name)
common_cfg = os.path.join(cfg_dir, 'common.json')
for cfg in cfgs:
cfg_file = os.path.join(cfg_dir, '%s.json' % cfg) if cfg != 'common' else None
self._add_nv_ver(chip, kernel_name, core, cfg, common_cfg, cfg_file, prod_type)
def _prod_type_filter(self, srcs, prod_type = None):
combination = dict()
module_dict = dict()
for src in srcs:
# print("[INFO] nv config src file: ", src)
src_conf = BuildConfParser(src).get_conf_data()
for module in src_conf:
module_id = src_conf.get(module).get('module_id')
if module_id is None:
msg = "[error][file:%s][%s] module_id is null!" % (src, module)
raise ParserError(msg)
module_id = fn_str_to_int(module_id) if type(module_id) is not int else module_id
if module_id > g_u8_max :
msg = "[error][file:%s][%s] module_id is more than 0xFF!" % (src, module)
raise ParserError(msg)
# diffrent module must config with diffrent module_id
if module_id in module_dict:
if module_dict[module_id] != module:
msg = "[error][file:%s][%s] module_id is the same to [%s]!" % (src, module, module_dict[module_id])
raise ParserError(msg)
else:
module_dict[module_id] = module
if module not in combination:
combination[module] = {'module_id': module_id}
elif combination.get(module).get('module_id') != module_id:
msg = "[error][%s][%s] module_id is not same as other file!" % (src, module)
raise ParserError(msg)
for item in src_conf.get(module):
key_cfg = src_conf.get(module).get(item)
if item == 'module_id':
continue
key_id = key_cfg.get('key_id')
key_id = None if key_id is None else (fn_str_to_int(key_id) if type(key_id) is not int else key_id)
# print("key id: %d, module id: %d, key>>8: %d"%( key_id, module_id, key_id>>8))
if key_id is None or key_id > g_u16_max :
msg = "[error][file:%s][%s][%s] key_id is null or more than unsighed 16 or not match with module_id!" % (src, module, item)
raise ParserError(msg)
item_prod_type = key_cfg.get('product_type')
key_status = key_cfg.get('key_status')
#print('prodtype: %s, key prod: %s'%(prod_type, item_prod_type))
if (prod_type == item_prod_type or (item_prod_type is not None and prod_type in item_prod_type)) \
and key_status == 'alive':
combination[module].update({item:key_cfg})
return combination
def _nv_cfg_writer(self, dst_file, combination):
if os.path.exists(os.path.dirname(dst_file)) is False:
os.makedirs(os.path.dirname(dst_file))
if os.path.exists(dst_file):
os.remove(dst_file)
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(dst_file, flags, modes), 'w') as fout:
fout.write(json.dumps(combination, indent=4))
def _merge_ver_cfg(self, file_prefix, ver_cfg):
srcs = ver_cfg.get('srcs')
if srcs is None:
msg = "[error] ver cfg file is null!"
raise ParserError(msg)
prod_type = ver_cfg.get('prod_type')
combination = self._prod_type_filter(srcs, prod_type)
dst_file = '%s.json' % file_prefix
self._nv_cfg_writer(dst_file, combination)
return { "merged_cfg" : dst_file, "prod_type" : prod_type}
def _create_header(self):
pass
# 检查配置文件中是否有必须配置项
def check_key(json_conf):
check_item = ['OUT_BIN_DIR', 'BUILD_TEMP_PATH', 'NV_TARGET_JSON_PATH', \
'NV_RELATIVE_PATH', 'NV_DEFAULT_CFG_DIR', 'DATABASE_TXT_FILE']
keys = dict.keys(json_conf)
for check_key in check_item:
if check_key not in keys:
msg = "[error] [nv_binary] need add ConfigMap (%s) in json_conf!" % (check_key)
raise ParserError(msg)
def test(targets, flag, backup, use_crc16):
root = g_root
nv_target_json_path = os.path.join(root, json_conf["NV_TARGET_JSON_PATH"])
alias_conf = BuildConfParser(nv_target_json_path).get_conf_data()
worker = BuildNv(alias_conf, root, targets, backup, use_crc16)
if flag:
worker.set_nv_output_dir(os.path.join(root, json_conf["OUT_BIN_DIR"]))
worker.start_work()
def nv_begin(in_path, targets, flag, gen_backup=False, use_crc16=False):
global json_conf
with open(in_path, 'r') as i:
json_conf = json.load(i)
check_key(json_conf)
test(targets, flag, False, use_crc16)
if gen_backup:
test(targets, flag, True, use_crc16)
print("build nv bin success!!")
if __name__ == "__main__":
in_path = sys.argv[2]
targets = sys.argv[3].split()
flag = len(sys.argv) == 3
nv_begin(in_path, targets, flag)