Files
akmon/push-receiver-service/lib/supabase-database.js
2026-01-20 08:04:15 +08:00

572 lines
19 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Supabase 数据库连接和操作类
* 提供基于 Supabase 的数据库操作方法
*/
const { createClient } = require('@supabase/supabase-js');
const winston = require('winston');
const crypto = require('crypto');
require('dotenv').config();
/**
 * Thin data-access layer over a Supabase client for the push receiver service.
 *
 * The constructor builds a winston logger, creates the Supabase client from
 * environment variables and starts a background user sign-in whose outcome is
 * exposed as `authReady` (a promise that resolves to a boolean, never rejects).
 */
class SupabaseDatabaseManager {
  /**
   * Creates the logger and the Supabase client.
   *
   * @throws {Error} When SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY is missing.
   */
  constructor() {
    this.supabase = null;
    // Resolves to true once the optional user sign-in succeeded, false otherwise.
    this.authReady = Promise.resolve(false);
    this.logger = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
      ),
      transports: [
        new winston.transports.Console(),
        new winston.transports.File({ filename: 'logs/database.log' })
      ]
    });
    this.initializeClient();
  }

  /**
   * Initializes the Supabase client from SUPABASE_URL and
   * SUPABASE_SERVICE_ROLE_KEY, then starts the user sign-in in the background.
   *
   * The sign-in promise is stored in `this.authReady` and always has its
   * failure handled, so an authentication problem is logged instead of
   * surfacing as an unhandled promise rejection.
   *
   * @throws {Error} When the URL or key is missing, or client creation fails.
   */
  initializeClient() {
    try {
      const supabaseUrl = process.env.SUPABASE_URL;
      const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY;
      if (!supabaseUrl || !supabaseKey) {
        throw new Error('Supabase URL 和 Service Role Key 必须在环境变量中设置');
      }
      this.supabase = createClient(supabaseUrl, supabaseKey, {
        auth: {
          autoRefreshToken: true,
          persistSession: true
        },
        db: {
          schema: 'public'
        }
      });
      // Fire-and-forget by design (constructors cannot await); callers that
      // need the signed-in session can `await manager.authReady`.
      this.authReady = this.signInServiceUser();
      this.logger.info('Supabase 客户端初始化完成', {
        url: supabaseUrl,
        hasServiceKey: !!supabaseKey
      });
    } catch (error) {
      this.logger.error('Supabase 客户端初始化失败', {
        error: error.message,
        stack: error.stack
      });
      throw error;
    }
  }

  /**
   * Signs in with the ELDERCARE_USER / ELDERCARE_PASSWORD credentials.
   *
   * Never rejects: failures are logged and reported through the return value.
   *
   * @returns {Promise<boolean>} true when sign-in succeeded, false when it
   *   failed or when no credentials are configured.
   */
  async signInServiceUser() {
    const email = process.env.ELDERCARE_USER;
    const password = process.env.ELDERCARE_PASSWORD;
    if (!email || !password) {
      this.logger.warn('未配置 ELDERCARE_USER 或 ELDERCARE_PASSWORD，跳过用户认证');
      return false;
    }
    try {
      const { error } = await this.supabase.auth.signInWithPassword({ email, password });
      if (error) {
        throw error;
      }
      return true;
    } catch (error) {
      this.logger.error('Supabase 用户认证失败', {
        error: error.message,
        stack: error.stack
      });
      return false;
    }
  }

  /**
   * Runs a minimal query against `ps_push_types` to verify connectivity.
   *
   * @returns {Promise<{success: boolean, message: string, timestamp: string}>}
   * @throws The Supabase error when the query fails.
   */
  async testConnection() {
    try {
      const { error } = await this.supabase
        .from('ps_push_types')
        .select('count')
        .limit(1);
      if (error) {
        throw error;
      }
      this.logger.info('Supabase 数据库连接测试成功');
      return {
        success: true,
        message: 'Supabase 数据库连接正常',
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      this.logger.error('Supabase 数据库连接测试失败', {
        error: error.message,
        stack: error.stack
      });
      throw error;
    }
  }

  /**
   * Stores one push message in `ps_push_msg_raw`, writes a processing-log row
   * and refreshes the related device / user bookkeeping.
   *
   * Both snake_case and camelCase spellings of `push_type`, `device_id` and
   * `user_id` are accepted on `messageData`.
   *
   * @param {object} messageData - Incoming message payload.
   * @returns {Promise<{success: boolean, messageId: *, isDuplicate: boolean, processingTime: number}>}
   * @throws The Supabase error when the message insert fails.
   */
  async insertPushMessage(messageData) {
    const startTime = Date.now();
    try {
      // Checksum of the payload, used to detect duplicates.
      const checksum = this.generateChecksum(messageData);

      // Look for an existing, non-deleted row with the same checksum.
      const { data: duplicateData, error: duplicateError } = await this.supabase
        .from('ps_push_msg_raw')
        .select('id')
        .eq('checksum', checksum)
        .eq('is_deleted', false)
        .limit(1);
      if (duplicateError) {
        // Duplicate detection is best-effort: record the failure, keep going.
        this.logger.warn('重复消息检查失败', {
          error: duplicateError.message,
          checksum
        });
      }
      const isDuplicate = Array.isArray(duplicateData) && duplicateData.length > 0;
      const originalMessageId = isDuplicate ? duplicateData[0].id : null;
      this.logger.debug('isDuplicate', { isDuplicate, originalMessageId });

      const pushType = messageData.push_type || messageData.pushType;
      const userId = messageData.user_id || messageData.userId;
      const deviceId = messageData.device_id || messageData.deviceId;

      // Row to insert; falls back to the whole payload when raw_data is absent.
      const insertData = {
        checksum: checksum,
        push_type: pushType,
        source_ip: messageData.source_ip,
        user_agent: messageData.user_agent,
        raw_data: messageData.raw_data || messageData
      };
      this.logger.debug('insertData', { insertData });

      const { data: messageResult, error: messageError } = await this.supabase
        .from('ps_push_msg_raw')
        .insert(insertData)
        .select('id')
        .single();
      this.logger.debug('messageResult', { messageResult });
      if (messageError) {
        this.logger.error('插入推送消息失败', {
          error: messageError.message,
          data: insertData
        });
        throw messageError;
      }
      const messageId = messageResult.id;

      // Processing log; a failure here must not fail the already-stored message.
      const { error: logError } = await this.supabase
        .from('message_processing_logs')
        .insert({
          message_id: messageId,
          processing_step: 'message_received',
          status: 'completed',
          started_at: new Date(startTime).toISOString(),
          completed_at: new Date().toISOString(),
          duration_ms: Date.now() - startTime,
          details: { isDuplicate, checksum }
        });
      if (logError) {
        this.logger.warn('记录处理日志失败', {
          error: logError.message,
          messageId
        });
      }

      // Refresh device bookkeeping (upsertDevice logs and swallows its errors).
      if (deviceId) {
        await this.upsertDevice({
          device_id: deviceId,
          device_name: messageData.device_name,
          device_type: messageData.device_type,
          user_id: userId,
          metadata: messageData.device_metadata || {}
        });
      }
      // Refresh user bookkeeping (upsertUser logs and swallows its errors).
      if (userId) {
        await this.upsertUser({
          user_id: userId,
          user_name: messageData.user_name,
          user_type: messageData.user_type
        });
      }

      this.logger.info('推送消息插入成功', {
        messageId,
        pushType,
        userId,
        isDuplicate,
        processingTime: Date.now() - startTime
      });
      return {
        success: true,
        messageId,
        isDuplicate,
        processingTime: Date.now() - startTime
      };
    } catch (error) {
      this.logger.error('推送消息插入失败', {
        error: error.message,
        stack: error.stack,
        messageData: messageData
      });
      throw error;
    }
  }

  /**
   * Inserts several push messages one after another.
   *
   * This is NOT transactional: each message is stored independently through
   * `insertPushMessage`, and a failing message is reported in `results`
   * without stopping the remaining ones.
   *
   * @param {Iterable<object>} messagesData - Messages to store.
   * @returns {Promise<{success: boolean, totalCount: number, successCount: number, failureCount: number, results: object[], processingTime: number}>}
   * @throws {TypeError} When `messagesData` is not iterable.
   */
  async insertPushMessagesBatch(messagesData) {
    const startTime = Date.now();
    try {
      const results = [];
      let successCount = 0;
      let failureCount = 0;
      for (const messageData of messagesData) {
        try {
          const result = await this.insertPushMessage(messageData);
          results.push(result);
          if (result.success) {
            successCount += 1;
          } else {
            failureCount += 1;
          }
        } catch (error) {
          failureCount += 1;
          results.push({
            success: false,
            error: error.message,
            messageData: messageData
          });
        }
      }
      this.logger.info('批量推送消息插入完成', {
        totalCount: messagesData.length,
        successCount,
        failureCount,
        processingTime: Date.now() - startTime
      });
      return {
        success: true,
        totalCount: messagesData.length,
        successCount,
        failureCount,
        results,
        processingTime: Date.now() - startTime
      };
    } catch (error) {
      this.logger.error('批量推送消息插入失败', {
        error: error.message,
        stack: error.stack
      });
      throw error;
    }
  }

  /**
   * Marks an existing `ak_devices` row as active and stamps the push time.
   *
   * Despite the name, no device is ever created here: an unknown device only
   * produces a warning. Errors are logged and swallowed (best-effort).
   *
   * @param {{device_id: *, metadata?: object}} deviceData - Device reference.
   * @returns {Promise<void>}
   */
  async upsertDevice(deviceData) {
    try {
      const { data: existingDevice, error: selectError } = await this.supabase
        .from('ak_devices')
        .select('id')
        .eq('id', deviceData.device_id)
        .single();
      // PGRST116 is the "no row found" error of `.single()`; anything else is real.
      if (selectError && selectError.code !== 'PGRST116') {
        throw selectError;
      }
      if (!existingDevice) {
        this.logger.warn('推送消息关联的设备不存在', {
          device_id: deviceData.device_id,
          message: '设备需要通过设备管理接口先创建'
        });
        return;
      }
      // NOTE(review): this replaces the whole `extra` value rather than merging
      // into what is already stored - confirm that is intended.
      const { error } = await this.supabase
        .from('ak_devices')
        .update({
          status: 'active',
          extra: {
            ...deviceData.metadata,
            last_push_at: new Date().toISOString()
          }
        })
        .eq('id', deviceData.device_id);
      if (error) {
        throw error;
      }
      this.logger.info('设备状态更新成功', { device_id: deviceData.device_id });
    } catch (error) {
      this.logger.error('设备信息更新失败', {
        error: error.message,
        deviceData: deviceData
      });
    }
  }

  /**
   * Checks whether the user exists in `ak_users` and logs the outcome.
   *
   * Despite the name, nothing is written. Errors are logged and swallowed
   * (best-effort).
   *
   * @param {{user_id: *}} userData - User reference.
   * @returns {Promise<void>}
   */
  async upsertUser(userData) {
    try {
      const { data: existingUser, error: selectError } = await this.supabase
        .from('ak_users')
        .select('id')
        .eq('id', userData.user_id)
        .single();
      // PGRST116 is the "no row found" error of `.single()`; anything else is real.
      if (selectError && selectError.code !== 'PGRST116') {
        throw selectError;
      }
      if (existingUser) {
        this.logger.info('用户推送消息活跃', { user_id: userData.user_id });
      } else {
        this.logger.warn('推送消息关联的用户不存在', {
          user_id: userData.user_id,
          message: '用户需要通过用户管理接口先创建'
        });
      }
    } catch (error) {
      this.logger.error('用户信息检查失败', {
        error: error.message,
        userData: userData
      });
    }
  }

  /**
   * Computes a SHA-256 hex digest over a canonical JSON form of `data`.
   *
   * Object keys are sorted at every nesting level, so the digest does not
   * depend on property order. Nested content is included in the digest: the
   * previous implementation passed the top-level key list as a
   * `JSON.stringify` allowlist, which silently dropped nested properties and
   * made payloads differing only in nested values collide. For flat payloads
   * the digest is unchanged.
   *
   * @param {*} data - JSON-serializable value.
   * @returns {string} Lower-case hex SHA-256 digest.
   * @throws {TypeError} When `data` is null or undefined.
   */
  generateChecksum(data) {
    if (data === null || data === undefined) {
      throw new TypeError('无法为 null 或 undefined 生成校验和');
    }
    // Serializes a plain JSON value with lexicographically sorted object keys.
    const canonicalize = (value) => {
      if (value === null || typeof value !== 'object') {
        return JSON.stringify(value);
      }
      if (Array.isArray(value)) {
        return `[${value.map((item) => canonicalize(item)).join(',')}]`;
      }
      const members = Object.keys(value)
        .sort()
        .map((key) => `${JSON.stringify(key)}:${canonicalize(value[key])}`);
      return `{${members.join(',')}}`;
    };
    // Round-trip first so toJSON(), undefined members, etc. follow JSON rules.
    const jsonValue = JSON.parse(JSON.stringify(data));
    return crypto.createHash('sha256').update(canonicalize(jsonValue)).digest('hex');
  }

  /**
   * Fetches message statistics through the `get_message_stats` RPC.
   *
   * @param {number} [hoursBack=24] - Passed to the RPC as `hours_back`.
   * @returns {Promise<Array>} RPC result, or an empty array when it is empty.
   * @throws The Supabase error when the RPC fails.
   */
  async getMessageStats(hoursBack = 24) {
    try {
      const { data, error } = await this.supabase
        .rpc('get_message_stats', { hours_back: hoursBack });
      if (error) {
        throw error;
      }
      return data || [];
    } catch (error) {
      this.logger.error('获取消息统计失败', {
        error: error.message,
        stack: error.stack
      });
      throw error;
    }
  }

  /**
   * Collects message counters from `ps_push_messages` as a health report.
   *
   * All counters come from head-only exact counts (no rows are downloaded),
   * run in parallel. Any failing count marks the database as not connected.
   * This method never throws.
   *
   * @returns {Promise<{database: object, timestamp: string}>}
   */
  async getHealthStatus() {
    try {
      const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString();
      // Counts non-deleted messages, optionally narrowed by `narrow`.
      const countMessages = async (narrow) => {
        const baseQuery = this.supabase
          .from('ps_push_messages')
          .select('*', { count: 'exact', head: true })
          .eq('is_deleted', false);
        const { count, error } = await narrow(baseQuery);
        if (error) {
          throw error;
        }
        return count || 0;
      };
      const [lastHourCount, totalCount, pendingCount, failedCount] = await Promise.all([
        countMessages((query) => query.gte('received_at', oneHourAgo)),
        countMessages((query) => query),
        countMessages((query) => query.eq('processing_status', 'pending')),
        countMessages((query) => query.eq('processing_status', 'failed'))
      ]);
      const stats = {
        total_messages: totalCount,
        messages_last_hour: lastHourCount,
        pending_messages: pendingCount,
        failed_messages: failedCount,
        // Everything that is neither pending nor failed counts as processed.
        processed_messages: totalCount - pendingCount - failedCount
      };
      return {
        database: {
          connected: true,
          provider: 'Supabase',
          stats: stats
        },
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      this.logger.error('获取系统健康状态失败', {
        error: error.message,
        stack: error.stack
      });
      return {
        database: {
          connected: false,
          provider: 'Supabase',
          error: error.message
        },
        timestamp: new Date().toISOString()
      };
    }
  }

  /**
   * Lists non-deleted push messages, newest first.
   *
   * `limit` and `offset` are parsed as integers (they often arrive as strings
   * from HTTP query parameters); invalid values fall back to 50 and 0.
   *
   * @param {object} [options]
   * @param {number|string} [options.limit=50] - Page size (> 0).
   * @param {number|string} [options.offset=0] - Zero-based start index (>= 0).
   * @param {?string} [options.pushType] - Filter on `push_type`.
   * @param {?string} [options.userId] - Filter on `user_id`.
   * @param {?string} [options.startDate] - Lower bound for `received_at`.
   * @param {?string} [options.endDate] - Upper bound for `received_at`.
   * @param {?string} [options.status] - Filter on `processing_status`.
   * @returns {Promise<object[]>} Matching rows, or an empty array.
   * @throws The Supabase error when the query fails.
   */
  async getPushMessages(options = {}) {
    try {
      const {
        limit = 50,
        offset = 0,
        pushType = null,
        userId = null,
        startDate = null,
        endDate = null,
        status = null
      } = options;

      // Coerce before arithmetic: '10' + '20' - 1 would otherwise yield 1019.
      const parsedLimit = Number.parseInt(limit, 10);
      const parsedOffset = Number.parseInt(offset, 10);
      const pageSize = Number.isInteger(parsedLimit) && parsedLimit > 0 ? parsedLimit : 50;
      const firstRow = Number.isInteger(parsedOffset) && parsedOffset >= 0 ? parsedOffset : 0;

      let query = this.supabase
        .from('ps_push_messages')
        .select(`
          id,
          push_type,
          user_id,
          device_id,
          received_at,
          processing_status,
          priority,
          raw_data,
          is_duplicate
        `)
        .eq('is_deleted', false)
        .order('received_at', { ascending: false })
        .range(firstRow, firstRow + pageSize - 1);
      if (pushType) {
        query = query.eq('push_type', pushType);
      }
      if (userId) {
        query = query.eq('user_id', userId);
      }
      if (status) {
        query = query.eq('processing_status', status);
      }
      if (startDate) {
        query = query.gte('received_at', startDate);
      }
      if (endDate) {
        query = query.lte('received_at', endDate);
      }
      const { data, error } = await query;
      if (error) {
        throw error;
      }
      return data || [];
    } catch (error) {
      this.logger.error('获取推送消息列表失败', {
        error: error.message,
        stack: error.stack
      });
      throw error;
    }
  }

  /**
   * Soft-deletes messages in `ps_push_messages` received before the cutoff by
   * setting `is_deleted` and `deleted_at`.
   *
   * @param {number} [daysToKeep=30] - Age in days beyond which rows are flagged.
   * @returns {Promise<number>} Number of rows returned by the update.
   * @throws {RangeError} When `daysToKeep` is not a finite number.
   * @throws The Supabase error when the update fails.
   */
  async cleanupOldMessages(daysToKeep = 30) {
    try {
      const retentionDays = Number(daysToKeep);
      if (!Number.isFinite(retentionDays)) {
        throw new RangeError(`daysToKeep 必须是有限数字: ${String(daysToKeep)}`);
      }
      const cutoffDate = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString();
      const { data, error } = await this.supabase
        .from('ps_push_messages')
        .update({
          is_deleted: true,
          deleted_at: new Date().toISOString()
        })
        .lt('received_at', cutoffDate)
        .eq('is_deleted', false)
        .select('id');
      if (error) {
        throw error;
      }
      const deletedCount = data?.length || 0;
      this.logger.info('清理旧数据完成', {
        daysToKeep,
        deletedCount,
        cutoffDate
      });
      return deletedCount;
    } catch (error) {
      this.logger.error('清理旧数据失败', {
        error: error.message,
        stack: error.stack
      });
      throw error;
    }
  }

  /**
   * Lifecycle hook kept for symmetry with other database managers: it only
   * writes a log entry and performs no client teardown.
   *
   * @returns {Promise<void>}
   */
  async close() {
    this.logger.info('Supabase 客户端连接已关闭');
  }
}
// CommonJS export: the class itself is the module's only export.
module.exports = SupabaseDatabaseManager;