Files
akmon/uni_modules/ak-ai-news/services/ContentProcessingPipeline.uts
2026-01-20 08:04:15 +08:00

756 lines
21 KiB
Plaintext

// Content Processing Pipeline - Automated news content workflow
import {
ContentInfo,
ProcessingStep,
AIProvider,
AIResponse,
AIServiceConfig,
AIServiceError,
TranslationResult,
ContentAnalysisResult,
BatchProcessingOptions
} from '../types/ai-types.uts'
import { AITranslationService } from './AITranslationService.uts'
import { AIContentAnalysisService } from './AIContentAnalysisService.uts'
// Processing stage labels.
// Used as the `stage` of a ProcessingStatus and as the `status` of a
// ProcessingResult. 'completed' and 'failed' are the two terminal values;
// the others name an individual stage.
type ProcessingStage =
| 'fetching'
| 'validation'
| 'analysis'
| 'translation'
| 'categorization'
| 'quality_check'
| 'storage'
| 'indexing'
| 'completed'
| 'failed'
// Live processing status of one content item while it is in the pipeline.
//   contentId      - id of the content being processed
//   stage          - stage most recently reached
//   progress       - percentage in the range 0-100
//   startTime      - Date.now() timestamp (ms) taken when processing began
//   lastUpdateTime - Date.now() timestamp (ms) of the latest status change
//   completedSteps - names of the steps that finished successfully, in order
//   errors         - errors recorded so far, each tagged with the step/stage name
//   metadata       - free-form extra data; created empty by the pipeline
type ProcessingStatus = {
contentId: string
stage: ProcessingStage
progress: number // 0-100
startTime: number
lastUpdateTime: number
completedSteps: string[]
errors: Array<{ step: string, error: string, timestamp: number }>
metadata: UTSJSONObject
}
// Pipeline configuration.
// NOTE(review): in the code visible in this file only `parallelProcessing`,
// `qualityThreshold` and the `categorization` / `translation` / `analysis`
// sections are read; `enabledSteps`, `maxConcurrency`, `retryCount`,
// `timeoutMs` and the top-level `targetLanguages` are stored but not
// consulted here - confirm whether other code relies on them.
type PipelineConfig = {
enabledSteps: string[]
parallelProcessing: boolean
maxConcurrency: number
retryCount: number
timeoutMs: number
// Minimum acceptable quality score (validation rejects content below it,
// the quality check only warns).
qualityThreshold: number
targetLanguages: string[]
// Categorization: keep categories whose confidence is >= threshold,
// at most maxCategories of them.
categorization: {
enabled: boolean
threshold: number
maxCategories: number
}
// Translation: languages to translate into and the quality threshold
// forwarded to the translation service.
translation: {
enabled: boolean
targetLanguages: string[]
qualityThreshold: number
}
// Analysis: analysis types and score flag forwarded to the analysis service.
analysis: {
enabled: boolean
types: string[]
includeScores: boolean
}
}
// Result of running one content item through the pipeline.
//   contentId        - id of the processed content
//   originalContent  - the input, as received
//   processedContent - copy of the input enriched by the steps
//   translations     - translation result per target language code
//   analysis         - output of the analysis step (empty object if it did not run)
//   categories       - category ids selected by the categorization step
//   qualityScore     - score computed by the quality check step
//   processingTime   - elapsed milliseconds, set when the pipeline completes
//   totalCost        - sum of the costUSD values reported by the AI services
//   status           - final stage ('completed' or 'failed')
//   errors           - error messages collected during processing
type ProcessingResult = {
contentId: string
originalContent: ContentInfo
processedContent: ContentInfo
translations: Record<string, TranslationResult>
analysis: ContentAnalysisResult
categories: string[]
qualityScore: number
processingTime: number
totalCost: number
status: ProcessingStage
errors: string[]
}
// Aggregate pipeline statistics.
//   totalProcessed    - number of items recorded
//   successCount      - items that ended in 'completed'
//   errorCount        - items that ended in any other status
//   avgProcessingTime - running mean of processing time in milliseconds
//   totalCost         - accumulated cost of all recorded items
//   stageStats        - per-stage counters
//   dailyThroughput   - initialised to 0
//   lastProcessedAt   - Date.now() timestamp (ms) of the latest recorded item
// NOTE(review): in the code visible in this file only `stageStats[...].count`
// is ever updated; `avgTime`, `errorRate` and `dailyThroughput` keep their
// initial value of 0 - confirm whether they are maintained elsewhere.
type PipelineStats = {
totalProcessed: number
successCount: number
errorCount: number
avgProcessingTime: number
totalCost: number
stageStats: Record<ProcessingStage, {
count: number
avgTime: number
errorRate: number
}>
dailyThroughput: number
lastProcessedAt: number
}
/**
 * Content processing pipeline service.
 *
 * Runs a news item through an ordered set of steps - validation, analysis,
 * translation, categorization, quality check, storage and indexing - while
 * tracking per-item progress and aggregate statistics.
 */
export class ContentProcessingPipeline {
  private config: AIServiceConfig
  private pipelineConfig: PipelineConfig
  private translationService: AITranslationService
  private analysisService: AIContentAnalysisService
  /** Items currently being processed, keyed by content id. */
  private processingQueue: Map<string, ProcessingStatus> = new Map()
  /** Registered step implementations, keyed by step name. */
  private processingSteps: Map<string, ProcessingStep> = new Map()
  private stats: PipelineStats

  /**
   * @param aiConfig Configuration handed to the translation and analysis services.
   * @param pipelineConfig Optional overrides applied on top of the default pipeline configuration.
   */
  constructor(
    aiConfig: AIServiceConfig,
    pipelineConfig: Partial<PipelineConfig> = {}
  ) {
    this.config = aiConfig
    this.pipelineConfig = this.createDefaultPipelineConfig(pipelineConfig)
    this.translationService = new AITranslationService(aiConfig)
    this.analysisService = new AIContentAnalysisService(aiConfig)
    this.stats = this.initializeStats()
    this.initializeProcessingSteps()
  }

  /**
   * Process a single content item.
   *
   * Never rejects: any failure is converted into an unsuccessful response
   * carrying the error message and the code 'PIPELINE_ERROR'.
   *
   * @param content Raw content to process.
   * @returns A response wrapping the processing result on success.
   */
  async processContent(content: ContentInfo): Promise<AIResponse<ProcessingResult>> {
    const startTime = Date.now()
    // Declared outside the try block so the finally clause can clean up.
    let contentId: string | null = null
    try {
      contentId = content.id
      // Initialise the status record tracked while the item is in flight.
      const status: ProcessingStatus = {
        contentId,
        stage: 'validation',
        progress: 0,
        startTime,
        lastUpdateTime: startTime,
        completedSteps: [],
        errors: [],
        metadata: {}
      }
      this.processingQueue.set(contentId, status)
      // Run the steps.
      const result = await this.executeProcessingPipeline(content, status)
      // Update statistics.
      this.updateStats(result)
      return { success: true, data: result }
    } catch (error) {
      // The pipeline throws instead of returning a failed result, so failures
      // have to be counted here or errorCount would never move.
      // Cost accrued before the failure is not available at this point.
      this.recordOutcome('failed', Date.now() - startTime, 0)
      const aiError: AIServiceError = {
        code: 'PIPELINE_ERROR',
        message: this.describeError(error, 'Content processing failed'),
        retryable: this.isRetryableError(error)
      }
      return {
        success: false,
        error: aiError.message,
        errorCode: aiError.code
      }
    } finally {
      // Remove the item from the in-flight queue on success AND on failure;
      // otherwise failed items would accumulate in the map forever.
      if (contentId != null) {
        this.processingQueue.delete(contentId)
      }
    }
  }

  /**
   * Process a list of content items in batches.
   *
   * Items of one batch run concurrently when `parallelProcessing` is enabled,
   * otherwise one after another. Failed items are skipped and reported through
   * `batchOptions.onError` in both modes; only successful results are returned.
   *
   * NOTE(review): `batchOptions.concurrency` is not used to limit parallelism;
   * a whole batch is started at once.
   *
   * @param contents Content items to process.
   * @param batchOptions Batch size, inter-batch delay and optional callbacks.
   * @returns A response wrapping the results of the items that succeeded.
   */
  async processBatch(
    contents: ContentInfo[],
    batchOptions: BatchProcessingOptions = {
      batchSize: 5,
      concurrency: 3,
      retryCount: 2,
      delayMs: 1000
    }
  ): Promise<AIResponse<ProcessingResult[]>> {
    try {
      const results: ProcessingResult[] = []
      const batches = this.createBatches(contents, batchOptions.batchSize)
      for (let i = 0; i < batches.length; i++) {
        const batch = batches[i]
        if (this.pipelineConfig.parallelProcessing) {
          // Parallel processing.
          const batchPromises = batch.map(async (content) => {
            try {
              const response = await this.processContent(content)
              if (response.success && response.data) {
                return response.data
              }
              throw new Error(response.error || 'Processing failed')
            } catch (error) {
              if (batchOptions.onError) {
                batchOptions.onError(error, content)
              }
              throw error
            }
          })
          // allSettled: one failing item must not discard its siblings.
          const batchResults = await Promise.allSettled(batchPromises)
          for (const result of batchResults) {
            if (result.status === 'fulfilled') {
              results.push(result.value)
            }
          }
        } else {
          // Serial processing.
          for (const content of batch) {
            try {
              const response = await this.processContent(content)
              if (!response.success || !response.data) {
                // processContent reports failures in its response rather than
                // throwing; raise here so onError fires as in parallel mode.
                throw new Error(response.error || 'Processing failed')
              }
              results.push(response.data)
            } catch (error) {
              if (batchOptions.onError) {
                batchOptions.onError(error, content)
              }
            }
          }
        }
        // Progress callback (number of successes so far, total items).
        if (batchOptions.onProgress) {
          batchOptions.onProgress(results.length, contents.length)
        }
        // Pause between batches, but not after the last one.
        if (i < batches.length - 1 && batchOptions.delayMs > 0) {
          await this.delay(batchOptions.delayMs)
        }
      }
      return { success: true, data: results }
    } catch (error) {
      return {
        success: false,
        error: this.describeError(error, 'Batch processing failed')
      }
    }
  }

  /**
   * Get the status of an item that is currently being processed.
   * @param contentId Content id.
   * @returns The status, or null when the id is not in the queue.
   */
  getProcessingStatus(contentId: string): ProcessingStatus | null {
    return this.processingQueue.get(contentId) || null
  }

  /**
   * Get the status of every item that is currently being processed.
   */
  getAllProcessingStatus(): ProcessingStatus[] {
    return Array.from(this.processingQueue.values())
  }

  /**
   * Register a processing step, replacing any step with the same name.
   * @param step Step to register.
   */
  addProcessingStep(step: ProcessingStep): void {
    this.processingSteps.set(step.name, step)
  }

  /**
   * Remove a processing step.
   * @param stepName Name of the step to remove.
   */
  removeProcessingStep(stepName: string): void {
    this.processingSteps.delete(stepName)
  }

  /**
   * Update the pipeline configuration.
   * The merge is shallow: a nested section passed here replaces the whole
   * existing section.
   * @param config New configuration values.
   */
  updatePipelineConfig(config: Partial<PipelineConfig>): void {
    this.pipelineConfig = { ...this.pipelineConfig, ...config }
  }

  /**
   * Get a snapshot of the pipeline statistics.
   * The per-stage entries are copied as well, so mutating the returned object
   * cannot alter the pipeline's own counters.
   */
  getPipelineStatistics(): PipelineStats {
    const stageStats = {} as Record<ProcessingStage, { count: number, avgTime: number, errorRate: number }>
    for (const stage of Object.keys(this.stats.stageStats) as ProcessingStage[]) {
      stageStats[stage] = { ...this.stats.stageStats[stage] }
    }
    return { ...this.stats, stageStats }
  }

  /**
   * Reset all statistics to their initial values.
   */
  resetStatistics(): void {
    this.stats = this.initializeStats()
  }

  // Private methods

  /**
   * Run every enabled step in order for one content item.
   * @throws Re-throws the error of the first failing step after marking the
   *         status as 'failed'.
   */
  private async executeProcessingPipeline(
    content: ContentInfo,
    status: ProcessingStatus
  ): Promise<ProcessingResult> {
    const result: ProcessingResult = {
      contentId: content.id,
      originalContent: content,
      processedContent: { ...content },
      translations: {},
      analysis: {} as ContentAnalysisResult,
      categories: [],
      qualityScore: 0,
      processingTime: 0,
      totalCost: 0,
      status: 'fetching',
      errors: []
    }
    try {
      // 1. Validation
      await this.executeStep('validation', content, result, status)
      // 2. Analysis
      if (this.pipelineConfig.analysis.enabled) {
        await this.executeStep('analysis', content, result, status)
      }
      // 3. Translation
      if (this.pipelineConfig.translation.enabled && this.pipelineConfig.translation.targetLanguages.length > 0) {
        await this.executeStep('translation', content, result, status)
      }
      // 4. Categorization
      if (this.pipelineConfig.categorization.enabled) {
        await this.executeStep('categorization', content, result, status)
      }
      // 5. Quality check
      await this.executeStep('quality_check', content, result, status)
      // 6. Storage
      await this.executeStep('storage', content, result, status)
      // 7. Indexing
      await this.executeStep('indexing', content, result, status)
      // Done
      result.status = 'completed'
      result.processingTime = Date.now() - status.startTime
      this.updateProcessingStatus(status, 'completed', 100)
    } catch (error) {
      const message = this.describeError(error, 'Unknown error')
      result.status = 'failed'
      result.errors.push(message)
      this.updateProcessingStatus(status, 'failed', status.progress, message)
      throw error
    }
    return result
  }

  /**
   * Execute one registered step and record its outcome on the status.
   * On failure the error is logged on the status, the step's rollback (if any)
   * is attempted, and the original error is re-thrown.
   * @throws Error when the step is not registered or its precondition fails.
   */
  private async executeStep(
    stepName: string,
    content: ContentInfo,
    result: ProcessingResult,
    status: ProcessingStatus
  ): Promise<void> {
    const step = this.processingSteps.get(stepName)
    if (!step) {
      throw new Error(`Processing step '${stepName}' not found`)
    }
    try {
      // Check the precondition.
      if (step.validate && !step.validate(result)) {
        throw new Error(`Validation failed for step '${stepName}'`)
      }
      // Run the step.
      const stepResult = await step.execute(result)
      // Merge whatever the step returned into the running result.
      if (stepResult) {
        Object.assign(result, stepResult)
      }
      // Update the status.
      status.completedSteps.push(stepName)
      const progress = (status.completedSteps.length / 7) * 100 // 7 main steps
      this.updateProcessingStatus(status, this.getStageFromStep(stepName), progress)
    } catch (error) {
      // Record the error.
      status.errors.push({
        step: stepName,
        error: this.describeError(error, 'Unknown error'),
        timestamp: Date.now()
      })
      // Attempt a rollback; a rollback failure must not mask the step error.
      if (step.rollback) {
        try {
          await step.rollback(result)
        } catch (rollbackError) {
          console.error(`Rollback failed for step '${stepName}':`, rollbackError)
        }
      }
      throw error
    }
  }

  /**
   * Write stage, progress and timestamp onto a status record, optionally
   * appending an error entry tagged with the stage.
   */
  private updateProcessingStatus(
    status: ProcessingStatus,
    stage: ProcessingStage,
    progress: number,
    error?: string
  ): void {
    status.stage = stage
    status.progress = progress
    status.lastUpdateTime = Date.now()
    if (error) {
      status.errors.push({
        step: stage,
        error,
        timestamp: Date.now()
      })
    }
  }

  /**
   * Map a step name to its processing stage; unknown names map to 'validation'.
   */
  private getStageFromStep(stepName: string): ProcessingStage {
    const stageMap: Record<string, ProcessingStage> = {
      'validation': 'validation',
      'analysis': 'analysis',
      'translation': 'translation',
      'categorization': 'categorization',
      'quality_check': 'quality_check',
      'storage': 'storage',
      'indexing': 'indexing'
    }
    return stageMap[stepName] || 'validation'
  }

  /**
   * Register the seven built-in steps.
   */
  private initializeProcessingSteps(): void {
    // Validation step
    this.processingSteps.set('validation', {
      name: 'validation',
      order: 1,
      execute: async (data: ProcessingResult) => {
        const content = data.originalContent
        // Required fields
        if (!content.title || content.title.trim().length === 0) {
          throw new Error('Content title is required')
        }
        if (!content.content || content.content.trim().length < 50) {
          throw new Error('Content is too short (minimum 50 characters)')
        }
        // Pre-assigned quality, when present, must meet the threshold.
        if (content.quality !== undefined && content.quality < this.pipelineConfig.qualityThreshold) {
          throw new Error(`Content quality (${content.quality}) below threshold (${this.pipelineConfig.qualityThreshold})`)
        }
        return data
      },
      validate: (data: ProcessingResult) => {
        const source = data.originalContent
        // Coerce to a real boolean instead of leaking the last operand.
        return !!(source && source.title && source.content)
      }
    })

    // Analysis step
    this.processingSteps.set('analysis', {
      name: 'analysis',
      order: 2,
      execute: async (data: ProcessingResult) => {
        const response = await this.analysisService.analyzeContent(data.originalContent.content, {
          types: this.pipelineConfig.analysis.types as any,
          includeScores: this.pipelineConfig.analysis.includeScores,
          language: data.originalContent.originalLanguage
        })
        if (response.success && response.data) {
          data.analysis = response.data
          data.processedContent.sentiment = response.data.sentimentScore
          data.processedContent.readability = response.data.readabilityScore
          data.processedContent.credibility = response.data.credibilityScore
          data.processedContent.keywords = response.data.keywords
          data.totalCost += (response.costUSD || 0)
        } else {
          throw new Error(response.error || 'Content analysis failed')
        }
        return data
      }
    })

    // Translation step
    this.processingSteps.set('translation', {
      name: 'translation',
      order: 3,
      execute: async (data: ProcessingResult) => {
        const sourceContent = data.originalContent
        const targetLanguages = this.pipelineConfig.translation.targetLanguages
        for (const targetLang of targetLanguages) {
          // Nothing to do when the content is already in the target language.
          if (targetLang === sourceContent.originalLanguage) continue
          // Translate the title.
          const titleResponse = await this.translationService.translateText(
            sourceContent.title,
            targetLang,
            sourceContent.originalLanguage,
            { qualityThreshold: this.pipelineConfig.translation.qualityThreshold }
          )
          // Translate the body.
          const contentResponse = await this.translationService.translateText(
            sourceContent.content,
            targetLang,
            sourceContent.originalLanguage,
            { qualityThreshold: this.pipelineConfig.translation.qualityThreshold }
          )
          // Count whatever cost the service reports, even if the pair ends up
          // being discarded because one half failed.
          data.totalCost += (titleResponse.costUSD || 0) + (contentResponse.costUSD || 0)
          if (titleResponse.success && contentResponse.success && titleResponse.data && contentResponse.data) {
            data.translations[targetLang] = {
              ...contentResponse.data,
              translatedText: `${titleResponse.data.translatedText}\n\n${contentResponse.data.translatedText}`
            }
          } else {
            // Best effort: a failed language does not abort the pipeline, but
            // it is recorded instead of disappearing silently.
            data.errors.push(`Translation to '${targetLang}' failed: ${titleResponse.error || contentResponse.error || 'unknown error'}`)
          }
        }
        return data
      }
    })

    // Categorization step
    this.processingSteps.set('categorization', {
      name: 'categorization',
      order: 4,
      execute: async (data: ProcessingResult) => {
        if (data.analysis && data.analysis.categories) {
          const validCategories = data.analysis.categories
            .filter(cat => cat.confidence >= this.pipelineConfig.categorization.threshold)
            .slice(0, this.pipelineConfig.categorization.maxCategories)
            .map(cat => cat.categoryId)
          data.categories = validCategories
          // The first surviving category becomes the primary one.
          if (validCategories.length > 0) {
            data.processedContent.categoryId = validCategories[0]
          }
        }
        return data
      }
    })

    // Quality check step
    this.processingSteps.set('quality_check', {
      name: 'quality_check',
      order: 5,
      execute: async (data: ProcessingResult) => {
        let qualityScore = 0
        let factors = 0
        // Factors taken from the analysis result, when present.
        if (data.analysis) {
          if (data.analysis.readabilityScore !== undefined) {
            qualityScore += data.analysis.readabilityScore
            factors++
          }
          if (data.analysis.credibilityScore !== undefined) {
            qualityScore += data.analysis.credibilityScore
            factors++
          }
          // Toxicity: the lower the toxicity, the higher the quality.
          if (data.analysis.toxicityScore !== undefined) {
            qualityScore += (1 - data.analysis.toxicityScore)
            factors++
          }
        }
        // Length factor: full marks above 500 characters, proportional below.
        const contentLength = data.originalContent.content.length
        const lengthScore = contentLength > 500 ? 1 : contentLength / 500
        qualityScore += lengthScore
        factors++
        // Translation factor: mean quality score over all translations.
        if (Object.keys(data.translations).length > 0) {
          const translationQualities = Object.values(data.translations).map(t => t.qualityScore)
          const avgTranslationQuality = translationQualities.reduce((sum, q) => sum + q, 0) / translationQualities.length
          qualityScore += avgTranslationQuality
          factors++
        }
        data.qualityScore = factors > 0 ? qualityScore / factors : 0.5
        data.processedContent.quality = data.qualityScore
        // Below-threshold quality only warns; it does not fail the pipeline.
        if (data.qualityScore < this.pipelineConfig.qualityThreshold) {
          console.warn(`Content quality (${data.qualityScore}) below threshold (${this.pipelineConfig.qualityThreshold})`)
        }
        return data
      }
    })

    // Storage step
    this.processingSteps.set('storage', {
      name: 'storage',
      order: 6,
      execute: async (data: ProcessingResult) => {
        // Simulated storage operation.
        await this.delay(100)
        // A real implementation would persist the processed content here.
        data.processedContent.status = 'published'
        data.processedContent.tags = [...(data.processedContent.tags || []), ...data.categories]
        return data
      }
    })

    // Indexing step
    this.processingSteps.set('indexing', {
      name: 'indexing',
      order: 7,
      execute: async (data: ProcessingResult) => {
        // Simulated index build.
        await this.delay(50)
        // A real implementation would update the search index here.
        console.log(`Content indexed: ${data.contentId}`)
        return data
      }
    })
  }

  /**
   * Build the pipeline configuration from defaults plus caller overrides.
   * Overrides are applied with a shallow spread, so an overridden nested
   * section replaces the default section as a whole.
   */
  private createDefaultPipelineConfig(overrides: Partial<PipelineConfig>): PipelineConfig {
    return {
      enabledSteps: ['validation', 'analysis', 'translation', 'categorization', 'quality_check', 'storage', 'indexing'],
      parallelProcessing: true,
      maxConcurrency: 3,
      retryCount: 2,
      timeoutMs: 300000, // 5 minutes
      qualityThreshold: 0.7,
      targetLanguages: ['zh-CN', 'en'],
      categorization: {
        enabled: true,
        threshold: 0.6,
        maxCategories: 3
      },
      translation: {
        enabled: true,
        targetLanguages: ['zh-CN', 'en'],
        qualityThreshold: 0.7
      },
      analysis: {
        enabled: true,
        types: ['sentiment', 'entities', 'topics', 'categories', 'readability', 'credibility', 'summary', 'keywords'],
        includeScores: true
      },
      ...overrides
    }
  }

  /**
   * Create a zeroed statistics object with one entry per processing stage.
   */
  private initializeStats(): PipelineStats {
    const stages: ProcessingStage[] = [
      'fetching', 'validation', 'analysis', 'translation',
      'categorization', 'quality_check', 'storage', 'indexing',
      'completed', 'failed'
    ]
    const stageStats: Record<ProcessingStage, any> = {} as Record<ProcessingStage, any>
    stages.forEach(stage => {
      stageStats[stage] = {
        count: 0,
        avgTime: 0,
        errorRate: 0
      }
    })
    return {
      totalProcessed: 0,
      successCount: 0,
      errorCount: 0,
      avgProcessingTime: 0,
      totalCost: 0,
      stageStats,
      dailyThroughput: 0,
      lastProcessedAt: 0
    }
  }

  /**
   * Fold a finished processing result into the aggregate statistics.
   */
  private updateStats(result: ProcessingResult): void {
    this.recordOutcome(result.status, result.processingTime, result.totalCost)
  }

  /**
   * Record one processed item in the aggregate statistics.
   * @param finalStage Final stage of the item; anything but 'completed' counts as an error.
   * @param elapsedMs Processing time in milliseconds.
   * @param cost Cost attributed to the item.
   */
  private recordOutcome(finalStage: ProcessingStage, elapsedMs: number, cost: number): void {
    this.stats.totalProcessed++
    this.stats.lastProcessedAt = Date.now()
    if (finalStage === 'completed') {
      this.stats.successCount++
    } else {
      this.stats.errorCount++
    }
    // Incremental running mean of the processing time.
    this.stats.avgProcessingTime = (
      this.stats.avgProcessingTime * (this.stats.totalProcessed - 1) + elapsedMs
    ) / this.stats.totalProcessed
    // Accumulate cost.
    this.stats.totalCost += cost
    // Per-stage counter.
    this.stats.stageStats[finalStage].count++
  }

  /**
   * Split a list into consecutive batches of at most `batchSize` items.
   * A batch size below 1 (or NaN) is treated as 1, because a zero or negative
   * step would otherwise never advance the loop.
   */
  private createBatches<T>(items: T[], batchSize: number): T[][] {
    const size = batchSize >= 1 ? Math.floor(batchSize) : 1
    const batches: T[][] = []
    for (let i = 0; i < items.length; i += size) {
      batches.push(items.slice(i, i + size))
    }
    return batches
  }

  /**
   * Resolve after the given number of milliseconds.
   */
  private async delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms))
  }

  /**
   * Decide whether an error is worth retrying: a known transient error code
   * or a status of 500 and above. A null/undefined error is not retryable.
   */
  private isRetryableError(error: any): boolean {
    if (error == null) {
      return false
    }
    const retryableCodes = ['TIMEOUT', 'RATE_LIMIT', 'SERVER_ERROR', 'NETWORK_ERROR']
    return retryableCodes.includes(error.code) || error.status >= 500
  }

  /**
   * Extract a message from a thrown value, falling back when the value is
   * null/undefined or carries no message.
   */
  private describeError(error: any, fallback: string): string {
    if (error != null && error.message) {
      return error.message
    }
    return fallback
  }
}