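# Supabase Automatic Translation Service: System Design Document

## 1. Project Overview

### 1.1 Background

This design builds an intelligent automatic translation service on top of an existing multilingual content management system. Supabase provides the database and backend services, Python implements the translation engine, and the RageFlow API provides access to large language models, so that content is translated into multiple languages automatically.

### 1.2 Core Goals

- Automatically translate the content of tables such as `ak_contents`, `ak_content_translations`, `topic`, and `categories`
- Support intelligent translation between multiple languages
- Provide translation quality assessment and a human review mechanism
- Support incremental and batch translation
- Keep translations consistent and culturally appropriate

### 1.3 Technology Stack

- **Database**: Supabase (PostgreSQL)
- **Backend service**: Python + FastAPI
- **AI translation**: RageFlow API + large language models (GPT-4, Claude, etc.)
- **Queue system**: Celery + Redis
- **Monitoring**: Prometheus + Grafana
- **Deployment**: Docker + Kubernetes
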

---

## 2. System Architecture

### 2.1 Overall Architecture

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Web Frontend   │    │   Translation    │    │  RageFlow API   │
│  (Admin UI)     │◄──►│   Service API    │◄──►│ (LLM interface) │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                │
                                ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Task Queue    │◄──►│   Translation    │◄──►│    Supabase     │
│ (Celery+Redis)  │    │     Engine       │    │  (PostgreSQL)   │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                │
                                ▼
                       ┌──────────────────┐
                       │     Quality      │
                       │    Assessment    │
                       └──────────────────┘
```

### 2.2 Core Components

#### 2.2.1 Translation Service API

- **Purpose**: RESTful API service that handles translation requests
- **Technology**: FastAPI + Pydantic
- **Responsibilities**:
  - Accept translation requests
  - Schedule tasks and manage their state
  - Return translation results

#### 2.2.2 Translation Engine

- **Purpose**: Core translation engine
- **Technology**: Python + RageFlow SDK
- **Responsibilities**:
  - Call large language models to translate
  - Assess translation quality
  - Optimize batch processing

#### 2.2.3 Task Queue System

- **Purpose**: Asynchronous task processing
- **Technology**: Celery + Redis
- **Responsibilities**:
  - Queue large batch translation jobs
  - Manage task priorities
  - Retry failed tasks


---

## 3. Database Design

### 3.1 Translation Task Table

```sql
-- Translation tasks
CREATE TABLE public.ak_translation_tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_name VARCHAR(255) NOT NULL,
    task_type VARCHAR(50) NOT NULL, -- 'single', 'batch', 'incremental'
    source_table VARCHAR(100) NOT NULL, -- 'ak_contents', 'ak_content_translations', etc.
    source_ids JSONB, -- IDs of the records to translate
    source_language VARCHAR(10) NOT NULL,
    target_languages VARCHAR(10)[] NOT NULL,
    status VARCHAR(20) DEFAULT 'pending', -- 'pending', 'processing', 'completed', 'failed', 'paused'
    progress_percentage INTEGER DEFAULT 0,
    total_items INTEGER DEFAULT 0,
    completed_items INTEGER DEFAULT 0,
    failed_items INTEGER DEFAULT 0,

    -- Translation settings
    translation_config JSONB DEFAULT '{}', -- translation parameters
    quality_threshold FLOAT DEFAULT 0.8,
    require_human_review BOOLEAN DEFAULT false,

    -- Metadata
    created_by UUID REFERENCES public.ak_users(id),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT now(),

    -- Error handling
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3
);

CREATE INDEX idx_translation_tasks_status ON public.ak_translation_tasks(status);
CREATE INDEX idx_translation_tasks_type ON public.ak_translation_tasks(task_type);
CREATE INDEX idx_translation_tasks_created_at ON public.ak_translation_tasks(created_at DESC);
```

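The counter columns above (`total_items`, `completed_items`, `failed_items`) are enough to derive `progress_percentage`. The following is a minimal sketch of one way to do that, assuming that failed items count as finished; the `TaskCounters` class and the `progress_percentage` function are illustrative names, not part of the schema.

```python
from dataclasses import dataclass


@dataclass
class TaskCounters:
    """Mirror of the counter columns on ak_translation_tasks (illustrative)."""
    total_items: int
    completed_items: int
    failed_items: int


def progress_percentage(counters: TaskCounters) -> int:
    """Percentage of items that have finished, whether they succeeded or failed."""
    if counters.total_items <= 0:
        return 0
    finished = counters.completed_items + counters.failed_items
    # Clamp so that a double-counted item can never push progress above 100.
    finished = min(finished, counters.total_items)
    return finished * 100 // counters.total_items


if __name__ == "__main__":
    print(progress_percentage(TaskCounters(total_items=200, completed_items=150, failed_items=10)))  # 80
```

Integer division keeps the value compatible with the `INTEGER` column and avoids reporting 100% before the last item has finished.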
### 3.2 Translation Record Table

```sql
-- Translation records
CREATE TABLE public.ak_translation_records (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_id UUID REFERENCES public.ak_translation_tasks(id) ON DELETE CASCADE,

    -- Source data
    source_table VARCHAR(100) NOT NULL,
    source_id UUID NOT NULL,
    source_field VARCHAR(100) NOT NULL, -- name of the translated field
    source_text TEXT NOT NULL,
    source_language VARCHAR(10) NOT NULL,

    -- Target data
    target_language VARCHAR(10) NOT NULL,
    translated_text TEXT,
    translation_status VARCHAR(20) DEFAULT 'pending', -- 'pending', 'translating', 'completed', 'failed', 'reviewing'

    -- Translation quality
    quality_score FLOAT,
    confidence_score FLOAT,
    ai_provider VARCHAR(50), -- 'rageflow', 'openai', 'google', etc.
    model_version VARCHAR(100),

    -- Review information
    reviewed_by UUID REFERENCES public.ak_users(id),
    reviewed_at TIMESTAMP WITH TIME ZONE,
    review_status VARCHAR(20), -- 'approved', 'rejected', 'needs_revision'
    review_notes TEXT,

    -- Metadata
    created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
    translated_at TIMESTAMP WITH TIME ZONE,

    -- Error information
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,

    UNIQUE(source_table, source_id, source_field, target_language)
);

CREATE INDEX idx_translation_records_task ON public.ak_translation_records(task_id);
CREATE INDEX idx_translation_records_status ON public.ak_translation_records(translation_status);
CREATE INDEX idx_translation_records_source ON public.ak_translation_records(source_table, source_id);
CREATE INDEX idx_translation_records_quality ON public.ak_translation_records(quality_score DESC);
```

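The `UNIQUE(source_table, source_id, source_field, target_language)` constraint means a task fans out into at most one record per source field and target language. A sketch of how a task could plan its records while skipping keys that already exist might look like this; `plan_records` and the field name `title` in the usage example are assumptions made for illustration.

```python
from typing import Iterable, List, Set, Tuple

# (source_table, source_id, source_field, target_language), matching the UNIQUE constraint.
RecordKey = Tuple[str, str, str, str]


def plan_records(source_table: str, source_ids: Iterable[str], fields: Iterable[str],
                 target_languages: Iterable[str], existing: Set[RecordKey]) -> List[RecordKey]:
    """Return the record keys a task still has to create, skipping existing ones."""
    fields = list(fields)
    target_languages = list(target_languages)
    planned: List[RecordKey] = []
    for source_id in source_ids:
        for field in fields:
            for language in target_languages:
                key = (source_table, source_id, field, language)
                if key not in existing:
                    planned.append(key)
    return planned


if __name__ == "__main__":
    already_translated = {("ak_contents", "a", "title", "en")}
    for key in plan_records("ak_contents", ["a", "b"], ["title"], ["en", "ja"], already_translated):
        print(key)
```

Planning against the existing keys up front keeps incremental runs from violating the constraint and from paying for translations that are already stored.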
### 3.3 Translation Configuration Table

```sql
-- Translation service configuration
CREATE TABLE public.ak_translation_configs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    config_name VARCHAR(100) UNIQUE NOT NULL,
    config_type VARCHAR(50) NOT NULL, -- 'model', 'language_pair', 'quality', 'rate_limit'

    -- Configuration content
    source_language VARCHAR(10),
    target_language VARCHAR(10),
    ai_provider VARCHAR(50) NOT NULL, -- 'rageflow', 'openai', 'google'
    model_name VARCHAR(100),

    -- Translation parameters
    translation_params JSONB DEFAULT '{}', -- temperature, maximum length, etc.
    quality_threshold FLOAT DEFAULT 0.8,
    rate_limit_per_minute INTEGER DEFAULT 60,

    -- Cultural adaptation
    cultural_adaptation_rules JSONB DEFAULT '{}',
    terminology_glossary JSONB DEFAULT '{}', -- terminology glossary

    -- Status
    is_active BOOLEAN DEFAULT true,
    priority INTEGER DEFAULT 1, -- configuration priority

    created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

CREATE INDEX idx_translation_configs_provider ON public.ak_translation_configs(ai_provider);
CREATE INDEX idx_translation_configs_languages ON public.ak_translation_configs(source_language, target_language);
```

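The table does not say how a configuration is chosen when several rows match a language pair. One possible rule, sketched below under stated assumptions, is: ignore inactive rows, treat a `NULL` language as a wildcard, prefer the most specific match, and break ties by the larger `priority` value. Both the function `select_config` and the "larger priority wins" convention are assumptions, not something the schema defines.

```python
from typing import Dict, List, Optional


def select_config(configs: List[Dict], source_language: str, target_language: str) -> Optional[Dict]:
    """Pick the best active config for a language pair, or None if nothing matches."""

    def matches(cfg: Dict) -> bool:
        # A missing (NULL) language acts as a wildcard in this sketch.
        return (
            cfg.get("is_active", True)
            and cfg.get("source_language") in (None, source_language)
            and cfg.get("target_language") in (None, target_language)
        )

    def rank(cfg: Dict):
        specificity = (cfg.get("source_language") is not None) + (cfg.get("target_language") is not None)
        # Assumption: a larger priority number wins among equally specific rows.
        return (specificity, cfg.get("priority", 1))

    candidates = [cfg for cfg in configs if matches(cfg)]
    return max(candidates, key=rank) if candidates else None


if __name__ == "__main__":
    rows = [
        {"config_name": "standard_translation", "source_language": None, "target_language": None,
         "is_active": True, "priority": 5},
        {"config_name": "high_quality_zh_en", "source_language": "zh", "target_language": "en",
         "is_active": True, "priority": 1},
    ]
    print(select_config(rows, "zh", "en")["config_name"])
```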
### 3.4 Translation Quality Metrics Table

```sql
-- Translation quality metrics
CREATE TABLE public.ak_translation_quality_metrics (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID REFERENCES public.ak_translation_records(id) ON DELETE CASCADE,

    -- Automatic metrics
    bleu_score FLOAT, -- BLEU score
    rouge_score FLOAT, -- ROUGE score
    semantic_similarity FLOAT, -- semantic similarity
    fluency_score FLOAT, -- fluency score
    adequacy_score FLOAT, -- adequacy score

    -- Human metrics
    human_quality_score FLOAT, -- human quality score (1-5)
    human_fluency_score FLOAT, -- human fluency score
    human_adequacy_score FLOAT, -- human adequacy score

    -- Evaluation metadata
    evaluator_type VARCHAR(20) NOT NULL, -- 'automatic', 'human', 'hybrid'
    evaluator_id UUID REFERENCES public.ak_users(id),
    evaluation_method VARCHAR(50), -- evaluation method

    created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

CREATE INDEX idx_quality_metrics_record ON public.ak_translation_quality_metrics(record_id);
CREATE INDEX idx_quality_metrics_scores ON public.ak_translation_quality_metrics(bleu_score, human_quality_score);
```


---

## 4. API Design

### 4.1 Core API Endpoints

#### 4.1.1 Translation Task Management

```http
# Create a translation task
POST /api/v1/translation/tasks
{
  "task_name": "Batch content translation",
  "task_type": "batch",
  "source_table": "ak_contents",
  "source_ids": ["uuid1", "uuid2"],
  "source_language": "zh",
  "target_languages": ["en", "ja", "fr"],
  "translation_config": {
    "model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 2048
  },
  "quality_threshold": 0.8,
  "require_human_review": false
}

# Get the status of a task
GET /api/v1/translation/tasks/{task_id}

# List tasks
GET /api/v1/translation/tasks?status=processing&limit=20

# Pause or resume a task
PATCH /api/v1/translation/tasks/{task_id}/status
{
  "status": "paused"
}
```

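The `PATCH .../status` endpoint needs a rule for which status changes are legal. The design lists the status values (`pending`, `processing`, `completed`, `failed`, `paused`) but not the transitions, so the table below is an assumption offered as a starting point, and `can_transition` is an illustrative helper name.

```python
from typing import Dict, Set

# Assumed transition table; the design document only enumerates the status values.
ALLOWED_TRANSITIONS: Dict[str, Set[str]] = {
    "pending": {"processing", "paused"},
    "processing": {"paused", "completed", "failed"},
    "paused": {"pending", "processing"},
    "completed": set(),          # terminal state
    "failed": {"pending"},       # a failed task may be queued again manually
}


def can_transition(current: str, new: str) -> bool:
    """Return True when a task may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


if __name__ == "__main__":
    print(can_transition("processing", "paused"))    # True
    print(can_transition("completed", "processing"))  # False
```

Rejecting illegal transitions in the API layer, for example with an HTTP 409 response, keeps a completed task from being paused by a late request.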
#### 4.1.2 Single and Batch Translation

```http
# Translate a single piece of content
POST /api/v1/translation/translate
{
  "source_text": "这是一段需要翻译的文本",
  "source_language": "zh",
  "target_language": "en",
  "config_name": "high_quality_zh_en"
}

# Translate a batch
POST /api/v1/translation/batch
{
  "items": [
    {
      "source_text": "文本1",
      "source_language": "zh",
      "target_language": "en"
    },
    {
      "source_text": "文本2",
      "source_language": "zh",
      "target_language": "ja"
    }
  ],
  "config_name": "standard_translation"
}
```

#### 4.1.3 Translation Quality Management

```http
# Get translation records
GET /api/v1/translation/records?task_id={task_id}&status=completed

# Submit a human review (quality_score uses the 1-5 human scale)
POST /api/v1/translation/records/{record_id}/review
{
  "review_status": "approved",
  "quality_score": 4.5,
  "review_notes": "Translation quality is good"
}

# Get quality statistics
GET /api/v1/translation/quality/stats?source_language=zh&target_language=en&date_range=7d
```

### 4.2 Configuration Management API

```http
# List translation configurations
GET /api/v1/translation/configs

# Create a translation configuration
POST /api/v1/translation/configs
{
  "config_name": "premium_zh_en",
  "ai_provider": "rageflow",
  "model_name": "gpt-4",
  "source_language": "zh",
  "target_language": "en",
  "translation_params": {
    "temperature": 0.2,
    "max_tokens": 4096,
    "context_length": 8192
  },
  "quality_threshold": 0.9,
  "rate_limit_per_minute": 30
}

# Update a configuration
PUT /api/v1/translation/configs/{config_id}

# Test a configuration
POST /api/v1/translation/configs/{config_id}/test
{
  "test_text": "测试文本",
  "source_language": "zh",
  "target_language": "en"
}
```


---

## 5. Translation Engine Design

### 5.1 Core Translation Classes

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional


class TranslationProvider(Enum):
    RAGEFLOW = "rageflow"
    OPENAI = "openai"
    GOOGLE = "google"
    BAIDU = "baidu"


@dataclass
class TranslationRequest:
    source_text: str
    source_language: str
    target_language: str
    provider: TranslationProvider
    config: Dict
    context: Optional[str] = None


@dataclass
class TranslationResult:
    translated_text: str
    confidence_score: float
    quality_score: float
    provider: str
    model_version: str
    processing_time: float
    error_message: Optional[str] = None


class TranslationEngine:
    def __init__(self, providers: Optional[Dict[TranslationProvider, Any]] = None):
        if providers is None:
            # Default wiring: only RageFlowProvider (section 5.2) is implemented
            # in this document, and it needs credentials from config.py
            # (section 8.3). OpenAI or Google clients can be registered by
            # passing an explicit mapping.
            from config import settings
            providers = {
                TranslationProvider.RAGEFLOW: RageFlowProvider(
                    api_key=settings.rageflow_api_key,
                    base_url=settings.rageflow_base_url,
                ),
            }
        self.providers = providers

    async def translate(self, request: TranslationRequest) -> TranslationResult:
        """Run one translation; failures are reported through error_message."""
        try:
            # Looked up inside the try block so an unregistered provider is
            # reported as a failed result instead of an unhandled KeyError.
            provider = self.providers[request.provider]

            # Preprocess
            processed_text = await self._preprocess_text(request.source_text, request.source_language)

            # Translate
            result = await provider.translate(
                text=processed_text,
                source_lang=request.source_language,
                target_lang=request.target_language,
                config=request.config,
                context=request.context,
            )

            # Postprocess
            final_text = await self._postprocess_text(result.translated_text, request.target_language)

            # Assess quality
            quality_score = await self._assess_quality(request, result)

            return TranslationResult(
                translated_text=final_text,
                confidence_score=result.confidence_score,
                quality_score=quality_score,
                provider=request.provider.value,
                model_version=result.model_version,
                processing_time=result.processing_time,
            )

        except Exception as e:
            return TranslationResult(
                translated_text="",
                confidence_score=0.0,
                quality_score=0.0,
                provider=request.provider.value,
                model_version="",
                processing_time=0.0,
                error_message=str(e),
            )

    async def batch_translate(self, requests: List[TranslationRequest]) -> List[TranslationResult]:
        """Translate several requests concurrently; results keep the input order."""
        tasks = [self.translate(req) for req in requests]
        return await asyncio.gather(*tasks)

    # The three helpers below are placeholders so the class runs as shown.

    async def _preprocess_text(self, text: str, language: str) -> str:
        """Placeholder preprocessing: trim surrounding whitespace."""
        return text.strip()

    async def _postprocess_text(self, text: str, language: str) -> str:
        """Placeholder postprocessing: trim surrounding whitespace."""
        return text.strip()

    async def _assess_quality(self, request: TranslationRequest, result: TranslationResult) -> float:
        """Placeholder: section 7.1 describes the full quality assessment."""
        return result.quality_score
```

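`batch_translate` starts every request at once, which can exceed provider rate limits on large batches. A common remedy is to cap concurrency with `asyncio.Semaphore`. The helper below is a minimal sketch of that idea; `gather_bounded` is an illustrative name, and the default limit of 10 simply mirrors the `max_concurrent_translations` setting from the configuration section.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_bounded(items: Iterable[T], worker: Callable[[T], Awaitable[R]],
                         limit: int = 10) -> List[R]:
    """Run worker(item) for every item with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(item: T) -> R:
        # Each call waits here until one of the `limit` slots is free.
        async with semaphore:
            return await worker(item)

    # gather() preserves input order regardless of completion order.
    return await asyncio.gather(*(run(item) for item in items))


if __name__ == "__main__":
    async def fake_translate(text: str) -> str:
        await asyncio.sleep(0.01)  # stands in for a network call
        return text.upper()

    print(asyncio.run(gather_bounded(["a", "b", "c"], fake_translate, limit=2)))
```

With this helper, `batch_translate` could call `gather_bounded(requests, self.translate, limit=...)` instead of `asyncio.gather` directly.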
### 5.2 RageFlow Provider Implementation

```python
import time
from typing import Any, Dict, Optional

import aiohttp

# TranslationResult is the dataclass defined in section 5.1.


class RageFlowProvider:
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.session: Optional[aiohttp.ClientSession] = None

    async def translate(self, text: str, source_lang: str, target_lang: str,
                        config: Dict, context: Optional[str] = None) -> TranslationResult:
        """Translate text through the RageFlow API."""
        if self.session is None:
            self.session = aiohttp.ClientSession()

        # Build the prompt
        prompt = self._build_translation_prompt(text, source_lang, target_lang, context)

        # API request
        payload = {
            "model": config.get("model", "gpt-4"),
            "messages": [
                {"role": "system", "content": "You are a professional translator."},
                {"role": "user", "content": prompt},
            ],
            "temperature": config.get("temperature", 0.3),
            "max_tokens": config.get("max_tokens", 2048),
            "stream": False,
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        start_time = time.perf_counter()
        async with self.session.post(f"{self.base_url}/chat/completions",
                                     json=payload, headers=headers) as response:
            if response.status != 200:
                # An error body is not guaranteed to be JSON, so read it as text.
                raise RuntimeError(f"RageFlow API error {response.status}: {await response.text()}")
            data = await response.json()
        # Measured after the body has been read, so it covers the whole call.
        processing_time = time.perf_counter() - start_time

        translated_text = data["choices"][0]["message"]["content"]

        # Parse the confidence score, if the API returns one
        confidence_score = self._extract_confidence(data)

        return TranslationResult(
            translated_text=translated_text.strip(),
            confidence_score=confidence_score,
            quality_score=0.0,  # assessed later
            provider="rageflow",
            model_version=config.get("model", "gpt-4"),
            processing_time=processing_time,
        )

    async def close(self) -> None:
        """Close the HTTP session when the provider is no longer needed."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    def _extract_confidence(self, data: Dict[str, Any]) -> float:
        """Return the confidence reported by the API, or 0.0 when absent.

        Assumption: the field name "confidence" is a placeholder and must be
        adjusted to whatever the API actually returns.
        """
        return float(data.get("confidence", 0.0))

    def _build_translation_prompt(self, text: str, source_lang: str,
                                  target_lang: str, context: Optional[str]) -> str:
        """Build the translation prompt."""
        lang_names = {
            "zh": "Chinese", "en": "English", "ja": "Japanese",
            "fr": "French", "de": "German", "es": "Spanish",
        }
        source_name = lang_names.get(source_lang, source_lang)
        target_name = lang_names.get(target_lang, target_lang)

        prompt = (
            f"Please translate the following {source_name} text to {target_name}.\n\n"
            "Requirements:\n"
            "1. Maintain the original meaning and tone\n"
            "2. Use natural and fluent expressions in the target language\n"
            "3. Preserve any technical terms or proper nouns appropriately\n"
            "4. Consider cultural context and adapt accordingly\n\n"
        )

        if context:
            prompt += f"Context: {context}\n\n"

        prompt += f"Text to translate:\n{text}\n\nTranslation:"

        return prompt
```

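The prompt above does not yet use the `terminology_glossary` stored with each configuration. One way to connect the two, sketched here as an assumption rather than as part of the provider, is to add a short prompt section that lists only the glossary terms occurring in the source text. `glossary_hint` is an illustrative helper name.

```python
from typing import Dict


def glossary_hint(text: str, glossary: Dict[str, str]) -> str:
    """Return a prompt section listing the glossary terms that occur in `text`."""
    lowered = text.lower()
    hits = {src: dst for src, dst in glossary.items() if src.lower() in lowered}
    if not hits:
        return ""  # nothing to add, keep the prompt short
    lines = [f"- {src} -> {dst}" for src, dst in hits.items()]
    return "Use these term translations:\n" + "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    glossary = {"人工智能": "Artificial Intelligence", "机器学习": "Machine Learning"}
    print(glossary_hint("这是一个测试人工智能翻译质量的文本。", glossary))
```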

---

## 6. Task Scheduling System

### 6.1 Celery Task Definitions

```python
import asyncio
from typing import Dict, List

from celery import Celery, chord, group

app = Celery('translation_service')

# TranslationEngine, TranslationRequest and TranslationProvider come from
# section 5.1. The save_*, get_*, update_*, find_* and create_* functions are
# data-access helpers that this document does not define.


@app.task(bind=True, max_retries=3)
def translate_single_record(self, record_data: Dict) -> Dict:
    """Translate a single record."""
    try:
        # Initialize the translation engine
        engine = TranslationEngine()

        # Build the translation request
        request = TranslationRequest(
            source_text=record_data["source_text"],
            source_language=record_data["source_language"],
            target_language=record_data["target_language"],
            provider=TranslationProvider(record_data["provider"]),
            config=record_data["config"],
        )

        # Celery tasks are synchronous, so run the coroutine to completion here.
        result = asyncio.run(engine.translate(request))

        # engine.translate() reports failures through error_message instead of
        # raising, so convert them into exceptions to reach the retry path.
        if result.error_message:
            raise RuntimeError(result.error_message)

        # Save the result to the database
        save_translation_result(record_data["record_id"], result)

        return {
            "record_id": record_data["record_id"],
            "status": "completed",
            "quality_score": result.quality_score,
            "processing_time": result.processing_time,
        }

    except Exception as e:
        # Retry with a linearly growing delay: 60 s, 120 s, 180 s.
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))

        # Retries exhausted: save the error
        save_translation_error(record_data["record_id"], str(e))

        return {
            "record_id": record_data["record_id"],
            "status": "failed",
            "error": str(e),
        }


@app.task
def batch_translate_task(task_id: str) -> Dict:
    """Batch translation: fan out one sub-task per record."""
    # Load the task and the records to translate
    task_info = get_translation_task(task_id)
    translation_records = get_translation_records(task_id)

    if not translation_records:
        update_task_completion(task_id, 0, 0)
        return {"task_id": task_id, "total": 0}

    update_task_status(task_id, "processing")

    # Waiting with result.get() inside a task blocks a worker, and Celery
    # disallows it by default. A chord runs finalize_batch once every
    # sub-task in the group has finished.
    header = group(translate_single_record.s(record) for record in translation_records)
    chord(header)(finalize_batch.s(task_id))

    return {"task_id": task_id, "total": len(translation_records)}


@app.task
def finalize_batch(results: List[Dict], task_id: str) -> Dict:
    """Chord callback: summarize sub-task results and store the final status."""
    completed = sum(1 for r in results if r["status"] == "completed")
    failed = sum(1 for r in results if r["status"] == "failed")

    update_task_completion(task_id, completed, failed)

    return {
        "task_id": task_id,
        "total": len(results),
        "completed": completed,
        "failed": failed,
    }


@app.task
def incremental_translation_task() -> Dict:
    """Incremental translation, run periodically."""
    # Find new content that still needs translation
    new_contents = find_untranslated_contents()

    if not new_contents:
        return {"message": "No new content to translate"}

    # Create one translation task per content item
    for content in new_contents:
        create_translation_task_for_content(content)

    return {
        "message": f"Created translation tasks for {len(new_contents)} new contents"
    }
```

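The retry delay in `translate_single_record` is `60 * (retries + 1)` seconds, a linear schedule. The sketch below prints that schedule next to an exponential alternative with a cap, which is a common substitute when a provider is rate limiting. The function names and the 900-second cap are illustrative assumptions.

```python
from typing import List


def linear_delays(max_retries: int = 3, base: int = 60) -> List[int]:
    """Delays produced by countdown = base * (retries + 1)."""
    return [base * (attempt + 1) for attempt in range(max_retries)]


def exponential_delays(max_retries: int = 3, base: int = 60, cap: int = 900) -> List[int]:
    """Alternative schedule: the delay doubles on each attempt, up to `cap` seconds."""
    return [min(base * 2 ** attempt, cap) for attempt in range(max_retries)]


if __name__ == "__main__":
    print(linear_delays())        # [60, 120, 180]
    print(exponential_delays())   # [60, 120, 240]
```

With the default `max_retries=3`, the linear schedule makes a record wait at most six minutes in total before it is marked as failed.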
### 6.2 Task Monitoring

```python
@app.task
def monitor_translation_tasks() -> Dict:
    """Monitor the state of translation tasks."""
    # Restart tasks that are stuck
    stuck_tasks = find_stuck_tasks()
    for task in stuck_tasks:
        restart_task(task["id"])

    # Temporarily disable configurations with a high failure rate
    high_failure_configs = find_high_failure_configs()
    for config in high_failure_configs:
        disable_config_temporarily(config["id"])

    # Build the monitoring report
    report = {
        "total_active_tasks": count_active_tasks(),
        "average_processing_time": get_average_processing_time(),
        "success_rate_24h": get_success_rate(hours=24),
        "stuck_tasks_restarted": len(stuck_tasks),
        "configs_disabled": len(high_failure_configs),
    }

    return report
```

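`find_stuck_tasks` is not defined in this document. A plausible rule, shown here purely as an assumption, is that a task is stuck when it has been in `processing` without an `updated_at` change for longer than some idle limit. The name `select_stuck_tasks` and the 30-minute default are illustrative.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def select_stuck_tasks(tasks: List[Dict], now: datetime,
                       max_idle: timedelta = timedelta(minutes=30)) -> List[Dict]:
    """Return tasks that are 'processing' but have not been updated for `max_idle`."""
    return [
        task for task in tasks
        if task["status"] == "processing" and now - task["updated_at"] > max_idle
    ]


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    tasks = [
        {"id": "a", "status": "processing", "updated_at": now - timedelta(minutes=45)},
        {"id": "b", "status": "processing", "updated_at": now - timedelta(minutes=5)},
        {"id": "c", "status": "paused", "updated_at": now - timedelta(hours=3)},
    ]
    print([task["id"] for task in select_stuck_tasks(tasks, now)])  # ['a']
```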

---

## 7. Quality Assessment System

### 7.1 Automatic Quality Assessment

```python
import re
from typing import Dict

import numpy as np
from sentence_transformers import SentenceTransformer

# One token per CJK character, or per run of other non-space characters.
_TOKEN_PATTERN = re.compile(r"[\u3040-\u30ff\u4e00-\u9fff]|[^\s\u3040-\u30ff\u4e00-\u9fff]+")


class QualityAssessment:
    # _get_terminology_dict, _has_proper_punctuation and _has_repetitive_words
    # are helper methods that this document does not define.

    def __init__(self):
        self.similarity_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

    async def assess_translation_quality(self, source_text: str, translated_text: str,
                                         source_lang: str, target_lang: str) -> Dict[str, float]:
        """Assess the quality of a translation."""
        scores = {}

        # 1. Semantic similarity
        scores["semantic_similarity"] = await self._semantic_similarity(source_text, translated_text)

        # 2. Length ratio check
        scores["length_ratio"] = self._length_ratio_score(source_text, translated_text)

        # 3. Terminology consistency
        scores["terminology_consistency"] = await self._terminology_consistency(
            source_text, translated_text, source_lang, target_lang
        )

        # 4. Fluency
        scores["fluency"] = await self._fluency_assessment(translated_text, target_lang)

        # 5. Overall score
        scores["overall_quality"] = self._calculate_overall_score(scores)

        return scores

    async def _semantic_similarity(self, source: str, translation: str) -> float:
        """Cosine similarity between multilingual sentence embeddings."""
        try:
            embeddings = self.similarity_model.encode([source, translation])
            similarity = np.dot(embeddings[0], embeddings[1]) / (
                np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1])
            )
            return float(similarity)
        except Exception:
            # Treat any model failure as "no evidence of similarity".
            return 0.0

    def _length_ratio_score(self, source: str, translation: str) -> float:
        """Score how plausible the translation length is relative to the source."""
        # Whitespace splitting would count a whole Chinese or Japanese sentence
        # as one word, so CJK characters are counted individually.
        source_len = len(_TOKEN_PATTERN.findall(source))
        trans_len = len(_TOKEN_PATTERN.findall(translation))

        if source_len == 0:
            return 0.0

        ratio = trans_len / source_len

        # Acceptable ratio range (0.5 - 2.0)
        if 0.5 <= ratio <= 2.0:
            return 1.0
        elif ratio < 0.5:
            return ratio * 2  # linear penalty for translations that are too short
        else:
            return 2.0 / ratio  # penalty for translations that are too long

    async def _terminology_consistency(self, source: str, translation: str,
                                       source_lang: str, target_lang: str) -> float:
        """Check that glossary terms are translated as expected."""
        # Load the terminology dictionary
        terminology_dict = await self._get_terminology_dict(source_lang, target_lang)

        if not terminology_dict:
            return 1.0  # no glossary available, so nothing can be inconsistent

        # Check whether each term found in the source is translated correctly
        correct_translations = 0
        total_terms = 0

        for source_term, expected_translation in terminology_dict.items():
            if source_term.lower() in source.lower():
                total_terms += 1
                if expected_translation.lower() in translation.lower():
                    correct_translations += 1

        return correct_translations / total_terms if total_terms > 0 else 1.0

    async def _fluency_assessment(self, text: str, language: str) -> float:
        """Rule-based fluency check; a language model could replace it."""
        fluency_score = 1.0

        # Check punctuation usage
        if not self._has_proper_punctuation(text):
            fluency_score -= 0.1

        # Check for repeated words
        if self._has_repetitive_words(text):
            fluency_score -= 0.1

        return max(0.0, fluency_score)

    def _calculate_overall_score(self, scores: Dict[str, float]) -> float:
        """Weighted sum of the individual scores, clamped to [0, 1]."""
        weights = {
            "semantic_similarity": 0.4,
            "length_ratio": 0.2,
            "terminology_consistency": 0.2,
            "fluency": 0.2,
        }

        weighted_sum = sum(scores[key] * weights[key] for key in weights if key in scores)
        return min(1.0, max(0.0, weighted_sum))
```

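To make the weighting concrete, the following standalone sketch reproduces the weighted sum from `_calculate_overall_score` on sample values. The sample scores are invented for illustration. It also shows a side effect of the `if key in scores` filter: a missing metric is not renormalized away, it simply lowers the total.

```python
from typing import Dict

WEIGHTS = {
    "semantic_similarity": 0.4,
    "length_ratio": 0.2,
    "terminology_consistency": 0.2,
    "fluency": 0.2,
}


def overall_score(scores: Dict[str, float]) -> float:
    """Weighted sum over the metrics that are present, clamped to [0, 1]."""
    weighted_sum = sum(scores[key] * weight for key, weight in WEIGHTS.items() if key in scores)
    return min(1.0, max(0.0, weighted_sum))


if __name__ == "__main__":
    full = {
        "semantic_similarity": 0.9,
        "length_ratio": 1.0,
        "terminology_consistency": 0.5,
        "fluency": 0.9,
    }
    # 0.4*0.9 + 0.2*1.0 + 0.2*0.5 + 0.2*0.9 = 0.84
    print(round(overall_score(full), 2))

    # Dropping "fluency" removes its 0.18 contribution: 0.66
    partial = {key: value for key, value in full.items() if key != "fluency"}
    print(round(overall_score(partial), 2))
```

In this example a single weak metric (terminology at 0.5) still leaves the record above the default `quality_threshold` of 0.8, while a missing metric pushes it below 0.7, the level at which the review workflow requests human review.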
### 7.2 Human Review Workflow

```python
from datetime import datetime, timezone
from typing import Dict


class HumanReviewWorkflow:
    # _calculate_priority and _notify_reviewer are helper methods that this
    # document does not define.

    def __init__(self, db_connection):
        self.db = db_connection

    async def submit_for_review(self, record_id: str, review_type: str = "quality") -> bool:
        """Submit a record for human review; return False if none is needed."""
        # Load the translation record
        record = await self.db.get_translation_record(record_id)

        # Check whether a review is needed
        if not self._needs_human_review(record):
            return False

        # Assign a reviewer
        reviewer = await self._assign_reviewer(record, review_type)

        # Create the review task
        review_task = {
            "record_id": record_id,
            "reviewer_id": reviewer["id"],
            "review_type": review_type,
            "status": "pending",
            "assigned_at": datetime.now(timezone.utc),
            "priority": self._calculate_priority(record),
        }

        await self.db.create_review_task(review_task)

        # Notify the reviewer
        await self._notify_reviewer(reviewer, review_task)

        return True

    def _needs_human_review(self, record: Dict) -> bool:
        """Decide whether a record needs human review."""
        # Scores may be NULL in the database, so fall back to 0 before comparing.
        # Quality score below the threshold
        if (record.get("quality_score") or 0) < 0.7:
            return True

        # Low confidence
        if (record.get("confidence_score") or 0) < 0.8:
            return True

        # Sensitive content types (legal, medical, financial documents)
        if record.get("content_category") in ["legal", "medical", "financial"]:
            return True

        # High-value content
        if record.get("importance_level") == "high":
            return True

        return False

    async def _assign_reviewer(self, record: Dict, review_type: str) -> Dict:
        """Assign the qualified reviewer with the lowest workload."""
        # Look up reviewers by language pair
        source_lang = record["source_language"]
        target_lang = record["target_language"]

        qualified_reviewers = await self.db.get_qualified_reviewers(
            source_lang, target_lang, review_type
        )

        # min() raises ValueError on an empty list, so fail with a clear message.
        if not qualified_reviewers:
            raise LookupError(f"No qualified reviewer for {source_lang}->{target_lang}")

        # Assign by current workload
        reviewer = min(qualified_reviewers, key=lambda r: r["current_workload"])

        return reviewer
```

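The workflow calls `_calculate_priority` without defining it. The sketch below is one hypothetical scoring rule built from the same signals that `_needs_human_review` checks; the 1 to 5 scale, the thresholds, and the function name `review_priority` are all assumptions made for illustration.

```python
from typing import Dict

SENSITIVE_CATEGORIES = {"legal", "medical", "financial"}


def review_priority(record: Dict) -> int:
    """Hypothetical review priority from 1 (lowest) to 5 (highest)."""
    quality = record.get("quality_score") or 0.0  # a NULL score counts as 0
    priority = 1
    if quality < 0.7:
        priority += 1  # below the human-review threshold
    if quality < 0.5:
        priority += 1  # clearly poor translation
    if record.get("content_category") in SENSITIVE_CATEGORIES:
        priority += 1
    if record.get("importance_level") == "high":
        priority += 1
    return priority


if __name__ == "__main__":
    print(review_priority({"quality_score": 0.9}))  # 1
    print(review_priority({"quality_score": 0.4, "content_category": "legal",
                           "importance_level": "high"}))  # 5
```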

---

## 8. Configuration and Deployment

### 8.1 Docker Configuration

```dockerfile
# Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Set the working directory
WORKDIR /app

# Copy the dependency list
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the API port
EXPOSE 8000

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```

### 8.2 Docker Compose Configuration

```yaml
# docker-compose.yml
version: '3.8'

services:
  translation-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_KEY=${SUPABASE_KEY}
      - RAGEFLOW_API_KEY=${RAGEFLOW_API_KEY}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
      - celery-worker
    networks:
      - translation-network

  celery-worker:
    build: .
    command: celery -A translation_service.celery_app worker --loglevel=info --concurrency=4
    environment:
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_KEY=${SUPABASE_KEY}
      - RAGEFLOW_API_KEY=${RAGEFLOW_API_KEY}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
    networks:
      - translation-network

  celery-beat:
    build: .
    command: celery -A translation_service.celery_app beat --loglevel=info
    environment:
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_KEY=${SUPABASE_KEY}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
    networks:
      - translation-network

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    networks:
      - translation-network

  flower:
    build: .
    command: celery -A translation_service.celery_app flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
    networks:
      - translation-network

networks:
  translation-network:
    driver: bridge
```

### 8.3 Environment Configuration

```python
# config.py
try:
    # Pydantic v2 moved BaseSettings into the separate pydantic-settings package.
    from pydantic_settings import BaseSettings
except ImportError:
    # Pydantic v1
    from pydantic import BaseSettings


class Settings(BaseSettings):
    # Supabase
    supabase_url: str
    supabase_key: str

    # RageFlow
    rageflow_api_key: str
    rageflow_base_url: str = "https://api.rageflow.ai/v1"

    # Redis
    redis_url: str = "redis://localhost:6379/0"

    # Celery
    celery_broker_url: str = "redis://localhost:6379/0"
    celery_result_backend: str = "redis://localhost:6379/0"

    # Translation service
    default_translation_provider: str = "rageflow"
    max_concurrent_translations: int = 10
    translation_timeout: int = 300  # seconds

    # Quality assessment
    quality_threshold: float = 0.8
    require_human_review_threshold: float = 0.7

    # Rate limits
    rate_limit_per_minute: int = 60
    rate_limit_per_hour: int = 1000

    # Monitoring
    enable_prometheus_metrics: bool = True
    log_level: str = "INFO"

    class Config:
        env_file = ".env"


settings = Settings()
```


---

## 9. Monitoring and Operations

### 9.1 Monitoring Metrics

```python
import functools
import time

from prometheus_client import Counter, Gauge, Histogram

# Metric definitions
translation_requests_total = Counter(
    'translation_requests_total',
    'Total number of translation requests',
    ['provider', 'source_language', 'target_language', 'status']
)

translation_duration = Histogram(
    'translation_duration_seconds',
    'Time spent on translation',
    ['provider', 'source_language', 'target_language']
)

translation_quality_score = Histogram(
    'translation_quality_score',
    'Translation quality scores',
    ['provider', 'language_pair'],
    # Scores lie in [0, 1]; the default buckets are tuned for latencies in seconds.
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

active_tasks = Gauge(
    'active_translation_tasks',
    'Number of active translation tasks'
)

failed_translations = Counter(
    'failed_translations_total',
    'Total number of failed translations',
    ['provider', 'error_type']
)


# Monitoring decorator
def monitor_translation(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        # Labels are read from keyword arguments, so the wrapped function must
        # be called with source_lang= and target_lang= as keywords.
        source_lang = kwargs.get('source_lang', 'unknown')
        target_lang = kwargs.get('target_lang', 'unknown')

        try:
            result = await func(*args, **kwargs)

            # Record success metrics
            translation_requests_total.labels(
                provider=result.provider,
                source_language=source_lang,
                target_language=target_lang,
                status='success'
            ).inc()

            translation_duration.labels(
                provider=result.provider,
                source_language=source_lang,
                target_language=target_lang
            ).observe(time.time() - start_time)

            translation_quality_score.labels(
                provider=result.provider,
                language_pair=f"{source_lang}-{target_lang}"
            ).observe(result.quality_score)

            return result

        except Exception as e:
            # Record failure metrics
            translation_requests_total.labels(
                provider=kwargs.get('provider', 'unknown'),
                source_language=source_lang,
                target_language=target_lang,
                status='failed'
            ).inc()

            failed_translations.labels(
                provider=kwargs.get('provider', 'unknown'),
                error_type=type(e).__name__
            ).inc()

            raise

    return wrapper
```

### 9.2 Health Checks

```python
from datetime import datetime
from typing import Dict

from fastapi import APIRouter, HTTPException

health_router = APIRouter()

# The check_*, count_*, get_* and calculate_* coroutines are helpers that this
# document does not define.


@health_router.get("/health")
async def health_check() -> Dict:
    """System health check."""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": {}
    }

    # Check the database connection
    try:
        await check_database_connection()
        health_status["services"]["database"] = "healthy"
    except Exception as e:
        health_status["services"]["database"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"

    # Check the Redis connection
    try:
        await check_redis_connection()
        health_status["services"]["redis"] = "healthy"
    except Exception as e:
        health_status["services"]["redis"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"

    # Check the RageFlow API
    try:
        await check_rageflow_api()
        health_status["services"]["rageflow"] = "healthy"
    except Exception as e:
        health_status["services"]["rageflow"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"

    # Check the Celery workers
    try:
        worker_status = await check_celery_workers()
        health_status["services"]["celery"] = worker_status
    except Exception as e:
        health_status["services"]["celery"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"

    if health_status["status"] == "unhealthy":
        raise HTTPException(status_code=503, detail=health_status)

    return health_status


@health_router.get("/metrics")
async def get_metrics() -> Dict:
    """Return system metrics as JSON."""
    return {
        "active_tasks": await count_active_tasks(),
        "completed_tasks_24h": await count_completed_tasks(hours=24),
        "failed_tasks_24h": await count_failed_tasks(hours=24),
        "average_quality_score": await get_average_quality_score(),
        "average_processing_time": await get_average_processing_time(),
        "success_rate": await calculate_success_rate(),
        "supported_language_pairs": await get_supported_language_pairs()
    }
```


---

## 10. Usage Examples

### 10.1 Basic Workflow

```python
# Example: create a translation task for the ak_contents table

import asyncio
from translation_service import TranslationAPI


async def main():
    # Initialize the API client
    api = TranslationAPI(base_url="http://localhost:8000")

    # 1. Create a batch translation task
    task_request = {
        "task_name": "Translate latest articles",
        "task_type": "batch",
        "source_table": "ak_contents",
        "source_ids": ["content-uuid-1", "content-uuid-2"],
        "source_language": "zh",
        "target_languages": ["en", "ja", "fr"],
        "translation_config": {
            "provider": "rageflow",
            "model": "gpt-4",
            "temperature": 0.3,
            "quality_threshold": 0.8
        }
    }

    # Submit the task
    task = await api.create_translation_task(task_request)
    print(f"Task created: {task['id']}")

    # 2. Monitor task progress
    while True:
        status = await api.get_task_status(task['id'])
        print(f"Status: {status['status']}, progress: {status['progress_percentage']}%")

        if status['status'] in ['completed', 'failed']:
            break

        await asyncio.sleep(5)

    # 3. Fetch the translation results
    if status['status'] == 'completed':
        results = await api.get_translation_results(task['id'])
        print(f"Translation finished, succeeded: {results['completed_items']}, failed: {results['failed_items']}")

        # Inspect individual translation records
        records = await api.get_translation_records(task_id=task['id'])
        for record in records:
            print(f"Source: {record['source_text'][:50]}...")
            print(f"Translation: {record['translated_text'][:50]}...")
            print(f"Quality score: {record['quality_score']}")
            print("---")


if __name__ == "__main__":
    asyncio.run(main())
```

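The polling loop in the example runs forever if a task never reaches `completed` or `failed`, for instance when it is paused. A bounded variant with a deadline and a growing poll interval could look like the sketch below. `wait_for_task` is an illustrative helper; `get_status` stands for any coroutine with the same shape as `api.get_task_status`.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict


async def wait_for_task(get_status: Callable[[str], Awaitable[Dict]], task_id: str,
                        timeout: float = 600.0, interval: float = 1.0,
                        max_interval: float = 30.0) -> Dict:
    """Poll until the task is completed or failed, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        status = await get_status(task_id)
        if status["status"] in ("completed", "failed"):
            return status
        # Give up if the next poll would land after the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"Task {task_id} did not finish within {timeout} seconds")
        await asyncio.sleep(interval)
        # Back off so long-running tasks are polled less often.
        interval = min(interval * 2, max_interval)


if __name__ == "__main__":
    calls = {"n": 0}

    async def fake_status(task_id: str) -> Dict:
        calls["n"] += 1
        return {"status": "completed" if calls["n"] >= 3 else "processing"}

    print(asyncio.run(wait_for_task(fake_status, "demo", interval=0.01)))
```

Doubling the interval keeps short tasks responsive while reducing load on the API for batches that take minutes.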
### 10.2 Configuration Management Example

```python
import asyncio
from translation_service import TranslationAPI


async def main():
    api = TranslationAPI(base_url="http://localhost:8000")

    # Define a high-quality translation configuration
    config_request = {
        "config_name": "premium_zh_en",
        "ai_provider": "rageflow",
        "model_name": "gpt-4",
        "source_language": "zh",
        "target_language": "en",
        "translation_params": {
            "temperature": 0.2,
            "max_tokens": 4096,
            "top_p": 0.9,
            "frequency_penalty": 0.1
        },
        "quality_threshold": 0.9,
        "rate_limit_per_minute": 30,
        "cultural_adaptation_rules": {
            "formal_tone": True,
            "localize_currency": True,
            "localize_dates": True
        },
        "terminology_glossary": {
            "人工智能": "Artificial Intelligence",
            "机器学习": "Machine Learning",
            "深度学习": "Deep Learning"
        }
    }

    # Create the configuration
    config = await api.create_translation_config(config_request)

    # Test the configuration
    test_result = await api.test_config(
        config['id'],
        "这是一个测试人工智能翻译质量的文本。",
        "zh",
        "en"
    )
    print(f"Test translation: {test_result['translated_text']}")
    print(f"Quality score: {test_result['quality_score']}")


if __name__ == "__main__":
    asyncio.run(main())
```

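
---

## 11. Summary

This design describes a complete Supabase-based automatic translation service with the following characteristics:

### 11.1 Core Strengths

- **Scalability**: microservice architecture that supports horizontal scaling
- **Reliability**: thorough error handling and retry mechanisms
- **Intelligence**: integrates large AI models and supports multiple translation providers
- **Quality assurance**: automatic quality assessment plus human review
- **Observability**: complete monitoring and alerting

### 11.2 Technical Characteristics

- Modern database design based on Supabase
- Asynchronous task processing for better performance
- RESTful API design for easy integration
- Containerized deployment with Docker
- Comprehensive monitoring and logging

### 11.3 Use Cases

- Large-scale automatic content translation
- Multilingual website content management
- Enterprise document translation
- Multilingual news publishing
- Internationalization of product descriptions

With this system, you can run an efficient, high-quality automatic translation service that covers a wide range of multilingual content management needs.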