Files
akmon/trans_LLM/simple_translation_service.py
2026-01-20 08:04:15 +08:00

404 lines
14 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简化版自动翻译服务
专门用于翻译 ak_contents 表的内容
使用 Supabase + RageFlow 接口
"""
import asyncio
import aiohttp
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
import logging
from datetime import datetime
# Logging setup: timestamped INFO-level records, plus this module's logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
@dataclass
class TranslationConfig:
    """Connection and model settings for the translation service."""

    # Supabase project URL and API key.
    supabase_url: str
    supabase_key: str

    # RageFlow API credentials and base URL.
    rageflow_api_key: str
    rageflow_base_url: str = "https://api.rageflow.ai/v1"

    # Model name and generation parameters.
    model: str = "gpt-4"
    temperature: float = 0.3
    max_tokens: int = 2048
@dataclass
class ContentItem:
    """One source content record that can be translated."""

    # Record identifier.
    id: str

    # Text fields of the record.
    title: str
    content: str

    # Identifier of the category the record belongs to.
    category_id: str

    # Language code of the source text.
    current_language: str = "zh"
@dataclass
class TranslationResult:
    """Outcome of translating one content item into one language."""

    # Identifier of the source content item.
    original_id: str

    # Translated title and body text.
    title: str
    content: str

    # Target language code.
    language: str

    # Quality score of the translation.
    quality_score: float = 0.0

    # Success flag and accompanying error description.
    success: bool = True
    error_message: str = ""
class SimpleTranslationService:
    """Translate ``ak_contents`` rows and store the results in Supabase.

    Rows are read from the Supabase REST endpoint ``ak_contents``, title and
    body are sent to the RageFlow ``/chat/completions`` endpoint, and the
    output is inserted into ``ak_content_translations``.

    Use the service as an async context manager so that the HTTP session is
    opened and closed around the work::

        async with SimpleTranslationService(config) as service:
            await service.batch_translate(["en"])
    """

    # Code-point ranges of scripts that are written without spaces between
    # words. Characters in these ranges are counted one by one when text
    # lengths are compared (see ``_count_length_units``).
    _SPACELESS_SCRIPT_RANGES = (
        (0x3040, 0x30FF),    # Hiragana and Katakana
        (0x3400, 0x4DBF),    # CJK Unified Ideographs Extension A
        (0x4E00, 0x9FFF),    # CJK Unified Ideographs
        (0xF900, 0xFAFF),    # CJK Compatibility Ideographs
        (0x20000, 0x2FA1F),  # Ideograph extensions on the supplementary plane
    )

    def __init__(self, config: "TranslationConfig"):
        """Store the configuration; the HTTP session is created in ``__aenter__``."""
        self.config = config
        self.session = None

    async def __aenter__(self):
        """Open the shared ``aiohttp`` session and return the service itself."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()
            self.session = None

    def _supabase_headers(self) -> Dict[str, str]:
        """Build the authentication headers sent with every Supabase request."""
        return {
            "apikey": self.config.supabase_key,
            "Authorization": f"Bearer {self.config.supabase_key}",
            "Content-Type": "application/json",
        }

    async def get_contents_to_translate(self, limit: int = 10) -> "List[ContentItem]":
        """Fetch the newest ``ak_contents`` rows as translation candidates.

        The query does not filter on language or on existing translations;
        rows that already have a translation are detected per language
        later (see ``batch_translate`` and ``save_translation``).

        Args:
            limit: Maximum number of rows to request.

        Returns:
            The fetched items, or an empty list when the request fails.
        """
        url = f"{self.config.supabase_url}/rest/v1/ak_contents"
        # Passing the query as ``params`` lets the HTTP client encode it;
        # a hand-built multi-line string would put newlines into the URL.
        params = {
            "select": "id,title,content,category_id",
            "limit": str(limit),
            "order": "created_at.desc",
        }
        try:
            async with self.session.get(
                url, params=params, headers=self._supabase_headers()
            ) as response:
                if response.status != 200:
                    logger.error(f"获取内容失败: {response.status}")
                    return []
                data = await response.json()
                # ``or ""`` also covers JSON null, which ``dict.get`` would
                # otherwise pass through as None.
                contents = [
                    ContentItem(
                        id=item["id"],
                        title=item.get("title") or "",
                        content=item.get("content") or "",
                        category_id=item.get("category_id") or "",
                    )
                    for item in data
                ]
                logger.info(f"获取到 {len(contents)} 条待翻译内容")
                return contents
        except Exception as e:
            logger.error(f"获取内容异常: {str(e)}")
            return []

    async def translate_text(self, text: str, target_language: str = "en") -> Dict:
        """Translate one piece of text through the chat-completions endpoint.

        Args:
            text: Source text; ``None``, empty or whitespace-only text is
                returned as an empty successful translation without a request.
            target_language: Language code; known codes are expanded to an
                English language name in the prompt, others are used as given.

        Returns:
            A dict with ``translated_text`` and ``success``; on success also
            ``model``, on failure also ``error``.
        """
        if not text or not text.strip():
            return {"translated_text": "", "success": True}

        language_map = {
            "en": "English",
            "ja": "Japanese",
            "fr": "French",
            "de": "German",
            "es": "Spanish",
        }
        target_lang_name = language_map.get(target_language, target_language)

        # Assembled from single-line pieces so that the code's indentation
        # never leaks into the prompt.
        prompt = (
            f"Please translate the following Chinese text to {target_lang_name}.\n"
            "Requirements:\n"
            "1. Maintain the original meaning and tone\n"
            "2. Use natural and fluent expressions\n"
            "3. Preserve any technical terms appropriately\n"
            "4. Keep HTML tags and markdown formatting intact\n"
            "Text to translate:\n"
            f"{text}\n"
            "Translation:"
        )

        payload = {
            "model": self.config.model,
            "messages": [
                {"role": "system", "content": "You are a professional translator."},
                {"role": "user", "content": prompt},
            ],
            "temperature": self.config.temperature,
            "max_tokens": self.config.max_tokens,
            "stream": False,
        }
        headers = {
            "Authorization": f"Bearer {self.config.rageflow_api_key}",
            "Content-Type": "application/json",
        }

        try:
            async with self.session.post(
                f"{self.config.rageflow_base_url}/chat/completions",
                json=payload,
                headers=headers,
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    translated_text = data["choices"][0]["message"]["content"].strip()
                    return {
                        "translated_text": translated_text,
                        "success": True,
                        "model": self.config.model,
                    }
                error_data = await response.text()
                logger.error(f"RageFlow API 错误: {response.status} - {error_data}")
                return {
                    "translated_text": "",
                    "success": False,
                    "error": f"API错误: {response.status}",
                }
        except Exception as e:
            # Also reached when the response body lacks the expected keys.
            logger.error(f"翻译请求异常: {str(e)}")
            return {
                "translated_text": "",
                "success": False,
                "error": str(e),
            }

    async def translate_content_item(
        self, content: "ContentItem", target_language: str = "en"
    ) -> "TranslationResult":
        """Translate the title and then the body of one content item.

        Returns:
            A result with ``success=False`` and an error message as soon as
            either part fails; otherwise both translations plus a quality
            score.
        """
        logger.info(f"开始翻译内容: {content.id}")

        title_result = await self.translate_text(content.title, target_language)
        if not title_result["success"]:
            return TranslationResult(
                original_id=content.id,
                title="",
                content="",
                language=target_language,
                success=False,
                error_message=f"标题翻译失败: {title_result.get('error', '')}",
            )

        content_result = await self.translate_text(content.content, target_language)
        if not content_result["success"]:
            return TranslationResult(
                original_id=content.id,
                title=title_result["translated_text"],
                content="",
                language=target_language,
                success=False,
                error_message=f"内容翻译失败: {content_result.get('error', '')}",
            )

        # Join with a space so the last title word and the first body word
        # are not merged into one unit; ``or ""`` tolerates None fields.
        quality_score = self._calculate_quality_score(
            " ".join((content.title or "", content.content or "")),
            " ".join((title_result["translated_text"], content_result["translated_text"])),
        )
        return TranslationResult(
            original_id=content.id,
            title=title_result["translated_text"],
            content=content_result["translated_text"],
            language=target_language,
            quality_score=quality_score,
            success=True,
        )

    @classmethod
    def _count_length_units(cls, text: str) -> int:
        """Count comparable length units in ``text``.

        Each character from ``_SPACELESS_SCRIPT_RANGES`` is one unit; every
        other run of non-whitespace characters is one unit. For text without
        such characters this equals ``len(text.split())``.
        """
        first_spaceless = cls._SPACELESS_SCRIPT_RANGES[0][0]
        units = 0
        inside_word = False
        for char in text:
            code = ord(char)
            if code >= first_spaceless and any(
                low <= code <= high for low, high in cls._SPACELESS_SCRIPT_RANGES
            ):
                units += 1
                inside_word = False
            elif char.isspace():
                inside_word = False
            elif not inside_word:
                units += 1
                inside_word = True
        return units

    def _calculate_quality_score(self, original: str, translated: str) -> float:
        """Score a translation from the length ratio of translation to source.

        Lengths are measured with ``_count_length_units`` rather than
        whitespace splitting, because Chinese source text contains no spaces
        and would otherwise count as a single word.

        Returns:
            0.0 for empty input, 0.9 when the ratio lies in [0.5, 2.0],
            otherwise a value that falls off with the distance from that
            range but never below 0.3.
        """
        if not original or not translated:
            return 0.0

        original_len = self._count_length_units(original)
        translated_len = self._count_length_units(translated)
        if original_len == 0:
            return 0.0

        ratio = translated_len / original_len
        if 0.5 <= ratio <= 2.0:
            return 0.9
        if ratio < 0.5:
            return max(0.3, ratio * 1.8)
        return max(0.3, 2.0 / ratio)

    async def _translation_exists(self, content_id: str, language: str) -> bool:
        """Report whether a translation row for the given pair is stored.

        Best effort: a failed request is logged as a warning and reported as
        "not found", and a non-200 answer is reported as "not found", so the
        caller carries on.
        """
        url = f"{self.config.supabase_url}/rest/v1/ak_content_translations"
        params = {
            "select": "id",
            "content_id": f"eq.{content_id}",
            "language": f"eq.{language}",
        }
        try:
            async with self.session.get(
                url, params=params, headers=self._supabase_headers()
            ) as response:
                if response.status == 200:
                    return bool(await response.json())
        except Exception as e:
            logger.warning(f"检查翻译记录时出错: {str(e)}")
        return False

    async def save_translation(
        self, original_content: "ContentItem", translation: "TranslationResult"
    ) -> bool:
        """Insert a translation into ``ak_content_translations``.

        Returns:
            True when the row was inserted or a row for this content and
            language already exists; False for an unsuccessful translation
            or a failed insert.
        """
        if not translation.success:
            return False

        if await self._translation_exists(original_content.id, translation.language):
            logger.info(f"内容 {original_content.id}{translation.language} 翻译已存在，跳过")
            return True

        translation_data = {
            "content_id": original_content.id,
            "language": translation.language,
            "title": translation.title,
            "content": translation.content,
            "quality_score": translation.quality_score,
            # Timezone-aware local time, so the stored instant is unambiguous.
            "translated_at": datetime.now().astimezone().isoformat(),
            "translation_source": "rageflow_auto",
        }
        try:
            async with self.session.post(
                f"{self.config.supabase_url}/rest/v1/ak_content_translations",
                json=translation_data,
                headers=self._supabase_headers(),
            ) as response:
                if response.status in [200, 201]:
                    logger.info(f"翻译保存成功: {original_content.id} -> {translation.language}")
                    return True
                error_text = await response.text()
                logger.error(f"保存翻译失败: {response.status} - {error_text}")
                return False
        except Exception as e:
            logger.error(f"保存翻译异常: {str(e)}")
            return False

    async def batch_translate(
        self, target_languages: Optional[List[str]] = None, limit: int = 10
    ) -> Dict:
        """Translate the newest content rows into every target language.

        A content/language pair that already has a stored translation is
        not sent to the translation API again; it is reported with
        ``{"success": True, "skipped": True}`` and counted under ``skipped``.

        Args:
            target_languages: Language codes to translate into; defaults to
                ``["en"]``.
            limit: Maximum number of content rows to process.

        Returns:
            A report with ``total_contents``, ``target_languages``, per-item
            ``results`` keyed by content id, and a ``summary`` of successful,
            failed and skipped counts; or a ``success: False`` dict with a
            message when no content was fetched.
        """
        # None instead of a list literal: a default list would be shared
        # between calls.
        if target_languages is None:
            target_languages = ["en"]
        logger.info(f"开始批量翻译，目标语言: {target_languages}，限制: {limit}")

        contents = await self.get_contents_to_translate(limit)
        if not contents:
            return {"success": False, "message": "没有找到待翻译的内容"}

        results = {
            "total_contents": len(contents),
            "target_languages": list(target_languages),
            "results": {},
            "summary": {
                "successful": 0,
                "failed": 0,
                "skipped": 0,
            },
        }
        summary = results["summary"]

        for content in contents:
            content_results = {}
            for target_lang in target_languages:
                try:
                    # Check first so no API call is spent on existing rows.
                    if await self._translation_exists(content.id, target_lang):
                        logger.info(f"内容 {content.id}{target_lang} 翻译已存在，跳过")
                        content_results[target_lang] = {"success": True, "skipped": True}
                        summary["skipped"] += 1
                        continue

                    translation = await self.translate_content_item(content, target_lang)
                    if translation.success:
                        saved = await self.save_translation(content, translation)
                        title = translation.title
                        content_results[target_lang] = {
                            "success": saved,
                            "quality_score": translation.quality_score,
                            "title_preview": title[:50] + "..." if len(title) > 50 else title,
                        }
                        summary["successful" if saved else "failed"] += 1
                    else:
                        content_results[target_lang] = {
                            "success": False,
                            "error": translation.error_message,
                        }
                        summary["failed"] += 1

                    # Pause between translation requests to stay under API
                    # rate limits.
                    await asyncio.sleep(1)
                except Exception as e:
                    logger.error(f"翻译内容 {content.id}{target_lang} 时出错: {str(e)}")
                    content_results[target_lang] = {
                        "success": False,
                        "error": str(e),
                    }
                    summary["failed"] += 1
            results["results"][content.id] = content_results

        logger.info(f"批量翻译完成: 成功 {results['summary']['successful']}, 失败 {results['summary']['failed']}")
        return results
# Usage example
async def main():
    """Run a small demonstration batch and print its report as JSON."""
    settings = TranslationConfig(
        supabase_url="YOUR_SUPABASE_URL",      # replace with your Supabase URL
        supabase_key="YOUR_SUPABASE_KEY",      # replace with your Supabase API key
        rageflow_api_key="YOUR_RAGEFLOW_KEY",  # replace with your RageFlow API key
        model="gpt-4",
        temperature=0.3,
    )

    async with SimpleTranslationService(settings) as service:
        # Translate five rows into English and Japanese as a trial run.
        report = await service.batch_translate(
            limit=5,
            target_languages=["en", "ja"],
        )
        print(json.dumps(report, indent=2, ensure_ascii=False))

        # Single-item example:
        # contents = await service.get_contents_to_translate(1)
        # if contents:
        #     translation = await service.translate_content_item(contents[0], "en")
        #     print(f"翻译结果: {translation}")
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())