from abc import ABC, abstractmethod from enum import Enum from pathlib import Path import re import os, sys from functools import wraps from typing import Any, Dict, List, Iterable import json sys.path.append(os.path.join(os.path.dirname(__file__), '..')) import core.logger as logger log = logger.get_logger() # 自定义 JSON 编码器,处理不可序列化的类型 class CustomJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, Enum): # 对于枚举类型,返回其值 return obj.value elif hasattr(obj, '__dict__'): # 对于具有 __dict__ 属性的对象,返回其字典表示 return obj.__dict__ try: # 尝试将对象转换为列表 iterable = iter(obj) return list(iterable) except TypeError: pass # 让基类处理其他类型,或抛出 TypeError return super().default(obj) LOAD_INDEX = 0 def loading_animation(func): """加载动画装饰器:在每次函数被call执行时显示一个循环加载动画 """ @wraps(func) def wrapper(*args, **kwargs): global LOAD_INDEX spinner = ['|', '/', '-', '\\'] if os.isatty(sys.stdout.fileno()): out_str = '\r' + spinner[LOAD_INDEX] else: out_str = spinner[LOAD_INDEX] sys.stdout.write(out_str) sys.stdout.flush() LOAD_INDEX = (LOAD_INDEX + 1) % len(spinner) return func(*args, **kwargs) return wrapper def obj2s(obj:any, indent:int=0) -> str: """将对象转换为字符串 """ if isinstance(obj, dict): ret = " "*indent + "{" for k,v in obj.items(): ret += " "*indent + "\"{}\" : {}\n".format(k, obj2s(v, indent+2)) ret = " "*indent + "}" return ret elif isinstance(obj, (list, set)): ret = "[" for idx, item in enumerate(obj): if idx != 0: ret += "," ret += " "*indent + obj2s(item, indent+2) ret = "]" return ret elif isinstance(obj, str): return "\"{}\"".format(obj) elif isinstance(obj, bool): return str(obj).lower() else: return str(obj) def get_alkaid_root(base_dir=os.path.curdir): """获取alkaid的根目录 1. 从base_dir目录开始查找,base_dir默认为当前环境目录 2. 如果找不到,则尝试从文件路径开始向上 3. 如果还是找不到,则抛出错误 """ cwd = os.path.realpath(base_dir) try_dirs = [Path(cwd).absolute(), Path(__file__).absolute()] def check_subdir(cdir,dirs): for dir_ in dirs: # print(f"check {cdir}/{dir_} exists") if not cdir.joinpath(dir_).is_dir(): return False log.info(f"Alkaid found in {cdir}") return True for item in try_dirs: try_dir = item while try_dir != Path.home(): if check_subdir(try_dir, [".repo", "project", "sdk", "kernel"]): return str(try_dir) try_dir = try_dir.parent log.trace(f"Alkaid cant found in {item} path tree") raise FileNotFoundError("Alkaid not found in this directory or any parent directory") def get_files(directory, filter=lambda root, file: True, recursive=True) -> List[str]: """获取指定目录下的所有文件 Args: directory: 目录路径 filter: 过滤函数,用于过滤文件。默认是保留所有文件 filter(root, file) 1.1 root: 当前目录, file: 当前文件 1.2 返回True表示保留,返回False表示过滤 recursive: 是否递归获取目录下的所有文件。默认是True,如果为False,则只获取当前目录下的文件 Returns: List[str]: 目录下的所有文件列表,包含完整路径 """ file_list = [] #如果传入的是一个字符串的pattern,则转化为正则表达式 if isinstance(filter, str): filter = lambda root, file: bool(re.match(filter, file)) if recursive: for root, dirs, files in os.walk(directory): files = [f for f in files if filter(root, f)] file_list += [os.path.join(root, file) for file in files] else: files = os.listdir(directory) files = [f for f in files if filter(directory, f)] file_list += [os.path.join(directory, file) for file in files] return file_list def is_valid_path(path): try: # 尝试创建一个 Path 对象 Path(path) return True except Exception: return False class ContentType(Enum): """文件内容类型标签 """ KV = "kv" #key-value格式 RAW = "raw" #原始格式 def __str__(self): return self.value def __repr__(self): return self.value TYPE = 'type' CONTENT = 'content' class ContentStruct(list): """用于结构化的存储文件内容 1. 文件内容结构化存储,便于后续的修改和同步 2. 使用列表从上到下存储,遍历列表即可获得文件内容 3. 使用字典区分内容类型,便于后续的修改和同步 """ def append(self, type:ContentType, content:Any): """在列表末尾添加一个内容 """ _item = { TYPE: type, CONTENT: content } super().append(_item) def insert(self, idx:int, type:ContentType, content:Any): """在指定位置插入一个内容 """ _item = { TYPE: type, CONTENT: content } super().insert(idx, _item) def __str__(self): return json.dumps(self, indent=4, cls=CustomJSONEncoder) def __repr__(self): return self.__str__() class FileParser(ABC): def __init__(self, file_path:str): """ 初始化文件解析器 """ self.file_path = file_path self._raws = [] self._content = ContentStruct() self.reload() def reload(self): """重新加载文件内容""" self._content.clear() with open(self.file_path, 'r') as f: self._raws = f.read().split('\n') # 使用这种方式可以保留行末的换行符 @abstractmethod def _parse(self) -> Dict[str,str]: pass @abstractmethod def __getitem__(self, key:str) -> Any: pass @abstractmethod def __setitem__(self, key:str, value:Any): pass @abstractmethod def __delitem__(self, key:str): pass def __iter__(self): """迭代器""" for item in self._content: if item[TYPE] == ContentType.KV: yield item[CONTENT][0], item[CONTENT][2] def __len__(self): """获取长度""" return len([_ for _ in self._content if _[TYPE] == ContentType.KV]) class DefconfigParser(FileParser): # 过滤器,用于过滤掉不符合条件的行 def filter(self, type:ContentType, key:str, item:Dict) -> bool: return item[TYPE] != type or key != item[CONTENT][0].strip() # 分隔符,用于分隔key和value, DELIMITER = ['='] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._parse() def _parse(self): """解析defconfig文件 """ for line in self._raws: #构建key-value对 value中可能也有=号所以要拼接一下 key, delimiter, value, tail = self._parse_line(line) if key and delimiter: self._content.append(ContentType.KV, [key, delimiter, value, tail]) else: self._content.append(ContentType.RAW, value) def _parse_line(self, raw_line): """解析defconfig文件的每一行 """ line = raw_line.strip() if line.startswith('#') or not line: return None, None, raw_line, None for delimiter in self.DELIMITER: parts = line.split(delimiter, 1) key = parts[0].strip() if len(parts) < 2: delimiter = delimiter.strip() if key.endswith(delimiter): # 例如 var_name = 这样的情况 return key[:-len(delimiter)], delimiter, '', '' continue value = parts[1] parts = value.split('#', 1) # 如果value后面存在注释,则将注释分离出来 value = parts[0] tail = '#' + parts[1] if len(parts) > 1 else '' return key, delimiter, value, tail return None, None, raw_line, '' def get(self, key:str, default:Any=None) -> str: #从后往前遍历,因为defconfig文件中可能存在多个重复的key,所以需要从后往前遍历 for item in reversed(self._content): if self.filter(ContentType.KV, key, item): continue return item[CONTENT][2].strip() #item[CONTENT] = [key, delimiter, value, tail] return default def index(self, key:str) -> int: """获取指定key的索引 """ for idx, item in enumerate(self._content): if self.filter(ContentType.KV, key, item): continue return idx return -1 def insert(self, idx, key:str, delimiter:str='=', value:Any='', tail:str=''): """在指定位置插入指定key的值 """ self._content.insert(idx, ContentType.KV, [key, delimiter, value, tail]) def __getitem__(self, key:str) -> Any: """获取指定key的值,如果key不存在,则返回None """ ret = self.get(key, None) if ret is None: raise KeyError(f"Key {key} not found in {self.file_path}") return ret def __setitem__(self, key:str, value:Any): """设置指定key的值 """ is_found = False for idx in range(len(self._content) -1, -1, -1): if self.filter(ContentType.KV, key, self._content[idx]): continue if is_found: #一个文件中可能存在多个重复的key,所以需要从后往前遍历,修改最后一个并且将其他的删除掉 del self._content[idx] continue self._content[idx][CONTENT][2] = value #item[CONTENT] = [key, delimiter, value] is_found = True if not is_found: self._content.append(ContentType.KV, [key, '=', value]) def __delitem__(self, key:str): """删除指定key的配置项 """ del_idx = [] for idx, item in enumerate(self._content): if self.filter(ContentType.KV, key, item): continue del_idx.append(idx) #return 有可能存在多个重复的key,所以不能直接返回,要全部删掉 if not del_idx: raise KeyError(f"Key {key} not found in {self.file_path}") else: for idx in reversed(del_idx): del self._content[idx] def __contains__(self, key:str) -> bool: """判断指定key是否存在 """ return self.index(key) != -1 def sync(self): """将_content中的修改同步到_raws中 同步策略: 1. 如果item[TYPE] == ContentType.RAW,则将item[CONTENT]添加到_raws中 2. 如果item[TYPE] == ContentType.KV,则将item[CONTENT]转换为字符串并添加到_raws中 3. 如果value为None,则删除该行 """ _content = [] for item in self._content: if item[TYPE] == ContentType.RAW: #原始行 _content.append(item[CONTENT]) elif item[TYPE] == ContentType.KV: #key-value行 _content.append(''.join(item[CONTENT])) self._raws = _content def flush(self, file_path=None): """将_content中的修改同步到文件中 Args: file_path: 要同步到的文件路径,默认为当前文件路径 """ if file_path is None: file_path = self.file_path self.sync() with open(file_path, 'w') as f: f.write('\n'.join(self._raws)) def __del__(self): """析构函数,确保对象销毁时自动同步到文件""" # try: # self.flush() # except Exception as e: # # 防止析构时出现异常导致程序崩溃 # log.error(f"Error during auto-sync in __del__: {e}") pass def __str__(self): return self._content.__str__() def __repr__(self): return self.__str__() class MakefileParser(FileParser): """解析 Makefile 文件,提供类似字典的接口进行访问和修改 支持的 Makefile 语法: - 变量赋值: 使用 '=', ':=' 等 - 多行续写: 使用 '\' 提供以下接口: - get(key, default): 获取指定 key 的值,如果不存在则返回 default - __getitem__(key): 获取指定 key 的值,等同于使用 parser[key] - __setitem__(key, value): 设置指定 key 的值,等同于使用 parser[key] = value - __delitem__(key): 删除指定 key,等同于使用 del parser[key] - flush(): 将内存中的修改写回文件 """ # Makefile的分隔符列表,按优先级排序 DELIMITERS = [':=', '?=', ' =', '=', ":"] def __init__(self, *args, **kwargs): self.content_modifed = False super().__init__(*args, **kwargs) self._parse() def _parse(self): self._content.clear() """解析 Makefile 文件内容,构建结构化存储""" cur_key, cur_delimiter, cur_value, tail, is_continued = None, None, [], None, False for line in self._raws: key, delimiter, value, tail = self._parse_line(line, is_continued) # 如果这一行是一个全新的key-value对, 则先将其先暂存起来 if key and delimiter: if cur_key: raise ValueError(f"Last line not ended, but new line started: {line}") cur_key, cur_delimiter = key, delimiter if value: cur_value.append(value) # 如果这一行是上一行的续行, 这只需要将这一行value添加到current_value中 elif is_continued and value and not key and not delimiter: cur_value.append(value) is_continued = True if tail == '\\' else False # 如果当前存在key和delimiter,并且不是续行(已经结束) 则将当前的key-value对保存到内容结构中 if cur_key and not is_continued: self._content.append(ContentType.KV, [cur_key, cur_delimiter, cur_value, tail]) cur_key, cur_delimiter, cur_value, tail, is_continued = None, None, [], None, False elif not cur_key: self._content.append(ContentType.RAW, line) # 处理最后一个键值对 if cur_key: self._content.append(ContentType.KV, [cur_key, cur_delimiter, cur_value, tail]) def _parse_line(self, raw_line, is_continued): """解析行,识别分隔符、键和值 Args: raw_line: 原始行 is_continued: 是否为续行 Returns: (key, delimiter, value, tail) key: 键 delimiter: 分隔符 value: 值 tail: 行尾部的数据,比如注释 """ line = raw_line.strip() if line.startswith('#') or not line: # 单独的一个注释行 或者空行 return None, None, raw_line, None if is_continued: # 说明这一行的内容就是上一行的接续,所有内容都是value # line = line[:-1].strip() if raw_line.endswith('\\'): # 如果存在续行符,则将续行符分离出来 return None, None, raw_line[:-1], raw_line[-1] return None, None, raw_line, None # 尝试切割划出key和value for delimiter in self.DELIMITERS: parts = line.split(delimiter, 1) if len(parts) < 2: if line.endswith(delimiter): # 例如 var_name := 这样的情况 return parts[0].strip(), delimiter, None, None continue key, value = parts[0].strip(), parts[1] if not key.isidentifier(): # 如果key不是一个合法的标识符,说明这个分隔符是误会 continue if value.endswith('\\'): # 如果value后面存在续行符,则将续行符分离出来 value, tail = value[:-1], value[-1] else: parts = value.split('#', 1) # 如果value后面存在注释,则将注释分离出来 value, tail = parts[0], '#' + parts[1] if len(parts) > 1 else None return key, delimiter, value, tail # 如果没有任何分隔符,则将整行作为 value 返回 return None, None, raw_line, None def get(self, key, default=None): """获取指定键的值,如果不存在则返回默认值""" idx = self.index(key) if idx == -1: return default item = self._content[idx] value = item[CONTENT][2] return self._value_process(value) def _value_process(self, value): """处理value 将value中存在的空格去掉 """ ret = [] for _ in value: ret += _.split() value = ret return [_.strip() for _ in value] def index(self, key): """获取指定键的索引""" for idx, item in enumerate(self._content): if item[TYPE] == ContentType.KV and item[CONTENT][0].strip() == key: return idx return -1 def insert(self, idx:Any, key:str, delimiter:str=':=', values:Any=None, tail:str=None): """插入指定位置插入值,如果idx是key,则将key插入到key的第一个匹配项的位置 Args: idx: 插入位置,可以是索引,也可以是key key: 键 delimiter: 分隔符 values: 值 tail: 行尾部的数据,比如注释 """ if isinstance(idx, str): idx = self.index(idx) if idx == -1: raise KeyError(f"Key '{idx}' not found in {self.file_path}") if not isinstance(idx, int): raise ValueError(f"Invalid index type: {type(idx)}") if not isinstance(values, list): values = [str(values)] self._content.insert(idx, ContentType.KV, [key, delimiter, values, tail]) def replace(self, old_pattern, new:str, global_replace=True): """替换指定键的值 Args: old_pattern: 旧的pattern new: 新的pattern global_replace: 是否全局替换 """ if self.content_modifed: self.sync() for idx, item in enumerate(self._raws): if not re.match(old_pattern, item): continue self._raws[idx] = re.sub(old_pattern, new, item) if not global_replace: break log.info(f"{self._raws}") self._parse() def delete(self, old_pattern, global_delete=True): """删除指定键的值 Args: old_pattern: 旧的pattern global_delete: 是否全局删除 """ del_idx = [] if self.content_modifed: self.sync() for idx, item in enumerate(self._raws): if re.match(old_pattern, item): del_idx.append(idx) if not global_delete: break for idx in reversed(del_idx): del self._raws[idx] self._parse() def __getitem__(self, key): """获取指定键的值""" if isinstance(key, str): value = self.get(key, None) if value is None: raise KeyError(f"Key '{key}' not found in {self.file_path}") elif isinstance(key, int): return self._value_process(self._content[key][CONTENT][2]) else: raise KeyError(f"Invalid key type: {type(key)}") def __setitem__(self, key, value): """设置指定键的值 如果键已存在,则更新最后一个匹配项并删除其他匹配项 如果键不存在,则在文件末尾添加 """ # 将单个字符串转换为列表 if not isinstance(value, list): value = [str(value)] found = False log.info(f"set [{key}] = {value}") # 从后往前遍历以找到并更新最后一个匹配项 for i in range(len(self._content) - 1, -1, -1): item = self._content[i] if item[TYPE] == ContentType.KV and item[CONTENT][0].strip() == key.strip(): if not found: # 更新最后一个匹配项 log.info(f"update [{key}] = {self._content[i][CONTENT][2]} to {value}") self._content[i][CONTENT][2] = value found = True else: # 删除其他匹配项 del self._content[i] self.content_modifed = True # 如果没有找到匹配项,添加新项 if not found: if not key.isidentifier(): # 如果key中是非标识符字符,则认为这是一个正则pattern self.replace(key, value[0]) else: # 新增一个kv结构 self._content.append(ContentType.KV, [key, ' := ', value, None]) self.content_modifed = True def __delitem__(self, key): """删除指定键的所有实例""" found = False for i in range(len(self._content) - 1, -1, -1): item = self._content[i] if item[TYPE] == ContentType.KV and item[CONTENT][0].strip() == key.strip(): del self._content[i] found = True if not found: raise KeyError(f"Key '{key}' not found in {self.file_path}") else: self.delete(key) def sync(self): """同步文件内容""" if not self.content_modifed: return self.content_modifed = False _content = [] for item in self._content: if item[TYPE] == ContentType.RAW: line = item[CONTENT] elif item[TYPE] == ContentType.KV: key, delimiter, values, tail = item[CONTENT] line = f"{key}{delimiter}" line += '\\\n'.join(values) line += tail if tail else '' else: raise ValueError(f"Invalid item type: {item[TYPE]}") _content.append(line) _content = '\n'.join(_content) self._raws = _content.split('\n') def flush(self, file_path=None): """将内存中的修改写回文件""" if file_path is None: file_path = self.file_path self.sync() with open(file_path, 'w') as f: f.write('\n'.join(self._raws)) def __str__(self): return json.dumps(self._content, indent=4, cls=CustomJSONEncoder) def __repr__(self): return self.__str__()