Supabase Auto-Translation Service — System Design Document
1. Project Overview
1.1 Background
Building on the existing multilingual content management system, this project constructs an intelligent auto-translation service. It uses Supabase as the database and backend platform, Python as the translation engine, and the RageFlow API as the large-language-model interface to translate content into multiple languages automatically.
1.2 Core Goals
- Automatically translate the content of the ak_contents, ak_content_translations, topic, and categories tables
- Support intelligent translation between multiple languages
- Provide translation quality assessment and a human review workflow
- Support both incremental and batch translation
- Ensure translation consistency and cultural adaptation
1.3 Technology Stack
- Database: Supabase (PostgreSQL)
- Backend service: Python + FastAPI
- AI translation: RageFlow API + large language models (GPT-4, Claude, etc.)
- Queue system: Celery + Redis
- Monitoring: Prometheus + Grafana
- Deployment: Docker + Kubernetes
2. System Architecture
2.1 Architecture Overview
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Web Frontend   │     │   Translation    │     │  RageFlow API   │
│   (Admin UI)    │◄──►│   Service API    │◄──►│  (LLM gateway)  │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │
                                 ▼
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Task Queue    │◄──►│   Translation    │◄──►│    Supabase     │
│ (Celery+Redis)  │     │     Engine       │     │  (PostgreSQL)   │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │
                                 ▼
                        ┌──────────────────┐
                        │     Quality      │
                        │    Assessment    │
                        └──────────────────┘
2.2 Core Components
2.2.1 Translation Service API
- Function: RESTful API service that handles translation requests
- Technology: FastAPI + Pydantic
- Responsibilities:
  - Receive translation requests
  - Schedule tasks and manage task state
  - Return translation results
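The task-state management mentioned above can be guarded by a small state machine over the statuses this document uses ('pending', 'processing', 'completed', 'failed', 'paused'). The allowed transitions below are a sketch based on assumptions, not a rule set specified by the design:

```python
# Sketch of a task-status state machine for the Translation Service API.
# The transition table is an assumption consistent with the statuses
# used elsewhere in this document.
ALLOWED_TRANSITIONS = {
    "pending": {"processing", "paused", "failed"},
    "processing": {"completed", "failed", "paused"},
    "paused": {"pending", "processing"},
    "completed": set(),            # terminal state
    "failed": {"pending"},         # allow re-queueing a failed task
}

def can_transition(current: str, new: str) -> bool:
    """Return True if a task may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

The PATCH /status endpoint in section 4.1.1 would reject a request when `can_transition` returns False.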
2.2.2 Translation Engine
- Function: core translation engine
- Technology: Python + RageFlow SDK
- Responsibilities:
  - Call large language models to perform translations
  - Assess translation quality
  - Optimize batch processing
2.2.3 Task Queue System
- Function: asynchronous task processing
- Technology: Celery + Redis
- Responsibilities:
  - Queue large-batch translation jobs
  - Manage task priorities
  - Retry failed tasks
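The priority management responsibility above can be illustrated with a heap-based queue; this is a minimal local sketch (the real system delegates priorities to Celery queues), where a lower number means higher priority and a counter preserves FIFO order for ties:

```python
import heapq
import itertools

# Tie-breaking counter so jobs with equal priority dequeue in FIFO order.
_counter = itertools.count()

def push_job(queue: list, job_id: str, priority: int) -> None:
    """Enqueue a job; lower `priority` values are served first."""
    heapq.heappush(queue, (priority, next(_counter), job_id))

def pop_job(queue: list) -> str:
    """Pop the highest-priority job id."""
    return heapq.heappop(queue)[2]
```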
3. Database Design
3.1 Translation Task Table
-- Translation tasks table
CREATE TABLE public.ak_translation_tasks (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  task_name VARCHAR(255) NOT NULL,
  task_type VARCHAR(50) NOT NULL, -- 'single', 'batch', 'incremental'
  source_table VARCHAR(100) NOT NULL, -- 'ak_contents', 'ak_content_translations', etc.
  source_ids JSONB, -- list of record IDs to translate
  source_language VARCHAR(10) NOT NULL,
  target_languages VARCHAR(10)[] NOT NULL,
  status VARCHAR(20) DEFAULT 'pending', -- 'pending', 'processing', 'completed', 'failed', 'paused'
  progress_percentage INTEGER DEFAULT 0,
  total_items INTEGER DEFAULT 0,
  completed_items INTEGER DEFAULT 0,
  failed_items INTEGER DEFAULT 0,
  -- Translation configuration
  translation_config JSONB DEFAULT '{}', -- translation parameters
  quality_threshold FLOAT DEFAULT 0.8,
  require_human_review BOOLEAN DEFAULT false,
  -- Metadata
  created_by UUID REFERENCES public.ak_users(id),
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  started_at TIMESTAMP WITH TIME ZONE,
  completed_at TIMESTAMP WITH TIME ZONE,
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  -- Error handling
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3
);
CREATE INDEX idx_translation_tasks_status ON public.ak_translation_tasks(status);
CREATE INDEX idx_translation_tasks_type ON public.ak_translation_tasks(task_type);
CREATE INDEX idx_translation_tasks_created_at ON public.ak_translation_tasks(created_at DESC);
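The progress_percentage column above can be derived from the item counters. A small helper, under the assumption that both completed and failed items count as "processed", might look like:

```python
def progress_percentage(completed_items: int, failed_items: int, total_items: int) -> int:
    """Derive ak_translation_tasks.progress_percentage from the item counters.

    Counting failed items as processed is an assumption; an empty task
    reports 0% rather than dividing by zero.
    """
    if total_items <= 0:
        return 0
    processed = completed_items + failed_items
    return min(100, round(100 * processed / total_items))
```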
3.2 Translation Record Table
-- Translation records table
CREATE TABLE public.ak_translation_records (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  task_id UUID REFERENCES public.ak_translation_tasks(id) ON DELETE CASCADE,
  -- Source data
  source_table VARCHAR(100) NOT NULL,
  source_id UUID NOT NULL,
  source_field VARCHAR(100) NOT NULL, -- name of the field being translated
  source_text TEXT NOT NULL,
  source_language VARCHAR(10) NOT NULL,
  -- Target data
  target_language VARCHAR(10) NOT NULL,
  translated_text TEXT,
  translation_status VARCHAR(20) DEFAULT 'pending', -- 'pending', 'translating', 'completed', 'failed', 'reviewing'
  -- Translation quality
  quality_score FLOAT,
  confidence_score FLOAT,
  ai_provider VARCHAR(50), -- 'rageflow', 'openai', 'google', etc.
  model_version VARCHAR(100),
  -- Review information
  reviewed_by UUID REFERENCES public.ak_users(id),
  reviewed_at TIMESTAMP WITH TIME ZONE,
  review_status VARCHAR(20), -- 'approved', 'rejected', 'needs_revision'
  review_notes TEXT,
  -- Metadata
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  translated_at TIMESTAMP WITH TIME ZONE,
  -- Error information
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  UNIQUE(source_table, source_id, source_field, target_language)
);
CREATE INDEX idx_translation_records_task ON public.ak_translation_records(task_id);
CREATE INDEX idx_translation_records_status ON public.ak_translation_records(translation_status);
CREATE INDEX idx_translation_records_source ON public.ak_translation_records(source_table, source_id);
CREATE INDEX idx_translation_records_quality ON public.ak_translation_records(quality_score DESC);
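The UNIQUE(source_table, source_id, source_field, target_language) constraint above defines the natural key of a record. Before bulk-inserting, the service can deduplicate pending records on that same key; a sketch:

```python
def dedup_records(records: list[dict]) -> list[dict]:
    """Drop records that would violate the UNIQUE constraint on
    (source_table, source_id, source_field, target_language),
    keeping the first occurrence of each key."""
    seen = set()
    unique = []
    for r in records:
        key = (r["source_table"], r["source_id"], r["source_field"], r["target_language"])
        if key not in seen:
            seen.add(key)
            unique.append(r)
    return unique
```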
3.3 Translation Configuration Table
-- Translation service configuration table
CREATE TABLE public.ak_translation_configs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  config_name VARCHAR(100) UNIQUE NOT NULL,
  config_type VARCHAR(50) NOT NULL, -- 'model', 'language_pair', 'quality', 'rate_limit'
  -- Configuration content
  source_language VARCHAR(10),
  target_language VARCHAR(10),
  ai_provider VARCHAR(50) NOT NULL, -- 'rageflow', 'openai', 'google'
  model_name VARCHAR(100),
  -- Translation parameters
  translation_params JSONB DEFAULT '{}', -- temperature, max length, etc.
  quality_threshold FLOAT DEFAULT 0.8,
  rate_limit_per_minute INTEGER DEFAULT 60,
  -- Cultural adaptation
  cultural_adaptation_rules JSONB DEFAULT '{}',
  terminology_glossary JSONB DEFAULT '{}', -- terminology glossary
  -- Status
  is_active BOOLEAN DEFAULT true,
  priority INTEGER DEFAULT 1, -- configuration priority
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE INDEX idx_translation_configs_provider ON public.ak_translation_configs(ai_provider);
CREATE INDEX idx_translation_configs_languages ON public.ak_translation_configs(source_language, target_language);
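Config resolution follows from the columns above: filter by is_active and language pair, then pick by priority. The direction of priority (higher wins) is an assumption, since the schema only says priority orders configs:

```python
from typing import Optional

def select_config(configs: list[dict], source_language: str, target_language: str) -> Optional[dict]:
    """Pick the active config for a language pair.

    Assumes a larger `priority` value wins; returns None when no active
    config matches the pair.
    """
    candidates = [
        c for c in configs
        if c.get("is_active")
        and c.get("source_language") == source_language
        and c.get("target_language") == target_language
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda c: c.get("priority", 0))
```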
3.4 Translation Quality Metrics Table
-- Translation quality metrics table
CREATE TABLE public.ak_translation_quality_metrics (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  record_id UUID REFERENCES public.ak_translation_records(id) ON DELETE CASCADE,
  -- Automatic metrics
  bleu_score FLOAT, -- BLEU score
  rouge_score FLOAT, -- ROUGE score
  semantic_similarity FLOAT, -- semantic similarity
  fluency_score FLOAT, -- fluency score
  adequacy_score FLOAT, -- adequacy score
  -- Human metrics
  human_quality_score FLOAT, -- human quality rating (1-5)
  human_fluency_score FLOAT, -- human fluency rating
  human_adequacy_score FLOAT, -- human adequacy rating
  -- Evaluation metadata
  evaluator_type VARCHAR(20) NOT NULL, -- 'automatic', 'human', 'hybrid'
  evaluator_id UUID REFERENCES public.ak_users(id),
  evaluation_method VARCHAR(50), -- evaluation method
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE INDEX idx_quality_metrics_record ON public.ak_translation_quality_metrics(record_id);
CREATE INDEX idx_quality_metrics_scores ON public.ak_translation_quality_metrics(bleu_score, human_quality_score);
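For evaluator_type 'hybrid', the automatic metrics (stored on a 0-1 scale) and the 1-5 human rating must be combined. The blending scheme below, including the 0.6 human weight, is purely illustrative; the document does not prescribe one:

```python
from typing import Optional

def hybrid_quality(auto_scores: dict, human_score: Optional[float], human_weight: float = 0.6) -> float:
    """Blend automatic 0-1 metrics with a 1-5 human rating.

    Averaging the automatic metrics and the 0.6 human weight are
    illustrative assumptions, not part of the source design.
    """
    auto = sum(auto_scores.values()) / len(auto_scores) if auto_scores else 0.0
    if human_score is None:
        return auto
    human = (human_score - 1) / 4  # map the 1-5 rating onto 0-1
    return human_weight * human + (1 - human_weight) * auto
```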
4. API Design
4.1 Core API Endpoints
4.1.1 Translation Task Management
# Create a translation task
POST /api/v1/translation/tasks
{
  "task_name": "Batch translate content",
  "task_type": "batch",
  "source_table": "ak_contents",
  "source_ids": ["uuid1", "uuid2"],
  "source_language": "zh",
  "target_languages": ["en", "ja", "fr"],
  "translation_config": {
    "model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 2048
  },
  "quality_threshold": 0.8,
  "require_human_review": false
}

# Get task status
GET /api/v1/translation/tasks/{task_id}

# List tasks
GET /api/v1/translation/tasks?status=processing&limit=20

# Pause/resume a task
PATCH /api/v1/translation/tasks/{task_id}/status
{
  "status": "paused"
}
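In the service itself, Pydantic would validate this request body; as a stdlib-only sketch, the same checks can be written by hand. The specific rules (non-empty targets, source not among targets) are assumptions about reasonable behavior:

```python
VALID_TASK_TYPES = {"single", "batch", "incremental"}

def validate_task_request(payload: dict) -> list[str]:
    """Return validation errors for a POST /api/v1/translation/tasks body.

    The rule set is an assumption mirroring what the Pydantic models
    would enforce; an empty list means the payload is acceptable.
    """
    errors = []
    if not payload.get("task_name"):
        errors.append("task_name is required")
    if payload.get("task_type") not in VALID_TASK_TYPES:
        errors.append("task_type must be one of 'single', 'batch', 'incremental'")
    if not payload.get("source_language"):
        errors.append("source_language is required")
    targets = payload.get("target_languages") or []
    if not targets:
        errors.append("target_languages must be non-empty")
    if payload.get("source_language") in targets:
        errors.append("source_language must not appear in target_languages")
    return errors
```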
4.1.2 Single-Item Translation
# Translate a single piece of content
POST /api/v1/translation/translate
{
  "source_text": "这是一段需要翻译的文本",
  "source_language": "zh",
  "target_language": "en",
  "config_name": "high_quality_zh_en"
}

# Batch translation
POST /api/v1/translation/batch
{
  "items": [
    {
      "source_text": "文本1",
      "source_language": "zh",
      "target_language": "en"
    },
    {
      "source_text": "文本2",
      "source_language": "zh",
      "target_language": "ja"
    }
  ],
  "config_name": "standard_translation"
}
4.1.3 Translation Quality Management
# Get translation records
GET /api/v1/translation/records?task_id={task_id}&status=completed

# Submit a human review
POST /api/v1/translation/records/{record_id}/review
{
  "review_status": "approved",
  "quality_score": 4.5,
  "review_notes": "Good translation quality"
}

# Get quality statistics
GET /api/v1/translation/quality/stats?source_language=zh&target_language=en&date_range=7d
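The stats endpoint above aggregates completed records per language pair. The output shape below is an assumption, since the document does not fix the response schema:

```python
from collections import defaultdict

def quality_stats(records: list[dict]) -> dict:
    """Aggregate per-language-pair counts and average quality scores.

    Records without a quality_score are skipped; the returned structure
    is an illustrative guess at what the stats endpoint might serve.
    """
    buckets = defaultdict(list)
    for r in records:
        if r.get("quality_score") is not None:
            pair = f"{r['source_language']}-{r['target_language']}"
            buckets[pair].append(r["quality_score"])
    return {
        pair: {"count": len(scores), "avg_quality": sum(scores) / len(scores)}
        for pair, scores in buckets.items()
    }
```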
4.2 Configuration Management API
# Get translation configs
GET /api/v1/translation/configs

# Create a translation config
POST /api/v1/translation/configs
{
  "config_name": "premium_zh_en",
  "ai_provider": "rageflow",
  "model_name": "gpt-4",
  "source_language": "zh",
  "target_language": "en",
  "translation_params": {
    "temperature": 0.2,
    "max_tokens": 4096,
    "context_length": 8192
  },
  "quality_threshold": 0.9,
  "rate_limit_per_minute": 30
}

# Update a config
PUT /api/v1/translation/configs/{config_id}

# Test a config
POST /api/v1/translation/configs/{config_id}/test
{
  "test_text": "测试文本",
  "source_language": "zh",
  "target_language": "en"
}
5. Translation Engine Design
5.1 Core Translation Classes
import asyncio
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum

class TranslationProvider(Enum):
    RAGEFLOW = "rageflow"
    OPENAI = "openai"
    GOOGLE = "google"
    BAIDU = "baidu"

@dataclass
class TranslationRequest:
    source_text: str
    source_language: str
    target_language: str
    provider: TranslationProvider
    config: Dict
    context: Optional[str] = None

@dataclass
class TranslationResult:
    translated_text: str
    confidence_score: float
    quality_score: float
    provider: str
    model_version: str
    processing_time: float
    error_message: Optional[str] = None

class TranslationEngine:
    def __init__(self):
        self.providers = {
            TranslationProvider.RAGEFLOW: RageFlowProvider(),
            TranslationProvider.OPENAI: OpenAIProvider(),
            TranslationProvider.GOOGLE: GoogleProvider(),
        }

    async def translate(self, request: TranslationRequest) -> TranslationResult:
        """Perform a single translation."""
        provider = self.providers[request.provider]
        try:
            # Preprocess
            processed_text = await self._preprocess_text(request.source_text, request.source_language)
            # Translate
            result = await provider.translate(
                text=processed_text,
                source_lang=request.source_language,
                target_lang=request.target_language,
                config=request.config,
                context=request.context
            )
            # Postprocess
            final_text = await self._postprocess_text(result.translated_text, request.target_language)
            # Quality assessment
            quality_score = await self._assess_quality(request, result)
            return TranslationResult(
                translated_text=final_text,
                confidence_score=result.confidence_score,
                quality_score=quality_score,
                provider=request.provider.value,
                model_version=result.model_version,
                processing_time=result.processing_time
            )
        except Exception as e:
            return TranslationResult(
                translated_text="",
                confidence_score=0.0,
                quality_score=0.0,
                provider=request.provider.value,
                model_version="",
                processing_time=0.0,
                error_message=str(e)
            )

    async def batch_translate(self, requests: List[TranslationRequest]) -> List[TranslationResult]:
        """Translate a batch of requests concurrently."""
        tasks = [self.translate(req) for req in requests]
        return await asyncio.gather(*tasks)
5.2 RageFlow Provider Implementation
import asyncio
from typing import Dict, Optional

import aiohttp

class RageFlowProvider:
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.session = None

    async def translate(self, text: str, source_lang: str, target_lang: str,
                        config: Dict, context: Optional[str] = None) -> TranslationResult:
        """Translate via the RageFlow API."""
        if not self.session:
            self.session = aiohttp.ClientSession()
        # Build the prompt
        prompt = self._build_translation_prompt(text, source_lang, target_lang, context)
        # API request payload
        payload = {
            "model": config.get("model", "gpt-4"),
            "messages": [
                {"role": "system", "content": "You are a professional translator."},
                {"role": "user", "content": prompt}
            ],
            "temperature": config.get("temperature", 0.3),
            "max_tokens": config.get("max_tokens", 2048),
            "stream": False
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        start_time = asyncio.get_event_loop().time()
        async with self.session.post(f"{self.base_url}/chat/completions",
                                     json=payload, headers=headers) as response:
            processing_time = asyncio.get_event_loop().time() - start_time
            if response.status == 200:
                data = await response.json()
                translated_text = data["choices"][0]["message"]["content"]
                # Parse a confidence score, if the API returns one
                confidence_score = self._extract_confidence(data)
                return TranslationResult(
                    translated_text=translated_text.strip(),
                    confidence_score=confidence_score,
                    quality_score=0.0,  # assessed later
                    provider="rageflow",
                    model_version=config.get("model", "gpt-4"),
                    processing_time=processing_time
                )
            else:
                error_data = await response.json()
                raise Exception(f"RageFlow API Error: {error_data}")

    def _build_translation_prompt(self, text: str, source_lang: str,
                                  target_lang: str, context: Optional[str]) -> str:
        """Build the translation prompt."""
        lang_names = {
            "zh": "Chinese", "en": "English", "ja": "Japanese",
            "fr": "French", "de": "German", "es": "Spanish"
        }
        prompt = f"""Please translate the following {lang_names.get(source_lang, source_lang)} text to {lang_names.get(target_lang, target_lang)}.

Requirements:
1. Maintain the original meaning and tone
2. Use natural and fluent expressions in the target language
3. Preserve any technical terms or proper nouns appropriately
4. Consider cultural context and adapt accordingly

"""
        if context:
            prompt += f"Context: {context}\n\n"
        prompt += f"Text to translate:\n{text}\n\nTranslation:"
        return prompt
6. Task Scheduling System
6.1 Celery Task Definitions
import asyncio
from typing import Dict

from celery import Celery, group, chain
from celery.result import AsyncResult

app = Celery('translation_service')

@app.task(bind=True, max_retries=3)
def translate_single_record(self, record_data: Dict) -> Dict:
    """Translate a single record."""
    try:
        # Initialize the translation engine
        engine = TranslationEngine()
        # Build the translation request
        request = TranslationRequest(
            source_text=record_data["source_text"],
            source_language=record_data["source_language"],
            target_language=record_data["target_language"],
            provider=TranslationProvider(record_data["provider"]),
            config=record_data["config"]
        )
        # Run the async translation inside the synchronous Celery task
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(engine.translate(request))
        loop.close()
        # Persist the result
        save_translation_result(record_data["record_id"], result)
        return {
            "record_id": record_data["record_id"],
            "status": "completed",
            "quality_score": result.quality_score,
            "processing_time": result.processing_time
        }
    except Exception as e:
        # Retry with a linearly increasing delay
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (self.request.retries + 1))
        # Record the error
        save_translation_error(record_data["record_id"], str(e))
        return {
            "record_id": record_data["record_id"],
            "status": "failed",
            "error": str(e)
        }
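The retry above uses countdown = 60 * (retries + 1), i.e. a linear backoff. Making the schedule explicit shows the waits a record incurs before each retry:

```python
def retry_schedule(max_retries: int = 3, base: int = 60) -> list[int]:
    """Seconds to wait before each retry attempt, matching the
    countdown = base * (retries + 1) formula used in the Celery task."""
    return [base * (attempt + 1) for attempt in range(max_retries)]
```

With the defaults, a record waits 60 s, then 120 s, then 180 s before its three retries.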
@app.task
def batch_translate_task(task_id: str) -> Dict:
    """Run a batch translation task."""
    # Load the task metadata
    task_info = get_translation_task(task_id)
    # Load the records to translate
    translation_records = get_translation_records(task_id)
    # Create a group of subtasks
    job = group(
        translate_single_record.s(record)
        for record in translation_records
    )
    # Dispatch the group
    result = job.apply_async()
    # Mark the task as processing
    update_task_status(task_id, "processing")
    # Wait for completion and summarize the results
    results = result.get()
    completed = sum(1 for r in results if r["status"] == "completed")
    failed = sum(1 for r in results if r["status"] == "failed")
    # Record the final state
    update_task_completion(task_id, completed, failed)
    return {
        "task_id": task_id,
        "total": len(results),
        "completed": completed,
        "failed": failed
    }

@app.task
def incremental_translation_task() -> Dict:
    """Incremental translation task, executed on a schedule."""
    # Find new content that has not been translated yet
    new_contents = find_untranslated_contents()
    if not new_contents:
        return {"message": "No new content to translate"}
    # Create a translation task for each piece of content
    for content in new_contents:
        create_translation_task_for_content(content)
    return {
        "message": f"Created translation tasks for {len(new_contents)} new contents"
    }
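The core of find_untranslated_contents() is a set difference: every (content, language) combination minus the pairs already present in ak_translation_records. A sketch of that computation, assuming both tables have already been loaded:

```python
def find_missing_translations(content_ids: list[str],
                              target_languages: list[str],
                              existing: set[tuple[str, str]]) -> list[tuple[str, str]]:
    """Return (content_id, language) pairs that still need translation.

    `existing` holds pairs already present in ak_translation_records;
    the function name and signature are illustrative assumptions about
    what find_untranslated_contents() computes internally.
    """
    return [
        (cid, lang)
        for cid in content_ids
        for lang in target_languages
        if (cid, lang) not in existing
    ]
```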
6.2 Task Monitoring
@app.task
def monitor_translation_tasks() -> Dict:
    """Monitor the state of translation tasks."""
    # Restart stuck tasks
    stuck_tasks = find_stuck_tasks()
    for task in stuck_tasks:
        restart_task(task["id"])
    # Temporarily disable configs with high failure rates
    high_failure_configs = find_high_failure_configs()
    for config in high_failure_configs:
        disable_config_temporarily(config["id"])
    # Build a monitoring report
    report = {
        "total_active_tasks": count_active_tasks(),
        "average_processing_time": get_average_processing_time(),
        "success_rate_24h": get_success_rate(hours=24),
        "stuck_tasks_restarted": len(stuck_tasks),
        "configs_disabled": len(high_failure_configs)
    }
    return report
7. Quality Assessment System
7.1 Automatic Quality Assessment
from typing import Dict

import numpy as np
from sentence_transformers import SentenceTransformer

class QualityAssessment:
    def __init__(self):
        self.similarity_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

    async def assess_translation_quality(self, source_text: str, translated_text: str,
                                         source_lang: str, target_lang: str) -> Dict[str, float]:
        """Assess translation quality."""
        scores = {}
        # 1. Semantic similarity
        scores["semantic_similarity"] = await self._semantic_similarity(source_text, translated_text)
        # 2. Length-ratio check
        scores["length_ratio"] = self._length_ratio_score(source_text, translated_text)
        # 3. Terminology consistency
        scores["terminology_consistency"] = await self._terminology_consistency(
            source_text, translated_text, source_lang, target_lang
        )
        # 4. Fluency assessment
        scores["fluency"] = await self._fluency_assessment(translated_text, target_lang)
        # 5. Overall score
        scores["overall_quality"] = self._calculate_overall_score(scores)
        return scores

    async def _semantic_similarity(self, source: str, translation: str) -> float:
        """Compute semantic similarity as cosine similarity of embeddings."""
        try:
            # Multilingual sentence-embedding model
            embeddings = self.similarity_model.encode([source, translation])
            similarity = np.dot(embeddings[0], embeddings[1]) / (
                np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1])
            )
            return float(similarity)
        except Exception:
            return 0.0

    def _length_ratio_score(self, source: str, translation: str) -> float:
        """Score the length ratio between source and translation."""
        source_len = len(source.split())
        trans_len = len(translation.split())
        if source_len == 0:
            return 0.0
        ratio = trans_len / source_len
        # Ideal ratio range (0.5 - 2.0)
        if 0.5 <= ratio <= 2.0:
            return 1.0
        elif ratio < 0.5:
            return ratio * 2  # linear penalty for overly short translations
        else:
            return 2.0 / ratio  # linear penalty for overly long translations

    async def _terminology_consistency(self, source: str, translation: str,
                                       source_lang: str, target_lang: str) -> float:
        """Check terminology consistency against the glossary."""
        # Load the terminology glossary
        terminology_dict = await self._get_terminology_dict(source_lang, target_lang)
        if not terminology_dict:
            return 1.0  # full score when no glossary exists
        # Check that glossary terms found in the source are translated as expected
        correct_translations = 0
        total_terms = 0
        for source_term, expected_translation in terminology_dict.items():
            if source_term.lower() in source.lower():
                total_terms += 1
                if expected_translation.lower() in translation.lower():
                    correct_translations += 1
        return correct_translations / total_terms if total_terms > 0 else 1.0

    async def _fluency_assessment(self, text: str, language: str) -> float:
        """Assess fluency."""
        # A language model could be integrated here; this version uses
        # simple rule-based checks such as punctuation and repetition.
        fluency_score = 1.0
        # Check punctuation usage
        if not self._has_proper_punctuation(text):
            fluency_score -= 0.1
        # Check for repeated words
        if self._has_repetitive_words(text):
            fluency_score -= 0.1
        return max(0.0, fluency_score)

    def _calculate_overall_score(self, scores: Dict[str, float]) -> float:
        """Compute the weighted overall score."""
        weights = {
            "semantic_similarity": 0.4,
            "length_ratio": 0.2,
            "terminology_consistency": 0.2,
            "fluency": 0.2
        }
        weighted_sum = sum(scores[key] * weights[key] for key in weights if key in scores)
        return min(1.0, max(0.0, weighted_sum))
7.2 Human Review Workflow
from datetime import datetime
from typing import Dict

class HumanReviewWorkflow:
    def __init__(self, db_connection):
        self.db = db_connection

    async def submit_for_review(self, record_id: str, review_type: str = "quality") -> bool:
        """Submit a translation record for human review."""
        # Load the translation record
        record = await self.db.get_translation_record(record_id)
        # Check whether review is needed at all
        if not self._needs_human_review(record):
            return False
        # Assign a reviewer
        reviewer = await self._assign_reviewer(record, review_type)
        # Create the review task
        review_task = {
            "record_id": record_id,
            "reviewer_id": reviewer["id"],
            "review_type": review_type,
            "status": "pending",
            "assigned_at": datetime.now(),
            "priority": self._calculate_priority(record)
        }
        await self.db.create_review_task(review_task)
        # Notify the reviewer
        await self._notify_reviewer(reviewer, review_task)
        return True

    def _needs_human_review(self, record: Dict) -> bool:
        """Decide whether a record needs human review."""
        # Quality score below threshold
        if record.get("quality_score", 0) < 0.7:
            return True
        # Low confidence
        if record.get("confidence_score", 0) < 0.8:
            return True
        # Sensitive content categories (e.g. legal, medical documents)
        if record.get("content_category") in ["legal", "medical", "financial"]:
            return True
        # High-value content
        if record.get("importance_level") == "high":
            return True
        return False

    async def _assign_reviewer(self, record: Dict, review_type: str) -> Dict:
        """Assign a reviewer."""
        # Match reviewers by language pair
        source_lang = record["source_language"]
        target_lang = record["target_language"]
        # Find qualified reviewers
        qualified_reviewers = await self.db.get_qualified_reviewers(
            source_lang, target_lang, review_type
        )
        # Balance by current workload
        reviewer = min(qualified_reviewers, key=lambda r: r["current_workload"])
        return reviewer
8. Configuration and Deployment
8.1 Docker Configuration
# Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Set the working directory
WORKDIR /app

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the API port
EXPOSE 8000

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
8.2 Docker Compose Configuration
# docker-compose.yml
version: '3.8'

services:
  translation-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_KEY=${SUPABASE_KEY}
      - RAGEFLOW_API_KEY=${RAGEFLOW_API_KEY}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
      - celery-worker
    networks:
      - translation-network

  celery-worker:
    build: .
    command: celery -A translation_service.celery_app worker --loglevel=info --concurrency=4
    environment:
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_KEY=${SUPABASE_KEY}
      - RAGEFLOW_API_KEY=${RAGEFLOW_API_KEY}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
    networks:
      - translation-network

  celery-beat:
    build: .
    command: celery -A translation_service.celery_app beat --loglevel=info
    environment:
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_KEY=${SUPABASE_KEY}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
    networks:
      - translation-network

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    networks:
      - translation-network

  flower:
    build: .
    command: celery -A translation_service.celery_app flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
    networks:
      - translation-network

networks:
  translation-network:
    driver: bridge
8.3 Environment Configuration
# config.py
from pydantic import BaseSettings

class Settings(BaseSettings):
    # Supabase settings
    supabase_url: str
    supabase_key: str
    # RageFlow settings
    rageflow_api_key: str
    rageflow_base_url: str = "https://api.rageflow.ai/v1"
    # Redis settings
    redis_url: str = "redis://localhost:6379/0"
    # Celery settings
    celery_broker_url: str = "redis://localhost:6379/0"
    celery_result_backend: str = "redis://localhost:6379/0"
    # Translation service settings
    default_translation_provider: str = "rageflow"
    max_concurrent_translations: int = 10
    translation_timeout: int = 300
    # Quality assessment settings
    quality_threshold: float = 0.8
    require_human_review_threshold: float = 0.7
    # Rate limits
    rate_limit_per_minute: int = 60
    rate_limit_per_hour: int = 1000
    # Monitoring settings
    enable_prometheus_metrics: bool = True
    log_level: str = "INFO"

    class Config:
        env_file = ".env"

settings = Settings()
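The rate_limit_per_minute setting needs an enforcement mechanism in the worker. A minimal in-process sliding-window limiter is sketched below; in production the budget would more likely live in Redis so that all Celery workers share it:

```python
import time

class MinuteRateLimiter:
    """Sliding-window limiter for rate_limit_per_minute.

    An in-process sketch only; a shared Redis-backed counter is the
    more realistic choice for a multi-worker deployment.
    """
    def __init__(self, limit_per_minute: int, clock=time.monotonic):
        self.limit = limit_per_minute
        self.clock = clock  # injectable for testing
        self.timestamps: list[float] = []

    def allow(self) -> bool:
        """Record and allow a request, or refuse it if the window is full."""
        now = self.clock()
        # Drop timestamps older than the 60-second window
        self.timestamps = [t for t in self.timestamps if now - t < 60]
        if len(self.timestamps) < self.limit:
            self.timestamps.append(now)
            return True
        return False
```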
9. Monitoring and Operations
9.1 Monitoring Metrics
import functools
import time

from prometheus_client import Counter, Histogram, Gauge

# Metric definitions
translation_requests_total = Counter(
    'translation_requests_total',
    'Total number of translation requests',
    ['provider', 'source_language', 'target_language', 'status']
)

translation_duration = Histogram(
    'translation_duration_seconds',
    'Time spent on translation',
    ['provider', 'source_language', 'target_language']
)

translation_quality_score = Histogram(
    'translation_quality_score',
    'Translation quality scores',
    ['provider', 'language_pair']
)

active_tasks = Gauge(
    'active_translation_tasks',
    'Number of active translation tasks'
)

failed_translations = Counter(
    'failed_translations_total',
    'Total number of failed translations',
    ['provider', 'error_type']
)

# Monitoring decorator
def monitor_translation(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            # Record success metrics
            translation_requests_total.labels(
                provider=result.provider,
                source_language=kwargs.get('source_lang'),
                target_language=kwargs.get('target_lang'),
                status='success'
            ).inc()
            translation_duration.labels(
                provider=result.provider,
                source_language=kwargs.get('source_lang'),
                target_language=kwargs.get('target_lang')
            ).observe(time.time() - start_time)
            translation_quality_score.labels(
                provider=result.provider,
                language_pair=f"{kwargs.get('source_lang')}-{kwargs.get('target_lang')}"
            ).observe(result.quality_score)
            return result
        except Exception as e:
            # Record failure metrics
            translation_requests_total.labels(
                provider=kwargs.get('provider', 'unknown'),
                source_language=kwargs.get('source_lang'),
                target_language=kwargs.get('target_lang'),
                status='failed'
            ).inc()
            failed_translations.labels(
                provider=kwargs.get('provider', 'unknown'),
                error_type=type(e).__name__
            ).inc()
            raise
    return wrapper
9.2 Health Checks
from datetime import datetime
from typing import Dict

from fastapi import APIRouter, HTTPException

health_router = APIRouter()

@health_router.get("/health")
async def health_check() -> Dict:
    """System health check."""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": {}
    }
    # Check the database connection
    try:
        await check_database_connection()
        health_status["services"]["database"] = "healthy"
    except Exception as e:
        health_status["services"]["database"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"
    # Check the Redis connection
    try:
        await check_redis_connection()
        health_status["services"]["redis"] = "healthy"
    except Exception as e:
        health_status["services"]["redis"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"
    # Check the RageFlow API
    try:
        await check_rageflow_api()
        health_status["services"]["rageflow"] = "healthy"
    except Exception as e:
        health_status["services"]["rageflow"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"
    # Check the Celery workers
    try:
        worker_status = await check_celery_workers()
        health_status["services"]["celery"] = worker_status
    except Exception as e:
        health_status["services"]["celery"] = f"unhealthy: {str(e)}"
        health_status["status"] = "unhealthy"
    if health_status["status"] == "unhealthy":
        raise HTTPException(status_code=503, detail=health_status)
    return health_status

@health_router.get("/metrics")
async def get_metrics() -> Dict:
    """Return system metrics."""
    return {
        "active_tasks": await count_active_tasks(),
        "completed_tasks_24h": await count_completed_tasks(hours=24),
        "failed_tasks_24h": await count_failed_tasks(hours=24),
        "average_quality_score": await get_average_quality_score(),
        "average_processing_time": await get_average_processing_time(),
        "success_rate": await calculate_success_rate(),
        "supported_language_pairs": await get_supported_language_pairs()
    }
10. Usage Examples
10.1 Basic Workflow
# Example: create a translation task for the ak_contents table
import asyncio
from translation_service import TranslationAPI

async def main():
    # Initialize the API client
    api = TranslationAPI(base_url="http://localhost:8000")
    # 1. Create a batch translation task
    task_request = {
        "task_name": "Translate latest articles",
        "task_type": "batch",
        "source_table": "ak_contents",
        "source_ids": ["content-uuid-1", "content-uuid-2"],
        "source_language": "zh",
        "target_languages": ["en", "ja", "fr"],
        "translation_config": {
            "provider": "rageflow",
            "model": "gpt-4",
            "temperature": 0.3,
            "quality_threshold": 0.8
        }
    }
    # Submit the task
    task = await api.create_translation_task(task_request)
    print(f"Task created: {task['id']}")
    # 2. Poll the task progress
    while True:
        status = await api.get_task_status(task['id'])
        print(f"Status: {status['status']}, progress: {status['progress_percentage']}%")
        if status['status'] in ['completed', 'failed']:
            break
        await asyncio.sleep(5)
    # 3. Fetch the translation results
    if status['status'] == 'completed':
        results = await api.get_translation_results(task['id'])
        print(f"Done. Succeeded: {results['completed_items']}, failed: {results['failed_items']}")
        # Inspect individual translation records
        records = await api.get_translation_records(task_id=task['id'])
        for record in records:
            print(f"Source: {record['source_text'][:50]}...")
            print(f"Translation: {record['translated_text'][:50]}...")
            print(f"Quality score: {record['quality_score']}")
            print("---")

if __name__ == "__main__":
    asyncio.run(main())
10.2 Configuration Management Example
# Create a high-quality translation config
config_request = {
    "config_name": "premium_zh_en",
    "ai_provider": "rageflow",
    "model_name": "gpt-4",
    "source_language": "zh",
    "target_language": "en",
    "translation_params": {
        "temperature": 0.2,
        "max_tokens": 4096,
        "top_p": 0.9,
        "frequency_penalty": 0.1
    },
    "quality_threshold": 0.9,
    "rate_limit_per_minute": 30,
    "cultural_adaptation_rules": {
        "formal_tone": True,
        "localize_currency": True,
        "localize_dates": True
    },
    "terminology_glossary": {
        "人工智能": "Artificial Intelligence",
        "机器学习": "Machine Learning",
        "深度学习": "Deep Learning"
    }
}

# Create the config (run inside an async context)
config = await api.create_translation_config(config_request)

# Test the config
test_result = await api.test_config(
    config['id'],
    "这是一个测试人工智能翻译质量的文本。",
    "zh",
    "en"
)
print(f"Test translation: {test_result['translated_text']}")
print(f"Quality score: {test_result['quality_score']}")
11. Summary
This design provides a complete Supabase-based auto-translation service with the following characteristics:
11.1 Core Strengths
- Scalability: microservice architecture that supports horizontal scaling
- Reliability: thorough error handling and retry mechanisms
- Intelligence: integrates large language models and supports multiple translation providers
- Quality assurance: automatic quality assessment plus human review
- Observability: complete monitoring and alerting
11.2 Technical Highlights
- Modern database design built on Supabase
- Asynchronous task processing for higher throughput
- RESTful API design for easy integration
- Containerized deployment with Docker
- Comprehensive monitoring and logging
11.3 Use Cases
- Large-scale automatic content translation
- Multilingual website content management
- Enterprise document translation
- Multilingual news publishing
- Product description localization
With this system you can deliver efficient, high-quality automatic translation for a wide range of multilingual content management needs.