excel_translate/excel_translate.py
Begild ebd25b9cdb 解决退出报错event loop closed的错误
尝试了几个模型发现还是gpt-4o-mini更好,更准确
2024-11-04 22:36:01 +08:00

220 lines
8.4 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import dataclass
import shutil
from typing import List, Tuple
import os
import sys
import platform
# Third-party dependencies are installed on demand when they are missing.
# The install is run as "<this interpreter> -m pip" so the package lands in the
# environment that is actually executing the script; a bare "pip" command may
# belong to a different Python installation or virtual environment.
try:
    import openpyxl
except ImportError:
    print("openpyxl is not installed, installing...")
    import subprocess
    # The return code is deliberately not checked: if the install failed, the
    # import below raises ImportError exactly as it did before.
    subprocess.call([sys.executable, "-m", "pip", "install", "openpyxl"])
    import openpyxl
try:
    from openai import AsyncOpenAI
except ImportError:
    print("openai is not installed, installing...")
    import subprocess
    subprocess.call([sys.executable, "-m", "pip", "install", "openai"])
    from openai import AsyncOpenAI
import asyncio
# OpenAI-compatible API credentials and endpoint.
# SECURITY NOTE(review): the literal key below is committed in plain text and
# should be rotated and removed. Until then it is kept only as a fallback so
# existing usage keeps working; set EXCEL_TRANSLATE_API_KEY /
# EXCEL_TRANSLATE_API_URL to override it without editing this file.
# Script-specific variable names are used on purpose, so that a key exported
# for another service is never sent to this endpoint by accident.
API_KEY = os.environ.get(
    "EXCEL_TRANSLATE_API_KEY",
    "sk-ckFgxmnjJAJoVfcVF918CbFbEc5a459eA72cA51e4dB24dAf",  # from V3API
)
API_URL = os.environ.get("EXCEL_TRANSLATE_API_URL", "https://api.gpt.ge/v1")
# completion = client.chat.completions.create(
# model="gpt-4o-mini",
# messages=[
# {"role": "system", "content": "You are a helpful assistant."},
# {
# "role": "user",
# "content": "Write a haiku about recursion in programming."
# }
# ]
# )
# print(completion.choices[0].message)
# exit(0)
class Model:
    """Names of the chat models this script can request from the API."""

    gpt_4o_mini = "gpt-4o-mini"
    gpt_4o = "gpt-4o"
    text_davinci_002 = "text-davinci-002"
    doubao_lite_32k = "doubao-lite-32k"
    doubao_lite_128k = 'doubao-lite-128k'

    # Model used when a caller does not choose one explicitly.
    default = gpt_4o_mini
# PROMT = """
# 将我所提供的如下内容翻译为英文,
# 内容主要是自行车,电助力自行车,仪表,嵌入式等相关的内容请翻译注意该领域内容的准确性。
# 原始内容是我从excel中按照单元格提取出来的每个单元格的内容前后会用{}包裹,
# 输出结果只能是一个json的list的代码块json的每个元素是一个原来{}包裹的内容翻译后的内容,请转义\n以保留换行的格式。
# 翻译时请结合所有内容整体去进行理解含义而不仅仅是单个单元格的内容。
# 注意仅输出翻译后的内容即可,不要保留原文任何内容!!!
# 请翻译:\n"""
# Instruction prefix that is concatenated in front of the text of every
# translation request. In English it reads: "Translate the following content
# into English; output only the translation, none of the original text:".
PROMT = "将如下内容翻译为英文,仅输出翻译后的内容,不输出任何原文:"
async def chinese2english(text, model=Model.default):
    """Translate *text* to English with a single chat-completion request.

    Args:
        text: Source text; it is appended to the module-level ``PROMT``
            instruction to form the user message.
        model: Model name sent to the API (defaults to ``Model.default``).

    Returns:
        The message content of the first choice in the API response.
    """
    # A client is created per call. ``async with`` closes it (and the HTTP
    # connection pool it owns) before this coroutine returns, instead of
    # leaving an open client behind to be cleaned up at interpreter exit.
    async with AsyncOpenAI(api_key=API_KEY, base_url=API_URL) as client:
        response = await client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": PROMT + text},
                    ],
                }
            ],
        )
    return response.choices[0].message.content
# def chinese2english_stream(text, model=Model.default):
# client = OpenAI(api_key=API_KEY, base_url=API_URL)
# with client.chat.completions.with_streaming_response.create(
# model = model,
# messages = [
# {
# "role": "user",
# "content": [
# {"type": "text", "text": PROMT + text},
# ],
# }
# ]
# ) as response:
# result = response.json()
# print(result)
# return ''
def idx2excel_pos(row_idx, col_idx):
    """Convert a (row, column) index pair into an Excel-style cell label.

    Column 1 maps to ``A``, 26 to ``Z``, 27 to ``AA`` and so on; the row
    number is appended unchanged. An empty string is returned when
    ``col_idx`` is smaller than 1.
    """
    if col_idx < 1:
        return ''
    letters = []
    remaining = col_idx
    while remaining:
        # Bijective base-26: shift by one so that 1..26 map onto A..Z.
        remaining, offset = divmod(remaining - 1, 26)
        letters.append(chr(ord('A') + offset))
    # Letters were collected least-significant first.
    column_label = ''.join(reversed(letters))
    return f"{column_label}{row_idx}"
def update_translate(output_sheet, map_list, translated_text):
    """Log a translation and write it into every listed cell of the sheet.

    Args:
        output_sheet: Worksheet whose ``cell(row=..., column=...)`` values are set.
        map_list: Iterable of ``(row_idx, col_idx)`` pairs to overwrite.
        translated_text: Text stored in each of those cells.
    """
    labels = ','.join(idx2excel_pos(r, c) for r, c in map_list)
    print(f"========================= [{labels}] Update Translated Text :\n{translated_text}")
    for r, c in map_list:
        target_cell = output_sheet.cell(row=r, column=c)
        target_cell.value = translated_text
# def get_json_content(content):
# content = content.strip().split('\n')
# #去除掉api返回的code代码提示信息
# if content[0].strip().startswith('```') and content[-1].strip().endswith('```'):
# content = content[1:-1]
# return '\n'.join(content)
def is_all_ascii(s):
    """Return True when every character of *s* has a code point below 128.

    An empty input yields True.
    """
    for ch in s:
        if ord(ch) >= 128:
            return False
    return True
@dataclass
class TaskInfo:
    """One translation job: a source text and the cells that share it."""

    # Text to be translated.
    original_text: str
    # Translation result; empty until the job has been processed.
    translated_text: str
    # (row_idx, col_idx) pairs of the cells that receive the translation.
    pos: List[Tuple[int, int]]
class TaskManager:
    """Queue translation jobs for one worksheet and run them in concurrent batches.

    Args:
        output_sheet: Worksheet whose cells receive the translated text.
        batch_size: Number of queued jobs that triggers a batch run. Defaults
            to 20, the value that used to be hard-coded.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """

    def __init__(self, output_sheet, batch_size=20):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.task_list = []
        self.output_sheet = output_sheet
        self.batch_size = batch_size

    async def add_task(self, pos_list, original_text):
        """Queue *original_text* for the cells in *pos_list*; run the batch once full."""
        self.task_list.append(TaskInfo(original_text, '', pos_list))
        if len(self.task_list) >= self.batch_size:
            await self.do_task()

    async def do_task(self):
        """Translate all queued jobs concurrently and write the results to the sheet.

        Every request is awaited even if some fail, so translations that did
        succeed are still written instead of being thrown away with the batch.
        The queue is emptied in all cases.

        Raises:
            BaseException: The first error raised by any translation request,
                re-raised after the successful results have been written.
        """
        # Detach the queue first so a failure cannot leave stale jobs behind.
        batch, self.task_list = self.task_list, []
        results = await asyncio.gather(
            *(chinese2english(job.original_text) for job in batch),
            return_exceptions=True,
        )
        first_error = None
        for job, result in zip(batch, results):
            if isinstance(result, BaseException):
                if first_error is None:
                    first_error = result
                continue
            job.translated_text = result
            update_translate(self.output_sheet, job.pos, job.translated_text)
        if first_error is not None:
            raise first_error

    async def finish(self):
        """Run whatever jobs are still queued (a final, possibly partial, batch)."""
        if self.task_list:
            await self.do_task()
async def main() -> None:
    """Command-line entry point: translate the workbook named by the first argument.

    The output path is the input path with ``_translated`` inserted before
    the file extension. Without an argument a usage line is printed and the
    process exits with status 0.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python excel_translate.py input_file_path")
        sys.exit(0)
    input_file = cli_args[0]
    stem, extension = os.path.splitext(input_file)
    output = f"{stem}_translated{extension}"
    await translate_excel_process(input_file, output)
    print(f"{input_file} Translation complete => {output}.")
async def translate_excel_process(input_file_path="input.xlsx", output_file_path="output.xlsx"):
    """Translate every non-ASCII text cell of a workbook into a copy of it.

    The input file is copied to *output_file_path*; cells are read from the
    input workbook column by column and translations are written to the
    same positions in the copy. Consecutive cells holding the same text are
    grouped and translated once.

    Args:
        input_file_path: Workbook to read.
        output_file_path: Path of the translated copy.

    Raises:
        SystemExit: If the input file does not exist, or after a
            KeyboardInterrupt. In the latter case the partial result is
            saved first.
    """
    if not os.path.exists(input_file_path):
        print("Input file not found.")
        sys.exit()
    shutil.copy(input_file_path, output_file_path)
    input_workbook = openpyxl.load_workbook(input_file_path)
    output_workbook = openpyxl.load_workbook(output_file_path)
    try:
        for sheet_name in input_workbook.sheetnames:
            print(f"Processing sheet: {sheet_name}")
            input_sheet = input_workbook[sheet_name]
            output_sheet = output_workbook[sheet_name]
            task_manager = TaskManager(output_sheet)
            # Grouping state is per sheet: a group carried over to the next
            # sheet would be written to that sheet at this sheet's coordinates.
            pending_text = None
            pending_cells = []  # positions of the cells sharing pending_text
            columns = input_sheet.iter_cols(min_col=1, values_only=True)
            for col_idx, col in enumerate(columns, start=1):
                for row_idx, content in enumerate(col, start=1):
                    if not _needs_translation(content):
                        continue
                    # A different text closes the current group and submits it.
                    if pending_cells and content != pending_text:
                        await task_manager.add_task(pending_cells, pending_text)
                        pending_cells = []
                    pending_text = content
                    pending_cells.append((row_idx, col_idx))
            # The last group has no following cell to trigger its submission.
            if pending_cells:
                await task_manager.add_task(pending_cells, pending_text)
            await task_manager.finish()
    except Exception as e:
        # Best effort: report the failure and still save what was translated.
        print(f"Error: {e}")
    except KeyboardInterrupt:
        print("KeyboardInterrupt")
        sys.exit()
    finally:
        # Runs on success, on error, on interrupt and on task cancellation,
        # so partial results are never lost.
        output_workbook.save(output_file_path)


def _needs_translation(content):
    """Return True for a non-blank string cell value that contains non-ASCII text."""
    # Numbers, dates, booleans and empty cells are not strings and are skipped.
    if not isinstance(content, str) or not content.strip():
        return False
    # Text that is already pure ASCII is left untranslated.
    return not is_all_ascii(content)
if __name__ == "__main__":
    if platform.system() == 'Windows':
        # Works around the "event loop is closed" error reported on exit under Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())
    # sys.exit rather than the exit() helper: exit() is injected by the site
    # module for interactive sessions and is not guaranteed to exist.
    sys.exit(0)