lws_client/sscp_tool/sscp.py

203 lines
6.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
sscp - 简化从L3到L0的文件传输工具
用法: sscp <源文件/目录> [<源文件/目录>...] <目标路径>
"""
import os
import sys
import json
import argparse
import subprocess
import re
import shutil
from pathlib import Path
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='从L3向L0传输文件的简化工具')
parser.add_argument('sources', nargs='+', help='源文件或目录路径')
parser.add_argument('target', help='目标路径')
parser.add_argument('-v', '--verbose', action='store_true', help='显示详细信息')
parser.add_argument('-r', '--recursive', action='store_true', help='递归处理目录')
return parser.parse_args()
def get_mount_points():
"""从配置文件获取挂载点信息"""
config_path = Path.home() / ".config" / "sscp" / "config.json"
if not config_path.exists():
print(f"警告: 找不到配置文件 {config_path},将使用默认挂载点 ~/mnt")
return {str(Path.home() / "mnt"): "/l0"}
with open(config_path, 'r') as f:
config = json.load(f)
# 从配置中提取挂载点信息
mount_points = []
mount_points = config['mounts'].copy()
return mount_points
def encode_path(path):
"""
对L0路径进行编码
/l0x/path/to/file 编码为 l0x@path@to@file
同时处理路径中已有的@符号
"""
# 先将已有的@符号替换为特殊序列
path_escaped = path.replace('@', '@@')
# 将路径中的斜杠替换为@符号
encoded_path = path_escaped.replace('/', '@')
# 确保路径以l0开头
if not encoded_path.startswith('@l0'):
raise ValueError(f"无法识别的L0路径格式: {path}")
# 移除开头的@
return encoded_path[1:]
def create_symlink(source_path, encoded_path):
"""创建源文件或目录到临时位置的软链接"""
temp_dir = Path("/tmp/sscp")
temp_dir.mkdir(exist_ok=True)
temp_link = temp_dir / encoded_path
# 确保父目录存在
temp_link.parent.mkdir(parents=True, exist_ok=True)
# 如果已存在,先删除
if temp_link.exists():
if temp_link.is_dir() and not temp_link.is_symlink():
shutil.rmtree(temp_link)
else:
temp_link.unlink()
# 创建软链接
os.symlink(os.path.abspath(source_path), temp_link)
return temp_link
def call_ssbuild(encoded_path, verbose=False):
"""调用ssbuild工具发送文件或目录"""
try:
cmd = ["ssbuild", str(encoded_path)]
if verbose:
print(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
check=True,
capture_output=True,
text=True
)
return result.stdout
except subprocess.CalledProcessError as e:
print(f"调用ssbuild失败: {e}")
print(f"错误输出: {e.stderr}")
sys.exit(1)
except FileNotFoundError:
print("错误: 找不到ssbuild命令请确保它已安装并在PATH中")
sys.exit(1)
def is_mount_path(path, mount_points):
"""检查路径是否在挂载点中"""
path_str = str(path)
for mount_point in mount_points:
if path_str.startswith(mount_point):
return mount_point
return None
def process_source(source_path, target_path, mount_points, verbose=False):
"""处理单个源文件或目录"""
source_path = Path(source_path).resolve()
target_path = Path(target_path)
# 检查源是否存在
if not source_path.exists():
print(f"错误: 源 '{source_path}' 不存在")
return False
# 如果目标是目录,添加源文件名
if target_path.is_dir() or (len(mount_points) > 0 and not target_path.suffix):
final_target = target_path / source_path.name
else:
final_target = target_path
# 检查目标路径是否在挂载点中
mount_point = is_mount_path(final_target, mount_points)
if not mount_point:
print(f"错误: 目标路径 '{final_target}' 不在已知的挂载目录中")
print(f"有效的挂载点: {', '.join(mount_points.keys())}")
return False
try:
# 从挂载路径提取L0路径部分
target_str = str(final_target)
relative_path = target_str[len(mount_point):]
# 构建完整的L0路径
l0_path = mount_points[mount_point] + relative_path
# 对L0路径进行编码
encoded_path = encode_path(l0_path)
if verbose:
print(f"源路径: {source_path}")
print(f"目标路径: {final_target}")
print(f"挂载点: {mount_point}")
print(f"L0路径: {l0_path}")
print(f"编码后的路径: {encoded_path}")
# 创建源的软链接
temp_link = create_symlink(source_path, encoded_path)
if verbose:
print(f"创建软链接: {source_path} -> {temp_link}")
# 调用ssbuild工具发送文件或目录
print(f"正在发送: {source_path} -> {final_target}")
output = call_ssbuild(temp_link, verbose)
if verbose and output:
print(f"ssbuild输出: {output}")
print(f"已成功传输到 {final_target}")
return True
except Exception as e:
print(f"处理 {source_path} 时出错: {e}")
return False
def main():
args = parse_arguments()
# 获取挂载点信息
mount_points = get_mount_points()
if args.verbose:
print(f"已识别的挂载点: {mount_points}")
# 最后一个参数是目标路径
target = args.sources.pop()
sources = args.sources
if not sources:
print("错误: 至少需要一个源文件或目录")
sys.exit(1)
# 处理每个源文件或目录
success_count = 0
for source in sources:
if process_source(source, target, mount_points, args.verbose):
success_count += 1
# 报告结果
if success_count == len(sources):
print(f"所有 {len(sources)} 个文件/目录已成功传输")
else:
print(f"传输完成: {success_count}/{len(sources)} 个文件/目录成功")
sys.exit(1)
if __name__ == "__main__":
main()