/** * Supabase 数据库连接和操作类 * 提供基于 Supabase 的数据库操作方法 */ const { createClient } = require('@supabase/supabase-js'); const winston = require('winston'); const crypto = require('crypto'); require('dotenv').config(); class SupabaseDatabaseManager { constructor() { this.supabase = null; this.logger = winston.createLogger({ level: process.env.LOG_LEVEL || 'info', format: winston.format.combine( winston.format.timestamp(), winston.format.errors({ stack: true }), winston.format.json() ), transports: [ new winston.transports.Console(), new winston.transports.File({ filename: 'logs/database.log' }) ] }); this.initializeClient(); } /** * 初始化 Supabase 客户端 */ initializeClient() { try { const supabaseUrl = process.env.SUPABASE_URL; const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY; if (!supabaseUrl || !supabaseKey) { throw new Error('Supabase URL 和 Service Role Key 必须在环境变量中设置'); } const eldercare_user = process.env.ELDERCARE_USER; const eldercare_password = process.env.ELDERCARE_PASSWORD; this.supabase = createClient(supabaseUrl, supabaseKey, { auth: { autoRefreshToken: true, persistSession: true }, db: { schema: 'public' } }); this.supabase.auth.signInWithPassword({ email: eldercare_user, password: eldercare_password }).then(({ data, error }) => { if (error) { this.logger.error('Supabase 用户认证失败', { error: error.message, stack: error.stack }); throw error; } }); this.logger.info('Supabase 客户端初始化完成', { url: supabaseUrl, hasServiceKey: !!supabaseKey }); } catch (error) { this.logger.error('Supabase 客户端初始化失败', { error: error.message, stack: error.stack }); throw error; } } /** * 测试数据库连接 */ async testConnection() { try { const { data, error } = await this.supabase .from('ps_push_types') .select('count') .limit(1); if (error) { throw error; } this.logger.info('Supabase 数据库连接测试成功'); return { success: true, message: 'Supabase 数据库连接正常', timestamp: new Date().toISOString() }; } catch (error) { this.logger.error('Supabase 数据库连接测试失败', { error: error.message, stack: error.stack }); throw error; } } /** * 插入推送消息 */ async insertPushMessage(messageData) { const startTime = Date.now(); try { // 生成消息校验和用于去重 const checksum = this.generateChecksum(messageData); // 检查是否存在重复消息 const { data: duplicateData } = await this.supabase .from('ps_push_msg_raw') .select('id') .eq('checksum', checksum) .eq('is_deleted', false) .limit(1); let isDuplicate = false; let originalMessageId = null; if (duplicateData && duplicateData.length > 0) { isDuplicate = true; originalMessageId = duplicateData[0].id; } console.log('isDuplicate', isDuplicate, 'originalMessageId', originalMessageId); // 准备插入数据 const insertData = { checksum: checksum, push_type: messageData.push_type || messageData.pushType, source_ip: messageData.source_ip, user_agent: messageData.user_agent, raw_data: messageData.raw_data || messageData }; console.log('insertData', insertData); // 插入推送消息 const { data: messageResult, error: messageError } = await this.supabase .from('ps_push_msg_raw') .insert(insertData) .select('id') .single(); console.log('messageResult', messageResult); // 检查插入结果 if (messageError) { this.logger.error('插入推送消息失败', { error: messageError.message, data: insertData }); throw messageError; } const messageId = messageResult.id; // 记录处理日志 await this.supabase .from('message_processing_logs') .insert({ message_id: messageId, processing_step: 'message_received', status: 'completed', started_at: new Date(startTime).toISOString(), completed_at: new Date().toISOString(), duration_ms: Date.now() - startTime, details: { isDuplicate, checksum } }); // 更新设备信息 if (messageData.device_id || messageData.deviceId) { await this.upsertDevice({ device_id: messageData.device_id || messageData.deviceId, device_name: messageData.device_name, device_type: messageData.device_type, user_id: messageData.user_id || messageData.userId, metadata: messageData.device_metadata || {} }); } // 更新用户信息 if (messageData.user_id || messageData.userId) { await this.upsertUser({ user_id: messageData.user_id || messageData.userId, user_name: messageData.user_name, user_type: messageData.user_type }); } this.logger.info('推送消息插入成功', { messageId, pushType: messageData.push_type || messageData.pushType, userId: messageData.user_id || messageData.userId, isDuplicate, processingTime: Date.now() - startTime }); return { success: true, messageId, isDuplicate, processingTime: Date.now() - startTime }; } catch (error) { this.logger.error('推送消息插入失败', { error: error.message, stack: error.stack, messageData: messageData }); throw error; } } /** * 批量插入推送消息 */ async insertPushMessagesBatch(messagesData) { const startTime = Date.now(); try { const results = []; // 使用事务批量插入 for (const messageData of messagesData) { try { const result = await this.insertPushMessage(messageData); results.push(result); } catch (error) { results.push({ success: false, error: error.message, messageData: messageData }); } } const successCount = results.filter(r => r.success).length; const failureCount = results.filter(r => !r.success).length; this.logger.info('批量推送消息插入完成', { totalCount: messagesData.length, successCount, failureCount, processingTime: Date.now() - startTime }); return { success: true, totalCount: messagesData.length, successCount, failureCount, results, processingTime: Date.now() - startTime }; } catch (error) { this.logger.error('批量推送消息插入失败', { error: error.message, stack: error.stack }); throw error; } } /** * 更新或插入设备信息 - 适配现有 ak_devices 表 */ async upsertDevice(deviceData) { try { // 检查设备是否存在 const { data: existingDevice, error: selectError } = await this.supabase .from('ak_devices') .select('id') .eq('id', deviceData.device_id) .single(); if (selectError && selectError.code !== 'PGRST116') { // PGRST116 是没有找到记录的错误 throw selectError; } // 如果设备存在,更新状态;如果不存在,记录日志(设备应该通过其他接口创建) if (existingDevice) { const { error } = await this.supabase .from('ak_devices') .update({ status: 'active', extra: { ...deviceData.metadata, last_push_at: new Date().toISOString() } }) .eq('id', deviceData.device_id); if (error) { throw error; } this.logger.info('设备状态更新成功', { device_id: deviceData.device_id }); } else { this.logger.warn('推送消息关联的设备不存在', { device_id: deviceData.device_id, message: '设备需要通过设备管理接口先创建' }); } } catch (error) { this.logger.error('设备信息更新失败', { error: error.message, deviceData: deviceData }); } } /** * 更新或插入用户信息 - 适配现有 ak_users 表 */ async upsertUser(userData) { try { // 检查用户是否存在 const { data: existingUser, error: selectError } = await this.supabase .from('ak_users') .select('id') .eq('id', userData.user_id) .single(); if (selectError && selectError.code !== 'PGRST116') { // PGRST116 是没有找到记录的错误 throw selectError; } // 如果用户存在,记录活跃状态;如果不存在,记录日志(用户应该通过其他接口创建) if (existingUser) { this.logger.info('用户推送消息活跃', { user_id: userData.user_id }); } else { this.logger.warn('推送消息关联的用户不存在', { user_id: userData.user_id, message: '用户需要通过用户管理接口先创建' }); } } catch (error) { this.logger.error('用户信息检查失败', { error: error.message, userData: userData }); } } /** * 生成消息校验和 */ generateChecksum(data) { const normalizedData = JSON.stringify(data, Object.keys(data).sort()); return crypto.createHash('sha256').update(normalizedData).digest('hex'); } /** * 获取消息统计信息 */ async getMessageStats(hoursBack = 24) { try { const { data, error } = await this.supabase .rpc('get_message_stats', { hours_back: hoursBack }); if (error) { throw error; } return data || []; } catch (error) { this.logger.error('获取消息统计失败', { error: error.message, stack: error.stack }); throw error; } } /** * 获取系统健康状态 */ async getHealthStatus() { try { // 获取消息统计 const { data: messageStats, error: statsError } = await this.supabase .from('ps_push_messages') .select('processing_status', { count: 'exact' }) .gte('received_at', new Date(Date.now() - 60 * 60 * 1000).toISOString()) .eq('is_deleted', false); if (statsError) { throw statsError; } // 统计不同状态的消息数量 const stats = { total_messages: 0, messages_last_hour: messageStats?.length || 0, pending_messages: 0, failed_messages: 0, processed_messages: 0 }; // 获取总消息数 const { count: totalCount } = await this.supabase .from('ps_push_messages') .select('*', { count: 'exact', head: true }) .eq('is_deleted', false); stats.total_messages = totalCount || 0; // 获取待处理和失败的消息数 const { count: pendingCount } = await this.supabase .from('ps_push_messages') .select('*', { count: 'exact', head: true }) .eq('processing_status', 'pending') .eq('is_deleted', false); const { count: failedCount } = await this.supabase .from('ps_push_messages') .select('*', { count: 'exact', head: true }) .eq('processing_status', 'failed') .eq('is_deleted', false); stats.pending_messages = pendingCount || 0; stats.failed_messages = failedCount || 0; stats.processed_messages = stats.total_messages - stats.pending_messages - stats.failed_messages; return { database: { connected: true, provider: 'Supabase', stats: stats }, timestamp: new Date().toISOString() }; } catch (error) { this.logger.error('获取系统健康状态失败', { error: error.message, stack: error.stack }); return { database: { connected: false, provider: 'Supabase', error: error.message }, timestamp: new Date().toISOString() }; } } /** * 获取推送消息列表 */ async getPushMessages(options = {}) { try { const { limit = 50, offset = 0, pushType = null, userId = null, startDate = null, endDate = null, status = null } = options; let query = this.supabase .from('ps_push_messages') .select(` id, push_type, user_id, device_id, received_at, processing_status, priority, raw_data, is_duplicate `) .eq('is_deleted', false) .order('received_at', { ascending: false }) .range(offset, offset + limit - 1); if (pushType) { query = query.eq('push_type', pushType); } if (userId) { query = query.eq('user_id', userId); } if (status) { query = query.eq('processing_status', status); } if (startDate) { query = query.gte('received_at', startDate); } if (endDate) { query = query.lte('received_at', endDate); } const { data, error } = await query; if (error) { throw error; } return data || []; } catch (error) { this.logger.error('获取推送消息列表失败', { error: error.message, stack: error.stack }); throw error; } } /** * 清理旧数据 */ async cleanupOldMessages(daysToKeep = 30) { try { const cutoffDate = new Date(Date.now() - daysToKeep * 24 * 60 * 60 * 1000).toISOString(); const { data, error } = await this.supabase .from('ps_push_messages') .update({ is_deleted: true, deleted_at: new Date().toISOString() }) .lt('received_at', cutoffDate) .eq('is_deleted', false) .select('id'); if (error) { throw error; } const deletedCount = data?.length || 0; this.logger.info('清理旧数据完成', { daysToKeep, deletedCount, cutoffDate }); return deletedCount; } catch (error) { this.logger.error('清理旧数据失败', { error: error.message, stack: error.stack }); throw error; } } /** * 关闭连接(Supabase 客户端不需要显式关闭) */ async close() { this.logger.info('Supabase 客户端连接已关闭'); } } module.exports = SupabaseDatabaseManager;