/** * 推送消息接收服务主服务器 * 专门用于接收和存储各种推送消息 */ const express = require('express'); const cors = require('cors'); const helmet = require('helmet'); const compression = require('compression'); const rateLimit = require('express-rate-limit'); const { body, validationResult } = require('express-validator'); const winston = require('winston'); const path = require('path'); const fs = require('fs'); require('dotenv').config(); const DatabaseManager = require('./lib/database'); class PushReceiverService { constructor() { this.app = express(); this.port = process.env.PORT || 3001; this.host = process.env.HOST || '0.0.0.0'; this.db = new DatabaseManager(); this.setupLogger(); this.setupMiddleware(); this.setupRoutes(); this.setupErrorHandling(); // 统计信息 this.stats = { startTime: new Date(), requestCount: 0, messageCount: 0, errorCount: 0 }; } /** * 设置日志系统 */ setupLogger() { // 确保日志目录存在 const logDir = process.env.LOG_DIR || './logs'; if (!fs.existsSync(logDir)) { fs.mkdirSync(logDir, { recursive: true }); } this.logger = winston.createLogger({ level: process.env.LOG_LEVEL || 'info', format: winston.format.combine( winston.format.timestamp(), winston.format.errors({ stack: true }), winston.format.json() ), transports: [ new winston.transports.File({ filename: path.join(logDir, 'push-service-error.log'), level: 'error', maxsize: parseInt(process.env.LOG_FILE_MAX_SIZE) || 10485760, maxFiles: parseInt(process.env.LOG_FILE_MAX_FILES) || 5 }), new winston.transports.File({ filename: path.join(logDir, 'push-service.log'), maxsize: parseInt(process.env.LOG_FILE_MAX_SIZE) || 10485760, maxFiles: parseInt(process.env.LOG_FILE_MAX_FILES) || 5 }), new winston.transports.Console({ format: winston.format.combine( winston.format.colorize(), winston.format.simple() ) }) ] }); } /** * 设置中间件 */ setupMiddleware() { // 安全中间件 this.app.use(helmet({ contentSecurityPolicy: false })); // 压缩中间件 this.app.use(compression()); // CORS 配置 const corsOptions = { origin: (origin, callback) => { const allowedOrigins = process.env.ALLOWED_ORIGINS; if (!allowedOrigins || allowedOrigins === '*') { callback(null, true); } else { const origins = allowedOrigins.split(','); if (origins.includes(origin) || !origin) { callback(null, true); } else { callback(new Error('不允许的CORS来源')); } } }, credentials: true, optionsSuccessStatus: 200 }; this.app.use(cors(corsOptions)); // 请求体解析 this.app.use(express.json({ limit: process.env.MAX_MESSAGE_SIZE || '1mb' })); this.app.use(express.urlencoded({ extended: true })); // API密钥验证中间件 this.app.use('/api', (req, res, next) => { if (req.path === '/health') { return next(); // 健康检查不需要验证 } const apiKey = req.headers['x-api-key'] || req.headers['authorization']; const expectedApiKey = process.env.API_KEY; if (expectedApiKey && (!apiKey || apiKey !== expectedApiKey)) { this.stats.errorCount++; return res.status(401).json({ success: false, error: 'UNAUTHORIZED', message: 'API密钥无效或缺失' }); } next(); }); // 速率限制 const limiter = rateLimit({ windowMs: parseInt(process.env.RATE_LIMIT_WINDOW_MS) || 60000, max: parseInt(process.env.RATE_LIMIT_MAX_REQUESTS) || 1000, message: { success: false, error: 'RATE_LIMIT_EXCEEDED', message: process.env.RATE_LIMIT_MESSAGE || '请求过于频繁,请稍后再试' }, standardHeaders: true, legacyHeaders: false }); this.app.use('/api', limiter); // 请求日志中间件 this.app.use((req, res, next) => { const startTime = Date.now(); this.stats.requestCount++; res.on('finish', () => { const duration = Date.now() - startTime; this.logger.info('HTTP Request', { method: req.method, url: req.url, status: res.statusCode, duration: `${duration}ms`, ip: req.ip, userAgent: req.get('User-Agent'), contentLength: req.get('Content-Length') || 0 }); }); next(); }); } /** * 设置路由 */ setupRoutes() { // 根路径 - 服务信息 this.app.get('/', (req, res) => { res.json({ name: 'Push Receiver Service', version: '1.0.0', description: '专门接收推送消息的独立服务', status: 'running', uptime: Math.floor((Date.now() - this.stats.startTime.getTime()) / 1000), stats: { ...this.stats, uptime: Math.floor((Date.now() - this.stats.startTime.getTime()) / 1000) }, endpoints: { pushMessage: 'POST /api/push/message', pushBatch: 'POST /api/push/batch', health: 'GET /api/health', stats: 'GET /api/stats' }, timestamp: new Date().toISOString() }); }); // 健康检查 this.app.get('/api/health', async (req, res) => { try { const dbHealth = await this.db.getHealthStatus(); res.json({ status: 'OK', service: 'Push Receiver Service', version: '1.0.0', timestamp: new Date().toISOString(), uptime: Math.floor((Date.now() - this.stats.startTime.getTime()) / 1000), database: dbHealth.database, stats: this.stats }); } catch (error) { this.logger.error('健康检查失败', { error: error.message, stack: error.stack }); res.status(500).json({ status: 'ERROR', service: 'Push Receiver Service', error: error.message, timestamp: new Date().toISOString() }); } }); // 接收单个推送消息 this.app.post('/api/push/message', [ body('pushType').notEmpty().withMessage('推送类型不能为空'), body('userId').optional().isString().withMessage('用户ID必须是字符串'), body('deviceId').optional().isString().withMessage('设备ID必须是字符串') ], async (req, res) => { try { // 验证请求数据 const errors = validationResult(req); if (!errors.isEmpty()) { return res.status(400).json({ success: false, error: 'VALIDATION_ERROR', message: '请求数据验证失败', details: errors.array() }); } // 准备消息数据 const messageData = { ...req.body, source_ip: req.ip, user_agent: req.get('User-Agent'), raw_data: req.body, parsed_data: req.body, received_at: new Date() }; // 保存到数据库 const result = await this.db.insertPushMessage(messageData); this.stats.messageCount++; this.logger.info('推送消息接收成功', { messageId: result.messageId, pushType: req.body.pushType, userId: req.body.userId, deviceId: req.body.deviceId, isDuplicate: result.isDuplicate, processingTime: result.processingTime }); res.json({ success: true, message: '推送消息接收成功', data: { messageId: result.messageId, isDuplicate: result.isDuplicate, processingTime: result.processingTime, timestamp: new Date().toISOString() } }); } catch (error) { this.stats.errorCount++; this.logger.error('推送消息接收失败', { error: error.message, stack: error.stack, requestBody: req.body }); res.status(500).json({ success: false, error: 'MESSAGE_PROCESSING_ERROR', message: '推送消息处理失败', details: error.message }); } } ); // 批量接收推送消息 this.app.post('/api/push/batch', [ body('messages').isArray().withMessage('messages必须是数组'), body('messages').custom((messages) => { const maxBatchSize = parseInt(process.env.BATCH_SIZE_LIMIT) || 1000; if (messages.length > maxBatchSize) { throw new Error(`批量消息数量不能超过${maxBatchSize}条`); } return true; }) ], async (req, res) => { try { // 验证请求数据 const errors = validationResult(req); if (!errors.isEmpty()) { return res.status(400).json({ success: false, error: 'VALIDATION_ERROR', message: '请求数据验证失败', details: errors.array() }); } const { messages } = req.body; // 准备批量消息数据 const messagesData = messages.map(msg => ({ ...msg, source_ip: req.ip, user_agent: req.get('User-Agent'), raw_data: msg, parsed_data: msg, received_at: new Date() })); // 批量保存到数据库 const result = await this.db.insertPushMessagesBatch(messagesData); this.stats.messageCount += result.successCount; this.stats.errorCount += result.failureCount; this.logger.info('批量推送消息接收完成', { totalCount: result.totalCount, successCount: result.successCount, failureCount: result.failureCount, processingTime: result.processingTime }); res.json({ success: true, message: `批量推送消息处理完成,成功${result.successCount}条,失败${result.failureCount}条`, data: { totalCount: result.totalCount, successCount: result.successCount, failureCount: result.failureCount, processingTime: result.processingTime, timestamp: new Date().toISOString() } }); } catch (error) { this.stats.errorCount++; this.logger.error('批量推送消息接收失败', { error: error.message, stack: error.stack, requestBody: req.body }); res.status(500).json({ success: false, error: 'BATCH_PROCESSING_ERROR', message: '批量推送消息处理失败', details: error.message }); } } ); // 获取统计信息 this.app.get('/api/stats', async (req, res) => { try { const hoursBack = parseInt(req.query.hours) || 24; const messageStats = await this.db.getMessageStats(hoursBack); res.json({ success: true, data: { service: { uptime: Math.floor((Date.now() - this.stats.startTime.getTime()) / 1000), ...this.stats }, messages: messageStats, period: `最近${hoursBack}小时` }, timestamp: new Date().toISOString() }); } catch (error) { this.logger.error('获取统计信息失败', { error: error.message, stack: error.stack }); res.status(500).json({ success: false, error: 'STATS_ERROR', message: '获取统计信息失败', details: error.message }); } }); } /** * 设置错误处理 */ setupErrorHandling() { // 404 处理 this.app.use((req, res) => { res.status(404).json({ success: false, error: 'NOT_FOUND', message: '请求的端点不存在', path: req.path, method: req.method, availableEndpoints: [ 'GET /', 'GET /api/health', 'POST /api/push/message', 'POST /api/push/batch', 'GET /api/stats' ] }); }); // 全局错误处理 this.app.use((error, req, res, next) => { this.stats.errorCount++; this.logger.error('未处理的错误', { error: error.message, stack: error.stack, url: req.url, method: req.method }); res.status(500).json({ success: false, error: 'INTERNAL_SERVER_ERROR', message: '服务器内部错误', details: process.env.NODE_ENV === 'development' ? error.message : undefined }); }); } /** * 启动服务器 */ async start() { try { // 测试数据库连接 await this.db.testConnection(); // 启动HTTP服务器 this.server = this.app.listen(this.port, this.host, () => { console.log('🚀 推送消息接收服务启动成功!'); console.log(`📍 服务器地址: http://${this.host}:${this.port}`); console.log(''); console.log('📋 API 端点:'); console.log(` - 服务信息: http://${this.host}:${this.port}/`); console.log(` - 健康检查: http://${this.host}:${this.port}/api/health`); console.log(` - 推送消息: http://${this.host}:${this.port}/api/push/message`); console.log(` - 批量推送: http://${this.host}:${this.port}/api/push/batch`); console.log(` - 统计信息: http://${this.host}:${this.port}/api/stats`); console.log(''); console.log('🗄️ 数据库连接: ✅ 正常'); console.log('🔒 API密钥保护: ' + (process.env.API_KEY ? '✅ 启用' : '❌ 未设置')); console.log('📊 请求速率限制: ✅ 启用'); console.log(''); console.log('✅ 服务已准备就绪,等待接收推送消息...'); console.log('💡 按 Ctrl+C 停止服务'); this.logger.info('推送消息接收服务启动完成', { port: this.port, host: this.host, nodeEnv: process.env.NODE_ENV, apiKeyEnabled: !!process.env.API_KEY }); }); // 设置优雅关闭 this.setupGracefulShutdown(); } catch (error) { this.logger.error('服务启动失败', { error: error.message, stack: error.stack }); process.exit(1); } } /** * 设置优雅关闭 */ setupGracefulShutdown() { const shutdown = async (signal) => { console.log(`\n📴 收到${signal}信号,正在优雅关闭服务...`); this.logger.info(`收到${signal}信号,开始关闭服务`); // 停止接收新请求 if (this.server) { this.server.close(async () => { console.log('🌐 HTTP服务器已关闭'); this.logger.info('HTTP服务器已关闭'); // 关闭数据库连接 try { await this.db.close(); console.log('🗄️ 数据库连接已关闭'); this.logger.info('数据库连接已关闭'); } catch (error) { console.error('❌ 关闭数据库连接失败:', error.message); this.logger.error('关闭数据库连接失败', { error: error.message }); } console.log('✅ 服务已安全关闭'); this.logger.info('服务已安全关闭'); process.exit(0); }); // 设置强制关闭超时 setTimeout(() => { console.error('❌ 强制关闭服务(超时)'); this.logger.error('强制关闭服务(超时)'); process.exit(1); }, 30000); } }; process.on('SIGINT', () => shutdown('SIGINT')); process.on('SIGTERM', () => shutdown('SIGTERM')); // 捕获未处理的异常 process.on('uncaughtException', (error) => { console.error('💥 未捕获的异常:', error); this.logger.error('未捕获的异常', { error: error.message, stack: error.stack }); process.exit(1); }); process.on('unhandledRejection', (reason, promise) => { console.error('💥 未处理的 Promise 拒绝:', reason); this.logger.error('未处理的 Promise 拒绝', { reason: reason?.message || reason }); process.exit(1); }); } } // 启动服务 if (require.main === module) { const service = new PushReceiverService(); service.start(); } module.exports = PushReceiverService;