使用asyncio来构建异步翻译任务提高翻译效率

已知问题:程序退出时会报 async runtime error,尚未解决
自动检测并安装openpyxl和openai库
This commit is contained in:
Ekko.bao 2024-11-04 08:42:00 +08:00
parent 1f2de5d8ac
commit d3ab696d10
2 changed files with 145 additions and 90 deletions

View File

@ -1,14 +1,25 @@
from dataclasses import dataclass
import shutil import shutil
import openpyxl from typing import List, Tuple
import os import os
import sys import sys
from openai import OpenAI try:
import json import openpyxl
except ImportError:
print("openpyxl is not installed, installing...")
os.system("pip install openpyxl")
import openpyxl
try:
from openai import AsyncOpenAI
except ImportError:
print("openai is not installed, installing...")
os.system("pip install openai")
from openai import AsyncOpenAI
import asyncio
# OpenAI-compatible API settings.
# SECURITY: a credential is committed here in plain text. Prefer supplying it
# through the environment and rotate/remove the embedded default; it is kept
# only so existing invocations keep working unchanged.
# Script-specific variable names are used on purpose so that a generic
# OPENAI_API_KEY in the user's environment is never sent to this endpoint.
API_KEY = os.environ.get(
    "EXCEL_TRANSLATE_API_KEY",
    "sk-ckFgxmnjJAJoVfcVF918CbFbEc5a459eA72cA51e4dB24dAf",  # from V3API
)
API_URL = os.environ.get("EXCEL_TRANSLATE_API_URL", "https://api.gpt.ge/v1")
client = OpenAI(api_key=API_KEY, base_url=API_URL)
# completion = client.chat.completions.create( # completion = client.chat.completions.create(
# model="gpt-4o-mini", # model="gpt-4o-mini",
# messages=[ # messages=[
@ -37,15 +48,13 @@ class Model:
# 翻译时请结合所有内容整体去进行理解含义而不仅仅是单个单元格的内容。 # 翻译时请结合所有内容整体去进行理解含义而不仅仅是单个单元格的内容。
# 注意仅输出翻译后的内容即可,不要保留原文任何内容!!! # 注意仅输出翻译后的内容即可,不要保留原文任何内容!!!
# 请翻译:\n""" # 请翻译:\n"""
PROMT = """ PROMT = "将如下内容翻译为英文,仅输出翻译后的内容,不输出任何原文:"
# 将我所提供的如下内容翻译为英文, async def chinese2english(text, model=Model.gpt_4o_mini):
# 注意仅输出翻译后的内容即可,不要保留原文任何内容!!!
# 请翻译:\n"""
def chinese2english(text, model=Model.gpt_4o_mini):
# print("start translate") # print("start translate")
#Translate the text using OpenAI #Translate the text using OpenAI
response = client.chat.completions.create( client = AsyncOpenAI(api_key=API_KEY, base_url=API_URL)
model = Model.gpt_4o_mini, response = await client.chat.completions.create(
model = model,
messages = [ messages = [
{ {
"role": "user", "role": "user",
@ -57,49 +66,94 @@ def chinese2english(text, model=Model.gpt_4o_mini):
) )
# print("translate done") # print("translate done")
translated_text = response.choices[0].message.content translated_text = response.choices[0].message.content
client.close()
return translated_text return translated_text
def chinese2english_stream(text, model=Model.gpt_4o_mini): # def chinese2english_stream(text, model=Model.gpt_4o_mini):
with client.chat.completions.with_streaming_response.create( # client = OpenAI(api_key=API_KEY, base_url=API_URL)
model = Model.gpt_4o_mini, # with client.chat.completions.with_streaming_response.create(
messages = [ # model = Model.gpt_4o_mini,
{ # messages = [
"role": "user", # {
"content": [ # "role": "user",
{"type": "text", "text": PROMT + text}, # "content": [
], # {"type": "text", "text": PROMT + text},
} # ],
] # }
) as response: # ]
result = response.json() # ) as response:
print(result) # result = response.json()
return '' # print(result)
# return ''
def idx2excel_pos(row_idx, col_idx):
    """Convert 1-based (row, column) indices into an A1-style cell reference.

    Args:
        row_idx: Row number, appended verbatim after the column letters.
        col_idx: 1-based column number (1 -> "A", 27 -> "AA").

    Returns:
        The cell reference such as "AB12", or an empty string when
        ``col_idx`` is less than 1.
    """
    if col_idx < 1:
        return ''
    letters = []
    remaining = col_idx
    while remaining:
        # Column letters have no zero digit (A..Z, AA..), so shift by one
        # before each division.
        remaining, offset = divmod(remaining - 1, 26)
        letters.append(chr(ord('A') + offset))
    # Letters were produced least-significant first.
    column = ''.join(reversed(letters))
    return f"{column}{row_idx}"
def update_translate(output_sheet, map_list, translated_text):
    """Write one translated string into every cell listed in ``map_list``.

    Args:
        output_sheet: Worksheet-like object exposing ``cell(row=, column=)``.
        map_list: Iterable of ``(row_idx, col_idx)`` positions that all
            receive the same translation.
        translated_text: The text to store in each of those cells.
    """
    # Build a human-readable list of the affected cells for the progress log.
    cell_refs = []
    for row_idx, col_idx in map_list:
        cell_refs.append(idx2excel_pos(row_idx, col_idx))
    pos = ','.join(cell_refs)
    print(f"========================= [{pos}] Update Translated Text :\n{translated_text}")
    for row_idx, col_idx in map_list:
        target_cell = output_sheet.cell(row=row_idx, column=col_idx)
        target_cell.value = translated_text
def get_json_content(content): # def get_json_content(content):
content = content.strip().split('\n') # content = content.strip().split('\n')
#去除掉api返回的code代码提示信息 # #去除掉api返回的code代码提示信息
if content[0].strip().startswith('```') and content[-1].strip().endswith('```'): # if content[0].strip().startswith('```') and content[-1].strip().endswith('```'):
content = content[1:-1] # content = content[1:-1]
return '\n'.join(content) # return '\n'.join(content)
def is_all_ascii(s):
    """Return True when every character of ``s`` has a code point below 128.

    An empty string yields True.
    """
    for ch in s:
        if ord(ch) >= 128:
            return False
    return True
def main():
if len(sys.argv) >= 3:
print("Usage: python excel_translate.py input_file_path output_file_path")
input_file_path = sys.argv[1]
output_file_path = sys.argv[2]
translate_excel_process()
@dataclass
class TaskInfo:
    """A single translation job together with the cells that receive its result."""

    # Source text that is sent for translation.
    original_text: str
    # Translation result; created empty and filled in once the job completes.
    translated_text: str
    # (row, column) positions of every cell that shares this text.
    pos: List[Tuple[int, int]]
class TaskManager:
    """Queue translation jobs for one sheet and run them concurrently in batches.

    Jobs are added with ``add_task``; as soon as ``batch_size`` jobs are
    pending they are translated concurrently and written to the output sheet.
    ``finish`` must be awaited at the end to flush a partially filled batch.
    """

    def __init__(self, output_sheet, batch_size=20):
        """Create a manager writing to ``output_sheet``.

        Args:
            output_sheet: Worksheet-like object passed on to ``update_translate``.
            batch_size: Number of pending jobs that triggers a concurrent
                translation run (defaults to the previous fixed value, 20).
        """
        self.task_list = []
        self.output_sheet = output_sheet
        self.batch_size = batch_size

    async def add_task(self, pos_list, original_text):
        """Queue ``original_text`` for the cells in ``pos_list``; flush when the batch is full."""
        self.task_list.append(TaskInfo(original_text, '', pos_list))
        if len(self.task_list) >= self.batch_size:
            await self.do_task()

    async def do_task(self):
        """Translate all pending jobs concurrently and write the results to the sheet."""
        # gather() schedules the coroutines itself and returns results in
        # submission order, so they can be paired back with their jobs.
        results = await asyncio.gather(
            *(chinese2english(task.original_text) for task in self.task_list)
        )
        for task, translated in zip(self.task_list, results):
            task.translated_text = translated
            update_translate(self.output_sheet, task.pos, task.translated_text)
        self.task_list = []

    async def finish(self):
        """Flush any jobs still pending after the last full batch."""
        if self.task_list:
            await self.do_task()
async def main() -> None:
    """Command-line entry point: translate the workbook named in ``sys.argv[1]``.

    The result is written next to the input as ``<name>_translated<ext>``.
    Exits with status 1 when no input path is supplied, so that shells and
    calling scripts can detect the misuse.
    """
    if len(sys.argv) < 2:
        print("Usage: python excel_translate.py input_file_path")
        sys.exit(1)
    input_file = sys.argv[1]
    # Insert the suffix between the file stem and its extension.
    stem, ext = os.path.splitext(input_file)
    output = stem + '_translated' + ext
    await translate_excel_process(input_file, output)
    print(f"{input_file} Translation complete => {output}.")
async def translate_excel_process(input_file_path="input.xlsx", output_file_path="output.xlsx"):
# Check if the input file exists # Check if the input file exists
if not os.path.exists(input_file_path): if not os.path.exists(input_file_path):
print("Input file not found.") print("Input file not found.")
@ -108,54 +162,55 @@ def translate_excel_process(input_file_path="test.xlsx", output_file_path="outpu
# Open the input file # Open the input file
input_workbook = openpyxl.load_workbook(input_file_path) input_workbook = openpyxl.load_workbook(input_file_path)
input_sheet = input_workbook.active
# Create a new output workbook
output_workbook = openpyxl.load_workbook(output_file_path) output_workbook = openpyxl.load_workbook(output_file_path)
output_sheet = output_workbook.active sheets = input_workbook.sheetnames
map_list = [] #合并的单元格的翻译位置记录
# Loop through each row in the input sheet
original_text = "" original_text = ""
col_idx= 1
map_list = []
last_cell_value = "" last_cell_value = ""
last_cell_value_trans = "" try:
# debug_test_count = 0 for sheet_name in sheets:
for col_idx, col in enumerate(input_sheet.iter_cols(min_col=1, values_only=True), start=1): print(f"Processing sheet: {sheet_name}")
# Get the original text and language code input_sheet = input_workbook[sheet_name]
for row_idx, content in enumerate(col, start=1): # Create a new output sheet
if not content: output_sheet = output_workbook[sheet_name]
continue task_manager = TaskManager(output_sheet)
elif isinstance(content, int):#如果是纯数字也不用翻译 # Loop through each row in the input sheet
continue for col_idx, col in enumerate(input_sheet.iter_cols(min_col=1, values_only=True), start=1):
#如果全是英文,则不翻译 # Get the original text and language code
elif is_all_ascii(content): for row_idx, content in enumerate(col, start=1):
continue if not content or str(content).strip() == '': #没内容不用翻译
try: # print(f"Skip empty cell: {idx2excel_pos(row_idx, col_idx)}")
int(content) continue
except ValueError: elif isinstance(content, int):#如果是纯数字也不用翻译
pass continue
else: #如果全是英文,则不翻译
continue elif is_all_ascii(content):
#和最近一次的cell内容相同则不翻译直接使用结果即可 continue
if content != last_cell_value:
original_text += f'{content}\n'
map_list.append((row_idx, col_idx))
#if len(original_text) > 100:
# print(f"Original text: {original_text}")
translated_text = chinese2english(original_text)
# translated_text = get_json_content(translated_text)
# print(f"Translated text: {translated_text}")
update_translate(output_sheet, map_list, translated_text)
map_list = []
original_text = ""
# Add the original text and translated text to the output sheet
# output_sheet.append([original_text, translated_text])
#和最近一次的cell内容相同则不翻译直接使用结果即可
if content != last_cell_value:
if last_cell_value == '': #第一个单元格先记录一下,等待后面有不一样的才翻译
last_cell_value = content
original_text = content
map_list.append((row_idx, col_idx))
continue
last_cell_value = content
# print(f"Original text: {original_text}")
await task_manager.add_task(map_list, original_text)
original_text = content
map_list = []
map_list.append((row_idx, col_idx))
await task_manager.finish()
except Exception as e:
print(f"Error: {e}")
pass
except KeyboardInterrupt:
print("KeyboardInterrupt")
output_workbook.save(output_file_path)
sys.exit()
# Save the output workbook # Save the output workbook
output_workbook.save(output_file_path) output_workbook.save(output_file_path)
print("Translation complete.") pass
if __name__ == "__main__":
    # Run the async entry point to completion, then exit explicitly.
    # sys.exit is used instead of the site-provided exit(), which is meant
    # for the interactive shell and may be missing (e.g. python -S, frozen apps).
    asyncio.run(main())
    sys.exit(0)

Binary file not shown.