alkaid_release_platform/utils/utils.py
ekko.bao 0a0f6a6054 初次创建仓库提交代码
1. 已经构建好了架子了。
2. 添加了示例的插件
2025-04-21 06:37:06 +00:00

623 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
import re
import os, sys
from functools import wraps
from typing import Any, Dict, List, Iterable
import json
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import core.logger as logger
log = logger.get_logger()
# 自定义 JSON 编码器,处理不可序列化的类型
class CustomJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Enum):
# 对于枚举类型,返回其值
return obj.value
elif hasattr(obj, '__dict__'):
# 对于具有 __dict__ 属性的对象,返回其字典表示
return obj.__dict__
try:
# 尝试将对象转换为列表
iterable = iter(obj)
return list(iterable)
except TypeError:
pass
# 让基类处理其他类型,或抛出 TypeError
return super().default(obj)
LOAD_INDEX = 0
def loading_animation(func):
"""加载动画装饰器在每次函数被call执行时显示一个循环加载动画
"""
@wraps(func)
def wrapper(*args, **kwargs):
global LOAD_INDEX
spinner = ['|', '/', '-', '\\']
if os.isatty(sys.stdout.fileno()):
out_str = '\r' + spinner[LOAD_INDEX]
else:
out_str = spinner[LOAD_INDEX]
sys.stdout.write(out_str)
sys.stdout.flush()
LOAD_INDEX = (LOAD_INDEX + 1) % len(spinner)
return func(*args, **kwargs)
return wrapper
def obj2s(obj:any, indent:int=0) -> str:
"""将对象转换为字符串
"""
if isinstance(obj, dict):
ret = " "*indent + "{"
for k,v in obj.items():
ret += " "*indent + "\"{}\" : {}\n".format(k, obj2s(v, indent+2))
ret = " "*indent + "}"
return ret
elif isinstance(obj, (list, set)):
ret = "["
for idx, item in enumerate(obj):
if idx != 0:
ret += ","
ret += " "*indent + obj2s(item, indent+2)
ret = "]"
return ret
elif isinstance(obj, str):
return "\"{}\"".format(obj)
elif isinstance(obj, bool):
return str(obj).lower()
else:
return str(obj)
def get_alkaid_root(base_dir=os.path.curdir):
"""获取alkaid的根目录
1. 从base_dir目录开始查找base_dir默认为当前环境目录
2. 如果找不到,则尝试从文件路径开始向上
3. 如果还是找不到,则抛出错误
"""
cwd = os.path.realpath(base_dir)
try_dirs = [Path(cwd).absolute(), Path(__file__).absolute()]
def check_subdir(cdir,dirs):
for dir_ in dirs:
# print(f"check {cdir}/{dir_} exists")
if not cdir.joinpath(dir_).is_dir():
return False
log.info(f"Alkaid found in {cdir}")
return True
for item in try_dirs:
try_dir = item
while try_dir != Path.home():
if check_subdir(try_dir, [".repo", "project", "sdk", "kernel"]):
return str(try_dir)
try_dir = try_dir.parent
log.trace(f"Alkaid cant found in {item} path tree")
raise FileNotFoundError("Alkaid not found in this directory or any parent directory")
def get_files(directory, filter=lambda root, file: True, recursive=True) -> List[str]:
"""获取指定目录下的所有文件
Args:
directory: 目录路径
filter: 过滤函数,用于过滤文件。默认是保留所有文件
filter(root, file)
1.1 root: 当前目录, file: 当前文件
1.2 返回True表示保留返回False表示过滤
recursive: 是否递归获取目录下的所有文件。默认是True如果为False则只获取当前目录下的文件
Returns:
List[str]: 目录下的所有文件列表,包含完整路径
"""
file_list = []
#如果传入的是一个字符串的pattern则转化为正则表达式
if isinstance(filter, str):
filter = lambda root, file: bool(re.match(filter, file))
if recursive:
for root, dirs, files in os.walk(directory):
files = [f for f in files if filter(root, f)]
file_list += [os.path.join(root, file) for file in files]
else:
files = os.listdir(directory)
files = [f for f in files if filter(directory, f)]
file_list += [os.path.join(directory, file) for file in files]
return file_list
def is_valid_path(path):
try:
# 尝试创建一个 Path 对象
Path(path)
return True
except Exception:
return False
class ContentType(Enum):
"""文件内容类型标签
"""
KV = "kv" #key-value格式
RAW = "raw" #原始格式
def __str__(self):
return self.value
def __repr__(self):
return self.value
TYPE = 'type'
CONTENT = 'content'
class ContentStruct(list):
"""用于结构化的存储文件内容
1. 文件内容结构化存储,便于后续的修改和同步
2. 使用列表从上到下存储,遍历列表即可获得文件内容
3. 使用字典区分内容类型,便于后续的修改和同步
"""
def append(self, type:ContentType, content:Any):
"""在列表末尾添加一个内容
"""
_item = {
TYPE: type,
CONTENT: content
}
super().append(_item)
def insert(self, idx:int, type:ContentType, content:Any):
"""在指定位置插入一个内容
"""
_item = {
TYPE: type,
CONTENT: content
}
super().insert(idx, _item)
def __str__(self):
return json.dumps(self, indent=4, cls=CustomJSONEncoder)
def __repr__(self):
return self.__str__()
class FileParser(ABC):
def __init__(self, file_path:str):
"""
初始化文件解析器
"""
self.file_path = file_path
self._raws = []
self._content = ContentStruct()
self.reload()
def reload(self):
"""重新加载文件内容"""
self._content.clear()
with open(self.file_path, 'r') as f:
self._raws = f.read().split('\n') # 使用这种方式可以保留行末的换行符
@abstractmethod
def _parse(self) -> Dict[str,str]:
pass
@abstractmethod
def __getitem__(self, key:str) -> Any:
pass
@abstractmethod
def __setitem__(self, key:str, value:Any):
pass
@abstractmethod
def __delitem__(self, key:str):
pass
def __iter__(self):
"""迭代器"""
for item in self._content:
if item[TYPE] == ContentType.KV:
yield item[CONTENT][0], item[CONTENT][2]
def __len__(self):
"""获取长度"""
return len([_ for _ in self._content if _[TYPE] == ContentType.KV])
class DefconfigParser(FileParser):
# 过滤器,用于过滤掉不符合条件的行
def filter(self, type:ContentType, key:str, item:Dict) -> bool:
return item[TYPE] != type or key != item[CONTENT][0].strip()
# 分隔符用于分隔key和value
DELIMITER = ['=']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._parse()
def _parse(self):
"""解析defconfig文件
"""
for line in self._raws:
#构建key-value对 value中可能也有=号所以要拼接一下
key, delimiter, value, tail = self._parse_line(line)
if key and delimiter:
self._content.append(ContentType.KV, [key, delimiter, value, tail])
else:
self._content.append(ContentType.RAW, value)
def _parse_line(self, raw_line):
"""解析defconfig文件的每一行
"""
line = raw_line.strip()
if line.startswith('#') or not line:
return None, None, raw_line, None
for delimiter in self.DELIMITER:
parts = line.split(delimiter, 1)
key = parts[0].strip()
if len(parts) < 2:
delimiter = delimiter.strip()
if key.endswith(delimiter): # 例如 var_name = 这样的情况
return key[:-len(delimiter)], delimiter, '', ''
continue
value = parts[1]
parts = value.split('#', 1) # 如果value后面存在注释则将注释分离出来
value = parts[0]
tail = '#' + parts[1] if len(parts) > 1 else ''
return key, delimiter, value, tail
return None, None, raw_line, ''
def get(self, key:str, default:Any=None) -> str:
#从后往前遍历因为defconfig文件中可能存在多个重复的key所以需要从后往前遍历
for item in reversed(self._content):
if self.filter(ContentType.KV, key, item):
continue
return item[CONTENT][2].strip() #item[CONTENT] = [key, delimiter, value, tail]
return default
def index(self, key:str) -> int:
"""获取指定key的索引
"""
for idx, item in enumerate(self._content):
if self.filter(ContentType.KV, key, item):
continue
return idx
return -1
def insert(self, idx, key:str, delimiter:str='=', value:Any='', tail:str=''):
"""在指定位置插入指定key的值
"""
self._content.insert(idx, ContentType.KV, [key, delimiter, value, tail])
def __getitem__(self, key:str) -> Any:
"""获取指定key的值,如果key不存在则返回None
"""
ret = self.get(key, None)
if ret is None:
raise KeyError(f"Key {key} not found in {self.file_path}")
return ret
def __setitem__(self, key:str, value:Any):
"""设置指定key的值
"""
is_found = False
for idx in range(len(self._content) -1, -1, -1):
if self.filter(ContentType.KV, key, self._content[idx]):
continue
if is_found: #一个文件中可能存在多个重复的key所以需要从后往前遍历修改最后一个并且将其他的删除掉
del self._content[idx]
continue
self._content[idx][CONTENT][2] = value #item[CONTENT] = [key, delimiter, value]
is_found = True
if not is_found:
self._content.append(ContentType.KV, [key, '=', value])
def __delitem__(self, key:str):
"""删除指定key的配置项
"""
del_idx = []
for idx, item in enumerate(self._content):
if self.filter(ContentType.KV, key, item):
continue
del_idx.append(idx)
#return 有可能存在多个重复的key所以不能直接返回要全部删掉
if not del_idx:
raise KeyError(f"Key {key} not found in {self.file_path}")
else:
for idx in reversed(del_idx):
del self._content[idx]
def __contains__(self, key:str) -> bool:
"""判断指定key是否存在
"""
return self.index(key) != -1
def sync(self):
"""将_content中的修改同步到_raws中
同步策略:
1. 如果item[TYPE] == ContentType.RAW则将item[CONTENT]添加到_raws中
2. 如果item[TYPE] == ContentType.KV则将item[CONTENT]转换为字符串并添加到_raws中
3. 如果value为None则删除该行
"""
_content = []
for item in self._content:
if item[TYPE] == ContentType.RAW: #原始行
_content.append(item[CONTENT])
elif item[TYPE] == ContentType.KV: #key-value行
_content.append(''.join(item[CONTENT]))
self._raws = _content
def flush(self, file_path=None):
"""将_content中的修改同步到文件中
Args:
file_path: 要同步到的文件路径,默认为当前文件路径
"""
if file_path is None:
file_path = self.file_path
self.sync()
with open(file_path, 'w') as f:
f.write('\n'.join(self._raws))
def __del__(self):
"""析构函数,确保对象销毁时自动同步到文件"""
# try:
# self.flush()
# except Exception as e:
# # 防止析构时出现异常导致程序崩溃
# log.error(f"Error during auto-sync in __del__: {e}")
pass
def __str__(self):
return self._content.__str__()
def __repr__(self):
return self.__str__()
class MakefileParser(FileParser):
"""解析 Makefile 文件,提供类似字典的接口进行访问和修改
支持的 Makefile 语法:
- 变量赋值: 使用 '=', ':='
- 多行续写: 使用 '\'
提供以下接口:
- get(key, default): 获取指定 key 的值,如果不存在则返回 default
- __getitem__(key): 获取指定 key 的值,等同于使用 parser[key]
- __setitem__(key, value): 设置指定 key 的值,等同于使用 parser[key] = value
- __delitem__(key): 删除指定 key等同于使用 del parser[key]
- flush(): 将内存中的修改写回文件
"""
# Makefile的分隔符列表按优先级排序
DELIMITERS = [':=', '?=', ' =', '=', ":"]
def __init__(self, *args, **kwargs):
self.content_modifed = False
super().__init__(*args, **kwargs)
self._parse()
def _parse(self):
self._content.clear()
"""解析 Makefile 文件内容,构建结构化存储"""
cur_key, cur_delimiter, cur_value, tail, is_continued = None, None, [], None, False
for line in self._raws:
key, delimiter, value, tail = self._parse_line(line, is_continued)
# 如果这一行是一个全新的key-value对 则先将其先暂存起来
if key and delimiter:
if cur_key:
raise ValueError(f"Last line not ended, but new line started: {line}")
cur_key, cur_delimiter = key, delimiter
if value:
cur_value.append(value)
# 如果这一行是上一行的续行, 这只需要将这一行value添加到current_value中
elif is_continued and value and not key and not delimiter:
cur_value.append(value)
is_continued = True if tail == '\\' else False
# 如果当前存在key和delimiter并且不是续行已经结束 则将当前的key-value对保存到内容结构中
if cur_key and not is_continued:
self._content.append(ContentType.KV, [cur_key, cur_delimiter, cur_value, tail])
cur_key, cur_delimiter, cur_value, tail, is_continued = None, None, [], None, False
elif not cur_key:
self._content.append(ContentType.RAW, line)
# 处理最后一个键值对
if cur_key:
self._content.append(ContentType.KV, [cur_key, cur_delimiter, cur_value, tail])
def _parse_line(self, raw_line, is_continued):
"""解析行,识别分隔符、键和值
Args:
raw_line: 原始行
is_continued: 是否为续行
Returns:
(key, delimiter, value, tail)
key: 键
delimiter: 分隔符
value: 值
tail: 行尾部的数据,比如注释
"""
line = raw_line.strip()
if line.startswith('#') or not line: # 单独的一个注释行 或者空行
return None, None, raw_line, None
if is_continued: # 说明这一行的内容就是上一行的接续所有内容都是value
# line = line[:-1].strip()
if raw_line.endswith('\\'): # 如果存在续行符,则将续行符分离出来
return None, None, raw_line[:-1], raw_line[-1]
return None, None, raw_line, None
# 尝试切割划出key和value
for delimiter in self.DELIMITERS:
parts = line.split(delimiter, 1)
if len(parts) < 2:
if line.endswith(delimiter): # 例如 var_name := 这样的情况
return parts[0].strip(), delimiter, None, None
continue
key, value = parts[0].strip(), parts[1]
if not key.isidentifier(): # 如果key不是一个合法的标识符说明这个分隔符是误会
continue
if value.endswith('\\'): # 如果value后面存在续行符则将续行符分离出来
value, tail = value[:-1], value[-1]
else:
parts = value.split('#', 1) # 如果value后面存在注释则将注释分离出来
value, tail = parts[0], '#' + parts[1] if len(parts) > 1 else None
return key, delimiter, value, tail
# 如果没有任何分隔符,则将整行作为 value 返回
return None, None, raw_line, None
def get(self, key, default=None):
"""获取指定键的值,如果不存在则返回默认值"""
idx = self.index(key)
if idx == -1:
return default
item = self._content[idx]
value = item[CONTENT][2]
return self._value_process(value)
def _value_process(self, value):
"""处理value 将value中存在的空格去掉
"""
ret = []
for _ in value:
ret += _.split()
value = ret
return [_.strip() for _ in value]
def index(self, key):
"""获取指定键的索引"""
for idx, item in enumerate(self._content):
if item[TYPE] == ContentType.KV and item[CONTENT][0].strip() == key:
return idx
return -1
def insert(self, idx:Any, key:str, delimiter:str=':=', values:Any=None, tail:str=None):
"""插入指定位置插入值如果idx是key则将key插入到key的第一个匹配项的位置
Args:
idx: 插入位置可以是索引也可以是key
key: 键
delimiter: 分隔符
values: 值
tail: 行尾部的数据,比如注释
"""
if isinstance(idx, str):
idx = self.index(idx)
if idx == -1:
raise KeyError(f"Key '{idx}' not found in {self.file_path}")
if not isinstance(idx, int):
raise ValueError(f"Invalid index type: {type(idx)}")
if not isinstance(values, list):
values = [str(values)]
self._content.insert(idx, ContentType.KV, [key, delimiter, values, tail])
def replace(self, old_pattern, new:str, global_replace=True):
"""替换指定键的值
Args:
old_pattern: 旧的pattern
new: 新的pattern
global_replace: 是否全局替换
"""
if self.content_modifed:
self.sync()
for idx, item in enumerate(self._raws):
if not re.match(old_pattern, item):
continue
self._raws[idx] = re.sub(old_pattern, new, item)
if not global_replace:
break
log.info(f"{self._raws}")
self._parse()
def delete(self, old_pattern, global_delete=True):
"""删除指定键的值
Args:
old_pattern: 旧的pattern
global_delete: 是否全局删除
"""
del_idx = []
if self.content_modifed:
self.sync()
for idx, item in enumerate(self._raws):
if re.match(old_pattern, item):
del_idx.append(idx)
if not global_delete:
break
for idx in reversed(del_idx):
del self._raws[idx]
self._parse()
def __getitem__(self, key):
"""获取指定键的值"""
if isinstance(key, str):
value = self.get(key, None)
if value is None:
raise KeyError(f"Key '{key}' not found in {self.file_path}")
elif isinstance(key, int):
return self._value_process(self._content[key][CONTENT][2])
else:
raise KeyError(f"Invalid key type: {type(key)}")
def __setitem__(self, key, value):
"""设置指定键的值
如果键已存在,则更新最后一个匹配项并删除其他匹配项
如果键不存在,则在文件末尾添加
"""
# 将单个字符串转换为列表
if not isinstance(value, list):
value = [str(value)]
found = False
log.info(f"set [{key}] = {value}")
# 从后往前遍历以找到并更新最后一个匹配项
for i in range(len(self._content) - 1, -1, -1):
item = self._content[i]
if item[TYPE] == ContentType.KV and item[CONTENT][0].strip() == key.strip():
if not found:
# 更新最后一个匹配项
log.info(f"update [{key}] = {self._content[i][CONTENT][2]} to {value}")
self._content[i][CONTENT][2] = value
found = True
else:
# 删除其他匹配项
del self._content[i]
self.content_modifed = True
# 如果没有找到匹配项,添加新项
if not found:
if not key.isidentifier(): # 如果key中是非标识符字符则认为这是一个正则pattern
self.replace(key, value[0])
else:
# 新增一个kv结构
self._content.append(ContentType.KV, [key, ' := ', value, None])
self.content_modifed = True
def __delitem__(self, key):
"""删除指定键的所有实例"""
found = False
for i in range(len(self._content) - 1, -1, -1):
item = self._content[i]
if item[TYPE] == ContentType.KV and item[CONTENT][0].strip() == key.strip():
del self._content[i]
found = True
if not found:
raise KeyError(f"Key '{key}' not found in {self.file_path}")
else:
self.delete(key)
def sync(self):
"""同步文件内容"""
if not self.content_modifed:
return
self.content_modifed = False
_content = []
for item in self._content:
if item[TYPE] == ContentType.RAW:
line = item[CONTENT]
elif item[TYPE] == ContentType.KV:
key, delimiter, values, tail = item[CONTENT]
line = f"{key}{delimiter}"
line += '\\\n'.join(values)
line += tail if tail else ''
else:
raise ValueError(f"Invalid item type: {item[TYPE]}")
_content.append(line)
_content = '\n'.join(_content)
self._raws = _content.split('\n')
def flush(self, file_path=None):
"""将内存中的修改写回文件"""
if file_path is None:
file_path = self.file_path
self.sync()
with open(file_path, 'w') as f:
f.write('\n'.join(self._raws))
def __str__(self):
return json.dumps(self._content, indent=4, cls=CustomJSONEncoder)
def __repr__(self):
return self.__str__()