Files
akmon/push-receiver-service/server.js
2026-01-20 08:04:15 +08:00

557 lines
21 KiB
JavaScript

/**
* 推送消息接收服务主服务器
* 专门用于接收和存储各种推送消息
*/
const crypto = require('crypto');
const fs = require('fs');
const path = require('path');

const compression = require('compression');
const cors = require('cors');
const express = require('express');
const rateLimit = require('express-rate-limit');
const { body, validationResult } = require('express-validator');
const helmet = require('helmet');
const winston = require('winston');

require('dotenv').config();
const DatabaseManager = require('./lib/database');
/**
 * Parse a strictly positive base-10 integer from an untrusted value.
 *
 * @param {*} value - Raw value (typically an env var or query-string entry).
 * @param {number} fallback - Returned when `value` is missing, not numeric, zero or negative.
 * @returns {number} The parsed integer, or `fallback`.
 */
function parsePositiveInt(value, fallback) {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

/**
 * Compare two secrets without leaking their content or length through timing.
 * Both inputs are hashed first so `timingSafeEqual` always receives
 * equal-length buffers (it throws on a length mismatch).
 *
 * @param {*} presented - Value supplied by the client.
 * @param {*} expected - Configured secret.
 * @returns {boolean} True when both values are identical strings.
 */
function secretsMatch(presented, expected) {
  const digest = (text) => crypto.createHash('sha256').update(String(text)).digest();
  return crypto.timingSafeEqual(digest(presented), digest(expected));
}

/**
 * HTTP service that receives push messages and stores them through
 * `DatabaseManager`. Wires up logging, middleware, routes and error handling
 * on construction; call `start()` to begin listening.
 */
class PushReceiverService {
  constructor() {
    this.app = express();
    this.port = process.env.PORT || 3001;
    this.host = process.env.HOST || '0.0.0.0';
    this.db = new DatabaseManager();
    // Counters are created before any middleware/route is registered so the
    // closures below can never observe an undefined `stats` object.
    this.stats = {
      startTime: new Date(),
      requestCount: 0,
      messageCount: 0,
      errorCount: 0
    };
    this.setupLogger();
    this.setupMiddleware();
    this.setupRoutes();
    this.setupErrorHandling();
  }

  /**
   * Seconds elapsed since the service instance was created.
   *
   * @returns {number} Whole seconds of uptime.
   */
  getUptimeSeconds() {
    return Math.floor((Date.now() - this.stats.startTime.getTime()) / 1000);
  }

  /**
   * Create the winston logger: an error-only file, a combined file and a
   * colorized console transport. The log directory is created if missing.
   */
  setupLogger() {
    const logDir = process.env.LOG_DIR || './logs';
    if (!fs.existsSync(logDir)) {
      fs.mkdirSync(logDir, { recursive: true });
    }
    const maxsize = parsePositiveInt(process.env.LOG_FILE_MAX_SIZE, 10485760);
    const maxFiles = parsePositiveInt(process.env.LOG_FILE_MAX_FILES, 5);
    this.logger = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
      ),
      transports: [
        new winston.transports.File({
          filename: path.join(logDir, 'push-service-error.log'),
          level: 'error',
          maxsize,
          maxFiles
        }),
        new winston.transports.File({
          filename: path.join(logDir, 'push-service.log'),
          maxsize,
          maxFiles
        }),
        new winston.transports.Console({
          format: winston.format.combine(
            winston.format.colorize(),
            winston.format.simple()
          )
        })
      ]
    });
  }

  /**
   * Register global middleware.
   *
   * Order matters: the request logger runs first so rejected requests
   * (401/429/parse errors) are still counted and logged; the rate limiter runs
   * before the API-key check so key guessing is throttled; authentication runs
   * before body parsing so unauthenticated payloads are never parsed.
   */
  setupMiddleware() {
    // Request accounting and access log.
    this.app.use((req, res, next) => {
      const startedAt = Date.now();
      this.stats.requestCount++;
      res.on('finish', () => {
        const duration = Date.now() - startedAt;
        this.logger.info('HTTP Request', {
          method: req.method,
          url: req.url,
          status: res.statusCode,
          duration: `${duration}ms`,
          ip: req.ip,
          userAgent: req.get('User-Agent'),
          contentLength: req.get('Content-Length') || 0
        });
      });
      next();
    });

    // Security headers (CSP disabled, as this is a JSON API).
    this.app.use(helmet({
      contentSecurityPolicy: false
    }));

    // Response compression.
    this.app.use(compression());

    // CORS: ALLOWED_ORIGINS is a comma-separated allow-list; unset or "*"
    // allows every origin. Entries are trimmed so "a, b" works as expected.
    const rawOrigins = process.env.ALLOWED_ORIGINS;
    const allowAnyOrigin = !rawOrigins || rawOrigins.trim() === '*';
    const allowedOrigins = new Set(
      allowAnyOrigin
        ? []
        : rawOrigins.split(',').map((entry) => entry.trim()).filter(Boolean)
    );
    const corsOptions = {
      origin: (origin, callback) => {
        // Requests without an Origin header (non-browser clients) are allowed.
        if (allowAnyOrigin || !origin || allowedOrigins.has(origin)) {
          return callback(null, true);
        }
        const rejection = new Error('不允许的CORS来源');
        // Lets the global error handler answer 403 instead of 500.
        rejection.status = 403;
        return callback(rejection);
      },
      credentials: true,
      optionsSuccessStatus: 200
    };
    this.app.use(cors(corsOptions));

    // Rate limiting for every /api request, including unauthenticated ones.
    const limiter = rateLimit({
      windowMs: parsePositiveInt(process.env.RATE_LIMIT_WINDOW_MS, 60000),
      max: parsePositiveInt(process.env.RATE_LIMIT_MAX_REQUESTS, 1000),
      message: {
        success: false,
        error: 'RATE_LIMIT_EXCEEDED',
        message: process.env.RATE_LIMIT_MESSAGE || '请求过于频繁,请稍后再试'
      },
      standardHeaders: true,
      legacyHeaders: false
    });
    this.app.use('/api', limiter);

    // API-key check. Skipped entirely when API_KEY is not configured.
    this.app.use('/api', (req, res, next) => {
      if (req.path === '/health') {
        return next(); // The health check is intentionally unauthenticated.
      }
      const expectedApiKey = process.env.API_KEY;
      if (!expectedApiKey) {
        return next();
      }
      const header = req.headers['x-api-key'] || req.headers['authorization'];
      const presented = typeof header === 'string' ? header : '';
      // Accept both the bare key and the conventional "Bearer <key>" form.
      const withoutScheme = presented.replace(/^Bearer\s+/i, '');
      const authorized = presented !== '' && (
        secretsMatch(presented, expectedApiKey) ||
        secretsMatch(withoutScheme, expectedApiKey)
      );
      if (!authorized) {
        this.stats.errorCount++;
        return res.status(401).json({
          success: false,
          error: 'UNAUTHORIZED',
          message: 'API密钥无效或缺失'
        });
      }
      return next();
    });

    // Body parsing.
    this.app.use(express.json({
      limit: process.env.MAX_MESSAGE_SIZE || '1mb'
    }));
    this.app.use(express.urlencoded({ extended: true }));
  }

  /**
   * Send a 400 response when express-validator collected errors for `req`.
   *
   * @param {object} req - Express request.
   * @param {object} res - Express response.
   * @returns {boolean} True when a response was sent and the caller must stop.
   */
  respondIfInvalid(req, res) {
    const errors = validationResult(req);
    if (errors.isEmpty()) {
      return false;
    }
    res.status(400).json({
      success: false,
      error: 'VALIDATION_ERROR',
      message: '请求数据验证失败',
      details: errors.array()
    });
    return true;
  }

  /**
   * Register the HTTP routes: service info, health check, single and batch
   * message ingestion, and statistics.
   */
  setupRoutes() {
    // Root path: service information.
    this.app.get('/', (req, res) => {
      const uptime = this.getUptimeSeconds();
      res.json({
        name: 'Push Receiver Service',
        version: '1.0.0',
        description: '专门接收推送消息的独立服务',
        status: 'running',
        uptime,
        stats: {
          ...this.stats,
          uptime
        },
        endpoints: {
          pushMessage: 'POST /api/push/message',
          pushBatch: 'POST /api/push/batch',
          health: 'GET /api/health',
          stats: 'GET /api/stats'
        },
        timestamp: new Date().toISOString()
      });
    });

    // Health check: reports database status from the DatabaseManager.
    this.app.get('/api/health', async (req, res) => {
      try {
        const dbHealth = await this.db.getHealthStatus();
        res.json({
          status: 'OK',
          service: 'Push Receiver Service',
          version: '1.0.0',
          timestamp: new Date().toISOString(),
          uptime: this.getUptimeSeconds(),
          database: dbHealth.database,
          stats: this.stats
        });
      } catch (error) {
        this.logger.error('健康检查失败', { error: error.message, stack: error.stack });
        res.status(500).json({
          status: 'ERROR',
          service: 'Push Receiver Service',
          error: error.message,
          timestamp: new Date().toISOString()
        });
      }
    });

    // Receive a single push message.
    this.app.post('/api/push/message',
      [
        body('pushType').notEmpty().withMessage('推送类型不能为空'),
        body('userId').optional().isString().withMessage('用户ID必须是字符串'),
        body('deviceId').optional().isString().withMessage('设备ID必须是字符串')
      ],
      async (req, res) => {
        try {
          if (this.respondIfInvalid(req, res)) {
            return;
          }
          // Server-side metadata is spread last so a client cannot override it.
          const messageData = {
            ...req.body,
            source_ip: req.ip,
            user_agent: req.get('User-Agent'),
            raw_data: req.body,
            parsed_data: req.body,
            received_at: new Date()
          };
          const result = await this.db.insertPushMessage(messageData);
          this.stats.messageCount++;
          this.logger.info('推送消息接收成功', {
            messageId: result.messageId,
            pushType: req.body.pushType,
            userId: req.body.userId,
            deviceId: req.body.deviceId,
            isDuplicate: result.isDuplicate,
            processingTime: result.processingTime
          });
          res.json({
            success: true,
            message: '推送消息接收成功',
            data: {
              messageId: result.messageId,
              isDuplicate: result.isDuplicate,
              processingTime: result.processingTime,
              timestamp: new Date().toISOString()
            }
          });
        } catch (error) {
          this.stats.errorCount++;
          this.logger.error('推送消息接收失败', {
            error: error.message,
            stack: error.stack,
            requestBody: req.body
          });
          res.status(500).json({
            success: false,
            error: 'MESSAGE_PROCESSING_ERROR',
            message: '推送消息处理失败',
            details: error.message
          });
        }
      }
    );

    // Receive a batch of push messages.
    this.app.post('/api/push/batch',
      [
        body('messages').isArray().withMessage('messages必须是数组'),
        body('messages').custom((messages) => {
          // The isArray() rule above already reports non-arrays; bail out so
          // this rule does not add a confusing TypeError message on top.
          if (!Array.isArray(messages)) {
            return true;
          }
          const maxBatchSize = parsePositiveInt(process.env.BATCH_SIZE_LIMIT, 1000);
          if (messages.length > maxBatchSize) {
            throw new Error(`批量消息数量不能超过${maxBatchSize}`);
          }
          // Each entry is spread into a record below, so it must be a plain
          // object (spreading a string would yield per-character keys).
          const isRecord = (entry) =>
            entry !== null && typeof entry === 'object' && !Array.isArray(entry);
          if (!messages.every(isRecord)) {
            throw new Error('messages中的每一项必须是对象');
          }
          return true;
        })
      ],
      async (req, res) => {
        try {
          if (this.respondIfInvalid(req, res)) {
            return;
          }
          const { messages } = req.body;
          const sourceIp = req.ip;
          const userAgent = req.get('User-Agent');
          const messagesData = messages.map((msg) => ({
            ...msg,
            source_ip: sourceIp,
            user_agent: userAgent,
            raw_data: msg,
            parsed_data: msg,
            received_at: new Date()
          }));
          const result = await this.db.insertPushMessagesBatch(messagesData);
          this.stats.messageCount += result.successCount;
          this.stats.errorCount += result.failureCount;
          this.logger.info('批量推送消息接收完成', {
            totalCount: result.totalCount,
            successCount: result.successCount,
            failureCount: result.failureCount,
            processingTime: result.processingTime
          });
          res.json({
            success: true,
            message: `批量推送消息处理完成,成功${result.successCount}条,失败${result.failureCount}`,
            data: {
              totalCount: result.totalCount,
              successCount: result.successCount,
              failureCount: result.failureCount,
              processingTime: result.processingTime,
              timestamp: new Date().toISOString()
            }
          });
        } catch (error) {
          this.stats.errorCount++;
          this.logger.error('批量推送消息接收失败', {
            error: error.message,
            stack: error.stack,
            requestBody: req.body
          });
          res.status(500).json({
            success: false,
            error: 'BATCH_PROCESSING_ERROR',
            message: '批量推送消息处理失败',
            details: error.message
          });
        }
      }
    );

    // Statistics: `?hours=N` selects the look-back window (default 24).
    this.app.get('/api/stats', async (req, res) => {
      try {
        const hoursBack = parsePositiveInt(req.query.hours, 24);
        const messageStats = await this.db.getMessageStats(hoursBack);
        res.json({
          success: true,
          data: {
            service: {
              uptime: this.getUptimeSeconds(),
              ...this.stats
            },
            messages: messageStats,
            period: `最近${hoursBack}小时`
          },
          timestamp: new Date().toISOString()
        });
      } catch (error) {
        this.logger.error('获取统计信息失败', { error: error.message, stack: error.stack });
        res.status(500).json({
          success: false,
          error: 'STATS_ERROR',
          message: '获取统计信息失败',
          details: error.message
        });
      }
    });
  }

  /**
   * Register the 404 fallback and the global error handler. Must be called
   * after all routes are registered.
   */
  setupErrorHandling() {
    // 404 fallback for unmatched routes.
    this.app.use((req, res) => {
      res.status(404).json({
        success: false,
        error: 'NOT_FOUND',
        message: '请求的端点不存在',
        path: req.path,
        method: req.method,
        availableEndpoints: [
          'GET /',
          'GET /api/health',
          'POST /api/push/message',
          'POST /api/push/batch',
          'GET /api/stats'
        ]
      });
    });

    const clientErrorCodes = {
      400: 'BAD_REQUEST',
      403: 'FORBIDDEN',
      413: 'PAYLOAD_TOO_LARGE'
    };

    // Global error handler (four parameters mark it as such for Express).
    this.app.use((error, req, res, next) => {
      this.stats.errorCount++;
      // Errors raised by middleware (malformed JSON, oversized body, CORS
      // rejection) carry a 4xx status; honour it instead of reporting 500.
      const declaredStatus = error.status ?? error.statusCode;
      const isClientError = Number.isInteger(declaredStatus) &&
        declaredStatus >= 400 && declaredStatus < 500;
      const status = isClientError ? declaredStatus : 500;
      const logPayload = {
        error: error.message,
        stack: error.stack,
        url: req.url,
        method: req.method
      };
      if (isClientError) {
        this.logger.warn('请求被拒绝', { ...logPayload, status });
      } else {
        this.logger.error('未处理的错误', logPayload);
      }
      // Once the response has started we can no longer send JSON; let
      // Express's default handler terminate the connection.
      if (res.headersSent) {
        return next(error);
      }
      const exposeDetails = process.env.NODE_ENV === 'development' ||
        (isClientError && error.expose === true);
      return res.status(status).json({
        success: false,
        error: isClientError
          ? (clientErrorCodes[status] || 'REQUEST_ERROR')
          : 'INTERNAL_SERVER_ERROR',
        message: isClientError ? '请求无法处理' : '服务器内部错误',
        details: exposeDetails ? error.message : undefined
      });
    });
  }

  /**
   * Verify the database connection, start listening and install the
   * shutdown/crash handlers. Exits the process with code 1 when startup fails.
   *
   * @returns {Promise<void>}
   */
  async start() {
    try {
      await this.db.testConnection();
      const baseUrl = `http://${this.host}:${this.port}`;
      this.server = this.app.listen(this.port, this.host, () => {
        console.log('🚀 推送消息接收服务启动成功!');
        console.log(`📍 服务器地址: ${baseUrl}`);
        console.log('');
        console.log('📋 API 端点:');
        console.log(` - 服务信息: ${baseUrl}/`);
        console.log(` - 健康检查: ${baseUrl}/api/health`);
        console.log(` - 推送消息: ${baseUrl}/api/push/message`);
        console.log(` - 批量推送: ${baseUrl}/api/push/batch`);
        console.log(` - 统计信息: ${baseUrl}/api/stats`);
        console.log('');
        console.log('🗄️ 数据库连接: ✅ 正常');
        console.log('🔒 API密钥保护: ' + (process.env.API_KEY ? '✅ 启用' : '❌ 未设置'));
        console.log('📊 请求速率限制: ✅ 启用');
        console.log('');
        console.log('✅ 服务已准备就绪,等待接收推送消息...');
        console.log('💡 按 Ctrl+C 停止服务');
        this.logger.info('推送消息接收服务启动完成', {
          port: this.port,
          host: this.host,
          nodeEnv: process.env.NODE_ENV,
          apiKeyEnabled: !!process.env.API_KEY
        });
      });
      // listen() failures (e.g. EADDRINUSE) are emitted asynchronously as an
      // 'error' event and are therefore not caught by the surrounding try.
      this.server.on('error', (error) => {
        this.logger.error('服务启动失败', { error: error.message, stack: error.stack });
        process.exit(1);
      });
      this.setupGracefulShutdown();
    } catch (error) {
      this.logger.error('服务启动失败', { error: error.message, stack: error.stack });
      process.exit(1);
    }
  }

  /**
   * Install SIGINT/SIGTERM handlers that stop the HTTP server, close the
   * database and exit, plus handlers that exit on uncaught exceptions and
   * unhandled promise rejections.
   */
  setupGracefulShutdown() {
    let isShuttingDown = false;

    // Close the database, then exit successfully.
    const closeDatabaseAndExit = async () => {
      try {
        await this.db.close();
        console.log('🗄️ 数据库连接已关闭');
        this.logger.info('数据库连接已关闭');
      } catch (error) {
        console.error('❌ 关闭数据库连接失败:', error.message);
        this.logger.error('关闭数据库连接失败', { error: error.message });
      }
      console.log('✅ 服务已安全关闭');
      this.logger.info('服务已安全关闭');
      process.exit(0);
    };

    const shutdown = (signal) => {
      // A second signal must not trigger a second server.close()/db.close().
      if (isShuttingDown) {
        return;
      }
      isShuttingDown = true;
      console.log(`\n📴 收到${signal}信号,正在优雅关闭服务...`);
      this.logger.info(`收到${signal}信号,开始关闭服务`);

      // Hard deadline in case open connections never drain.
      setTimeout(() => {
        console.error('❌ 强制关闭服务(超时)');
        this.logger.error('强制关闭服务(超时)');
        process.exit(1);
      }, 30000);

      if (!this.server) {
        closeDatabaseAndExit();
        return;
      }

      // Stop accepting new connections; the callback fires once existing
      // connections have ended.
      this.server.close((closeError) => {
        if (closeError) {
          this.logger.error('关闭HTTP服务器失败', { error: closeError.message });
        }
        console.log('🌐 HTTP服务器已关闭');
        this.logger.info('HTTP服务器已关闭');
        closeDatabaseAndExit();
      });
      // Idle keep-alive sockets would otherwise hold close() open until the
      // deadline above (method is absent on older Node versions).
      this.server.closeIdleConnections?.();
    };

    process.on('SIGINT', () => shutdown('SIGINT'));
    process.on('SIGTERM', () => shutdown('SIGTERM'));

    // Crash handlers: log and exit with a failure code.
    process.on('uncaughtException', (error) => {
      console.error('💥 未捕获的异常:', error);
      this.logger.error('未捕获的异常', { error: error.message, stack: error.stack });
      process.exit(1);
    });
    process.on('unhandledRejection', (reason, promise) => {
      console.error('💥 未处理的 Promise 拒绝:', reason);
      this.logger.error('未处理的 Promise 拒绝', { reason: reason?.message || reason });
      process.exit(1);
    });
  }
}
// Start the service only when this file is run directly (e.g. `node server.js`),
// not when it is loaded by another module via require().
const isEntryPoint = require.main === module;
if (isEntryPoint) {
  new PushReceiverService().start();
}
module.exports = PushReceiverService;