#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Simplified automatic translation service.

Reads rows from the ``ak_contents`` table through the Supabase REST
interface, translates their title and content with a RageFlow
chat-completions endpoint, and writes the results to the
``ak_content_translations`` table.
"""

import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional

import aiohttp

# Configure logging for the whole script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class TranslationConfig:
    """Connection and model settings for the translation service.

    Attributes:
        supabase_url: Base URL of the Supabase project (no trailing path).
        supabase_key: API key sent both as ``apikey`` and as bearer token.
        rageflow_api_key: Bearer token for the RageFlow API.
        rageflow_base_url: Base URL the ``/chat/completions`` path is
            appended to.
        model: Model name sent in the completion payload.
        temperature: Sampling temperature sent in the completion payload.
        max_tokens: ``max_tokens`` value sent in the completion payload.
    """
    supabase_url: str
    supabase_key: str
    rageflow_api_key: str
    rageflow_base_url: str = "https://api.rageflow.ai/v1"
    model: str = "gpt-4"
    temperature: float = 0.3
    max_tokens: int = 2048


@dataclass
class ContentItem:
    """One row loaded from ``ak_contents``."""
    id: str
    title: str
    content: str
    category_id: str
    current_language: str = "zh"


@dataclass
class TranslationResult:
    """Outcome of translating one ``ContentItem`` into one language.

    When ``success`` is False, ``error_message`` describes which part
    (title or content) failed; ``title``/``content`` hold whatever was
    translated before the failure.
    """
    original_id: str
    title: str
    content: str
    language: str
    quality_score: float = 0.0
    success: bool = True
    error_message: str = ""


class SimpleTranslationService:
    """Async translation service; use it as an ``async with`` context manager.

    The aiohttp session is created in ``__aenter__`` and closed in
    ``__aexit__``; the request methods rely on that session being open.
    """

    def __init__(self, config: TranslationConfig):
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            # Drop the closed session so it cannot be reused by accident.
            self.session = None

    def _supabase_headers(self) -> Dict[str, str]:
        """Return the headers sent with every Supabase REST request."""
        return {
            "apikey": self.config.supabase_key,
            "Authorization": f"Bearer {self.config.supabase_key}",
            "Content-Type": "application/json"
        }

    async def get_contents_to_translate(self, limit: int = 10) -> List[ContentItem]:
        """Fetch the most recently created rows of ``ak_contents``.

        Args:
            limit: Maximum number of rows to request.

        Returns:
            The rows as ``ContentItem`` objects, or an empty list when the
            request fails or returns a non-200 status (the failure is logged).
        """
        # NOTE(review): the original comment said this selects Chinese rows
        # that have no English translation yet, but the query applies no such
        # filter - it only limits and orders by created_at. Confirm intent.
        # Passing the query as ``params`` lets aiohttp encode it; the previous
        # multi-line f-string put literal whitespace into the URL.
        params = {
            "select": "id,title,content,category_id",
            "limit": str(limit),
            "order": "created_at.desc",
        }
        url = f"{self.config.supabase_url}/rest/v1/ak_contents"

        try:
            async with self.session.get(
                url,
                headers=self._supabase_headers(),
                params=params
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    # NULL title/content columns arrive as None; normalise
                    # them to "" so later string operations cannot fail.
                    contents = [
                        ContentItem(
                            id=item["id"],
                            title=item.get("title") or "",
                            content=item.get("content") or "",
                            category_id=item.get("category_id", "")
                        )
                        for item in data
                    ]
                    logger.info(f"获取到 {len(contents)} 条待翻译内容")
                    return contents
                else:
                    logger.error(f"获取内容失败: {response.status}")
                    return []
        except Exception as e:
            # Best-effort boundary: callers treat [] as "nothing to do".
            logger.error(f"获取内容异常: {str(e)}")
            return []

    async def translate_text(self, text: str, target_language: str = "en") -> Dict:
        """Translate one piece of text through the RageFlow API.

        Args:
            text: Source text; empty, whitespace-only or ``None`` input is
                returned as an empty successful translation without an API
                call.
            target_language: Language code; known codes are expanded to an
                English language name for the prompt, unknown codes are used
                verbatim.

        Returns:
            A dict with ``translated_text`` and ``success``; on success (after
            an API call) also ``model``, on failure also ``error``.
        """
        if not text or not text.strip():
            return {"translated_text": "", "success": True}

        # Build the translation prompt.
        language_map = {
            "en": "English",
            "ja": "Japanese",
            "fr": "French",
            "de": "German",
            "es": "Spanish"
        }
        target_lang_name = language_map.get(target_language, target_language)

        # Built from explicit lines so the method's indentation never leaks
        # into the prompt text.
        prompt = (
            f"Please translate the following Chinese text to {target_lang_name}.\n"
            "\n"
            "Requirements:\n"
            "1. Maintain the original meaning and tone\n"
            "2. Use natural and fluent expressions\n"
            "3. Preserve any technical terms appropriately\n"
            "4. Keep HTML tags and markdown formatting intact\n"
            "\n"
            "Text to translate:\n"
            f"{text}\n"
            "\n"
            "Translation:"
        )

        # Call the RageFlow API.
        payload = {
            "model": self.config.model,
            "messages": [
                {"role": "system", "content": "You are a professional translator."},
                {"role": "user", "content": prompt}
            ],
            "temperature": self.config.temperature,
            "max_tokens": self.config.max_tokens,
            "stream": False
        }

        headers = {
            "Authorization": f"Bearer {self.config.rageflow_api_key}",
            "Content-Type": "application/json"
        }

        try:
            async with self.session.post(
                f"{self.config.rageflow_base_url}/chat/completions",
                json=payload,
                headers=headers
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    translated_text = data["choices"][0]["message"]["content"].strip()
                    return {
                        "translated_text": translated_text,
                        "success": True,
                        "model": self.config.model
                    }
                else:
                    error_data = await response.text()
                    logger.error(f"RageFlow API 错误: {response.status} - {error_data}")
                    return {
                        "translated_text": "",
                        "success": False,
                        "error": f"API错误: {response.status}"
                    }
        except Exception as e:
            # Network errors and unexpected response shapes are both reported
            # to the caller as a failed translation rather than raised.
            logger.error(f"翻译请求异常: {str(e)}")
            return {
                "translated_text": "",
                "success": False,
                "error": str(e)
            }

    async def translate_content_item(self, content: ContentItem, target_language: str = "en") -> TranslationResult:
        """Translate the title and then the content of one item.

        Args:
            content: The item to translate.
            target_language: Language code passed on to ``translate_text``.

        Returns:
            A ``TranslationResult``. If the title fails, the content is not
            attempted; if the content fails, the translated title is still
            included. Either failure sets ``success=False``.
        """
        logger.info(f"开始翻译内容: {content.id}")

        # Translate the title.
        title_result = await self.translate_text(content.title, target_language)
        if not title_result["success"]:
            return TranslationResult(
                original_id=content.id,
                title="",
                content="",
                language=target_language,
                success=False,
                error_message=f"标题翻译失败: {title_result.get('error', '')}"
            )

        # Translate the body.
        content_result = await self.translate_text(content.content, target_language)
        if not content_result["success"]:
            return TranslationResult(
                original_id=content.id,
                title=title_result["translated_text"],
                content="",
                language=target_language,
                success=False,
                error_message=f"内容翻译失败: {content_result.get('error', '')}"
            )

        # Compute a simple quality score over title + body together.
        quality_score = self._calculate_quality_score(
            content.title + content.content,
            title_result["translated_text"] + content_result["translated_text"]
        )

        return TranslationResult(
            original_id=content.id,
            title=title_result["translated_text"],
            content=content_result["translated_text"],
            language=target_language,
            quality_score=quality_score,
            success=True
        )

    def _calculate_quality_score(self, original: str, translated: str) -> float:
        """Return a rough score from the whitespace-token count ratio.

        Returns 0.0 when either text is empty or the original has no
        whitespace-separated tokens, 0.9 when translated/original token ratio
        lies in [0.5, 2.0], and otherwise a value that shrinks with the
        distance from that range but never drops below 0.3.
        """
        if not original or not translated:
            return 0.0

        # Simple score based on the length ratio.
        # NOTE(review): tokens are split on whitespace; Chinese source text
        # usually has very few spaces, so the ratio is likely to be far above
        # 2.0 for zh -> en. Consider a character-based measure - confirm
        # before relying on this score.
        original_len = len(original.split())
        translated_len = len(translated.split())

        if original_len == 0:
            return 0.0

        ratio = translated_len / original_len

        # Ideal ratio range (0.5 - 2.0).
        if 0.5 <= ratio <= 2.0:
            return 0.9
        elif ratio < 0.5:
            return max(0.3, ratio * 1.8)
        else:
            return max(0.3, 2.0 / ratio)

    async def save_translation(self, original_content: ContentItem, translation: TranslationResult) -> bool:
        """Store a successful translation in ``ak_content_translations``.

        Args:
            original_content: The source item; its ``id`` becomes
                ``content_id``.
            translation: The translation to store.

        Returns:
            False when ``translation.success`` is False or the insert fails;
            True when the row was inserted (HTTP 200/201) or a row for the
            same content and language already exists.
        """
        if not translation.success:
            return False

        headers = self._supabase_headers()
        table_url = f"{self.config.supabase_url}/rest/v1/ak_content_translations"

        # Check whether a translation for this language already exists.
        # Filters go through ``params`` so the values are URL-encoded.
        check_params = {
            "select": "id",
            "content_id": f"eq.{original_content.id}",
            "language": f"eq.{translation.language}",
        }

        try:
            async with self.session.get(
                table_url,
                headers=headers,
                params=check_params
            ) as response:
                if response.status == 200:
                    existing = await response.json()
                    if existing:
                        logger.info(f"内容 {original_content.id} 的 {translation.language} 翻译已存在,跳过")
                        return True
        except Exception as e:
            # The existence check is best-effort; fall through to the insert.
            logger.warning(f"检查翻译记录时出错: {str(e)}")

        # Insert the new translation record.
        translation_data = {
            "content_id": original_content.id,
            "language": translation.language,
            "title": translation.title,
            "content": translation.content,
            "quality_score": translation.quality_score,
            # Timezone-aware UTC so the stored instant is unambiguous.
            "translated_at": datetime.now(timezone.utc).isoformat(),
            "translation_source": "rageflow_auto"
        }

        try:
            async with self.session.post(
                table_url,
                json=translation_data,
                headers=headers
            ) as response:
                if response.status in [200, 201]:
                    logger.info(f"翻译保存成功: {original_content.id} -> {translation.language}")
                    return True
                else:
                    error_text = await response.text()
                    logger.error(f"保存翻译失败: {response.status} - {error_text}")
                    return False
        except Exception as e:
            logger.error(f"保存翻译异常: {str(e)}")
            return False

    async def batch_translate(self, target_languages: Optional[List[str]] = None, limit: int = 10) -> Dict:
        """Translate up to ``limit`` items into every target language.

        Args:
            target_languages: Language codes to translate into; defaults to
                ``["en"]`` when omitted or ``None``.
            limit: Maximum number of content rows to load.

        Returns:
            ``{"success": False, "message": ...}`` when no content was
            loaded; otherwise a dict with ``total_contents``,
            ``target_languages``, per-content ``results`` keyed by content id
            and language, and a ``summary`` of successful/failed counts.
        """
        # A None default avoids sharing one mutable list between calls.
        if target_languages is None:
            target_languages = ["en"]

        logger.info(f"开始批量翻译,目标语言: {target_languages},限制: {limit}")

        # Load the content to translate.
        contents = await self.get_contents_to_translate(limit)
        if not contents:
            return {"success": False, "message": "没有找到待翻译的内容"}

        # NOTE(review): "skipped" is never incremented; an already-existing
        # translation makes save_translation return True and is counted as
        # successful.
        results = {
            "total_contents": len(contents),
            "target_languages": target_languages,
            "results": {},
            "summary": {
                "successful": 0,
                "failed": 0,
                "skipped": 0
            }
        }

        for content in contents:
            content_results = {}

            for target_lang in target_languages:
                try:
                    # Translate the item.
                    translation = await self.translate_content_item(content, target_lang)

                    if translation.success:
                        # Persist the translation.
                        saved = await self.save_translation(content, translation)

                        if len(translation.title) > 50:
                            title_preview = translation.title[:50] + "..."
                        else:
                            title_preview = translation.title

                        content_results[target_lang] = {
                            "success": saved,
                            "quality_score": translation.quality_score,
                            "title_preview": title_preview
                        }

                        if saved:
                            results["summary"]["successful"] += 1
                        else:
                            results["summary"]["failed"] += 1
                    else:
                        content_results[target_lang] = {
                            "success": False,
                            "error": translation.error_message
                        }
                        results["summary"]["failed"] += 1

                    # Pause between requests to stay under API rate limits.
                    await asyncio.sleep(1)

                except Exception as e:
                    # One failing item/language must not abort the batch.
                    logger.error(f"翻译内容 {content.id} 到 {target_lang} 时出错: {str(e)}")
                    content_results[target_lang] = {
                        "success": False,
                        "error": str(e)
                    }
                    results["summary"]["failed"] += 1

            results["results"][content.id] = content_results

        logger.info(f"批量翻译完成: 成功 {results['summary']['successful']}, 失败 {results['summary']['failed']}")
        return results


# Usage example.
async def main():
    """Example entry point: translate five items into English and Japanese."""
    # Configuration parameters.
    config = TranslationConfig(
        supabase_url="YOUR_SUPABASE_URL",  # replace with your Supabase URL
        supabase_key="YOUR_SUPABASE_KEY",  # replace with your Supabase API key
        rageflow_api_key="YOUR_RAGEFLOW_KEY",  # replace with your RageFlow API key
        model="gpt-4",
        temperature=0.3
    )

    # Run the translation.
    async with SimpleTranslationService(config) as service:
        # Translate into English and Japanese.
        results = await service.batch_translate(
            target_languages=["en", "ja"],
            limit=5  # only five items for a test run
        )

        # Print the results.
        print(json.dumps(results, indent=2, ensure_ascii=False))

        # Single-item translation example:
        # contents = await service.get_contents_to_translate(1)
        # if contents:
        #     translation = await service.translate_content_item(contents[0], "en")
        #     print(f"翻译结果: {translation}")


if __name__ == "__main__":
    asyncio.run(main())