-- Push-message receiver service: database design.
-- Creates the database objects and the base table structure.

-- 1. Create the database (must be executed with administrator privileges)
-- CREATE DATABASE push_messages;
-- CREATE USER push_service WITH PASSWORD 'your_secure_password';
-- GRANT ALL PRIVILEGES ON DATABASE push_messages TO push_service;

-- Switch to the database
-- \c push_messages;

-- 2. Create extensions
-- "uuid-ossp" supplies uuid_generate_v4(), used as the default for UUID keys.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- NOTE(review): nothing visible in this script references pg_trgm; confirm it is still needed.
CREATE EXTENSION IF NOT EXISTS "pg_trgm";

-- 3. Main push-message table
-- IF NOT EXISTS keeps the script re-runnable, in line with the guarded
-- CREATE EXTENSION statements above.
CREATE TABLE IF NOT EXISTS push_messages (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    message_id VARCHAR(255) UNIQUE,            -- external message ID (if any)
    push_type VARCHAR(50) NOT NULL,            -- push type: SOS, HEALTH, LOCATION, ALERT, etc.
    user_id VARCHAR(255),                      -- user ID
    device_id VARCHAR(255),                    -- device ID
    source_ip INET,                            -- source IP address
    user_agent TEXT,                           -- user agent

    -- Message content (raw data stored as JSON)
    raw_data JSONB NOT NULL,                   -- complete payload exactly as received
    parsed_data JSONB,                         -- structured data after parsing

    -- Timestamps
    received_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,

    -- Processing status
    processing_status VARCHAR(20) DEFAULT 'pending',  -- pending, processed, failed, ignored
    processed_at TIMESTAMP WITH TIME ZONE,
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,

    -- Priority and classification
    priority INTEGER DEFAULT 5,                -- 1-10; a smaller number means higher priority
    category VARCHAR(100),                     -- message category
    tags TEXT[],                               -- array of tags

    -- Validation and duplicate detection
    checksum VARCHAR(64),                      -- message checksum, used for de-duplication
    is_duplicate BOOLEAN DEFAULT FALSE,
    original_message_id UUID,                  -- for a duplicate, the ID of the original message
                                               -- (not enforced by a foreign key)

    -- Geolocation (if any)
    latitude DECIMAL(10, 8),
    longitude DECIMAL(11, 8),
    location_accuracy FLOAT,
    location_timestamp TIMESTAMP WITH TIME ZONE,

    -- Indexing and search
    search_vector tsvector,                    -- full-text search vector

    -- Soft delete
    is_deleted BOOLEAN DEFAULT FALSE,
    deleted_at TIMESTAMP WITH TIME ZONE
);
-- 4. Push type configuration table
-- All tables in this section use IF NOT EXISTS so the script can be re-run safely.
CREATE TABLE IF NOT EXISTS push_types (
    id SERIAL PRIMARY KEY,
    type_code VARCHAR(50) UNIQUE NOT NULL,
    type_name VARCHAR(100) NOT NULL,
    description TEXT,
    default_priority INTEGER DEFAULT 5,
    validation_schema JSONB,                   -- JSON Schema used to validate the message format
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- 5. Message processing log table
CREATE TABLE IF NOT EXISTS message_processing_logs (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    -- Nullable: log rows are removed together with their message (ON DELETE CASCADE),
    -- and a row may carry no message reference at all.
    message_id UUID REFERENCES push_messages(id) ON DELETE CASCADE,
    processing_step VARCHAR(100) NOT NULL,
    status VARCHAR(20) NOT NULL,               -- started, completed, failed
    started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP WITH TIME ZONE,
    duration_ms INTEGER,
    details JSONB,
    error_details TEXT
);

-- 6. Device information table
CREATE TABLE IF NOT EXISTS devices (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    device_id VARCHAR(255) UNIQUE NOT NULL,
    device_name VARCHAR(255),
    device_type VARCHAR(100),                  -- sensor, mobile, wearable, etc.
    user_id VARCHAR(255),
    last_seen_at TIMESTAMP WITH TIME ZONE,
    is_active BOOLEAN DEFAULT TRUE,
    metadata JSONB,                            -- device metadata
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- 7. User information table (simplified)
CREATE TABLE IF NOT EXISTS users (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id VARCHAR(255) UNIQUE NOT NULL,
    user_name VARCHAR(255),
    user_type VARCHAR(50),                     -- student, teacher, elder, caregiver, etc.
    contact_info JSONB,                        -- contact details
    preferences JSONB,                         -- user preference settings
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- 8. System statistics table
-- One row per (stat_date, stat_hour, push_type) bucket.
CREATE TABLE IF NOT EXISTS system_stats (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    stat_date DATE DEFAULT CURRENT_DATE,
    stat_hour INTEGER DEFAULT EXTRACT(HOUR FROM CURRENT_TIMESTAMP),
    push_type VARCHAR(50),
    message_count INTEGER DEFAULT 0,
    success_count INTEGER DEFAULT 0,
    error_count INTEGER DEFAULT 0,
    avg_processing_time_ms FLOAT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (stat_date, stat_hour, push_type)
);

-- 9. Create indexes
-- Every index uses IF NOT EXISTS so the script can be re-run safely.

-- push_messages: common filter and sort columns
CREATE INDEX IF NOT EXISTS idx_push_messages_received_at
    ON push_messages (received_at DESC);
CREATE INDEX IF NOT EXISTS idx_push_messages_push_type
    ON push_messages (push_type);
CREATE INDEX IF NOT EXISTS idx_push_messages_user_id
    ON push_messages (user_id);
CREATE INDEX IF NOT EXISTS idx_push_messages_device_id
    ON push_messages (device_id);
CREATE INDEX IF NOT EXISTS idx_push_messages_processing_status
    ON push_messages (processing_status);
CREATE INDEX IF NOT EXISTS idx_push_messages_priority
    ON push_messages (priority);
CREATE INDEX IF NOT EXISTS idx_push_messages_checksum
    ON push_messages (checksum);

-- push_messages: spatial index over rows that carry a position.
-- ST_POINT is a PostGIS function; the extension must be enabled before the
-- index expression can be resolved, otherwise this statement fails.
CREATE EXTENSION IF NOT EXISTS postgis;
CREATE INDEX IF NOT EXISTS idx_push_messages_location
    ON push_messages USING GIST (ST_POINT(longitude, latitude))
    WHERE latitude IS NOT NULL AND longitude IS NOT NULL;

-- push_messages: full-text and JSONB containment search
CREATE INDEX IF NOT EXISTS idx_push_messages_search_vector
    ON push_messages USING GIN (search_vector);
CREATE INDEX IF NOT EXISTS idx_push_messages_raw_data
    ON push_messages USING GIN (raw_data);
CREATE INDEX IF NOT EXISTS idx_push_messages_parsed_data
    ON push_messages USING GIN (parsed_data);

-- message_processing_logs
CREATE INDEX IF NOT EXISTS idx_processing_logs_message_id
    ON message_processing_logs (message_id);
CREATE INDEX IF NOT EXISTS idx_processing_logs_started_at
    ON message_processing_logs (started_at DESC);

-- devices
-- NOTE(review): devices.device_id is declared UNIQUE, which already creates an
-- index on that column; this index looks redundant. Kept for compatibility.
CREATE INDEX IF NOT EXISTS idx_devices_device_id
    ON devices (device_id);
CREATE INDEX IF NOT EXISTS idx_devices_user_id
    ON devices (user_id);
CREATE INDEX IF NOT EXISTS idx_devices_last_seen_at
    ON devices (last_seen_at DESC);

-- users
-- NOTE(review): users.user_id is declared UNIQUE, so this index looks redundant
-- as well. Kept for compatibility.
CREATE INDEX IF NOT EXISTS idx_users_user_id
    ON users (user_id);

-- system_stats
CREATE INDEX IF NOT EXISTS idx_system_stats_date_hour
    ON system_stats (stat_date DESC, stat_hour DESC);
CREATE INDEX IF NOT EXISTS idx_system_stats_push_type
    ON system_stats (push_type);
-- 10. Trigger functions

-- Row-level trigger function: stamps updated_at with CURRENT_TIMESTAMP on the
-- row being written. Shared by every table that has an updated_at column.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Row-level trigger function for push_messages: rebuilds search_vector from
-- push_type, user_id, device_id, category, tags and the text form of raw_data.
-- NULL fields contribute an empty string so one missing value does not null
-- out the whole document.
CREATE OR REPLACE FUNCTION update_search_vector()
RETURNS TRIGGER AS $$
BEGIN
    NEW.search_vector := to_tsvector(
        'english',
        COALESCE(NEW.push_type, '') || ' ' ||
        COALESCE(NEW.user_id, '') || ' ' ||
        COALESCE(NEW.device_id, '') || ' ' ||
        COALESCE(NEW.category, '') || ' ' ||
        COALESCE(array_to_string(NEW.tags, ' '), '') || ' ' ||
        COALESCE(NEW.raw_data::text, '')
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 11. Triggers
-- CREATE TRIGGER has no IF NOT EXISTS form, so each trigger is dropped first
-- to keep the script re-runnable.

DROP TRIGGER IF EXISTS update_push_messages_updated_at ON push_messages;
CREATE TRIGGER update_push_messages_updated_at
    BEFORE UPDATE ON push_messages
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_push_messages_search_vector ON push_messages;
CREATE TRIGGER update_push_messages_search_vector
    BEFORE INSERT OR UPDATE ON push_messages
    FOR EACH ROW
    EXECUTE FUNCTION update_search_vector();

DROP TRIGGER IF EXISTS update_push_types_updated_at ON push_types;
CREATE TRIGGER update_push_types_updated_at
    BEFORE UPDATE ON push_types
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_devices_updated_at ON devices;
CREATE TRIGGER update_devices_updated_at
    BEFORE UPDATE ON devices
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_users_updated_at ON users;
CREATE TRIGGER update_users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();
-- 12. Seed the base push type configuration
-- Each row: type code, display name, description, default priority, and a
-- JSON Schema describing the expected payload for that type.
-- ON CONFLICT makes the seed re-runnable: type_code is UNIQUE, so a second run
-- would otherwise abort with a unique violation. Existing rows are left untouched.
INSERT INTO push_types (type_code, type_name, description, default_priority, validation_schema)
VALUES
    ('SOS', '紧急求救', 'SOS紧急求救信号', 1,
     '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "emergencyLevel": {"type": "string", "enum": ["LOW", "MEDIUM", "HIGH", "CRITICAL"]}}}'),
    ('HEALTH', '健康数据', '健康监测数据推送', 3,
     '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "H": {"type": "number"}, "O": {"type": "number"}, "T": {"type": "number"}}}'),
    ('LOCATION', '位置信息', '位置定位数据推送', 4,
     '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "lat": {"type": "number"}, "lng": {"type": "number"}, "accuracy": {"type": "number"}}}'),
    ('ALERT', '告警信息', '各类告警信息推送', 2,
     '{"type": "object", "required": ["userId", "alertType"], "properties": {"userId": {"type": "string"}, "alertType": {"type": "string"}, "severity": {"type": "string"}}}'),
    ('ACTIVITY', '活动数据', '运动和活动数据推送', 5,
     '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "activityType": {"type": "string"}, "duration": {"type": "number"}, "calories": {"type": "number"}}}'),
    ('DEVICE_STATUS', '设备状态', '设备状态信息推送', 4,
     '{"type": "object", "required": ["deviceId"], "properties": {"deviceId": {"type": "string"}, "status": {"type": "string"}, "batteryLevel": {"type": "number"}}}')
ON CONFLICT (type_code) DO NOTHING;
-- 13. Views
-- Both views use CREATE OR REPLACE so the script can be re-run safely.

-- Message statistics for the last 24 hours, one row per push_type.
-- Soft-deleted messages are excluded. avg_processing_time_ms averages
-- (processed_at - received_at) in milliseconds; rows with a NULL processed_at
-- are skipped by AVG.
CREATE OR REPLACE VIEW recent_message_stats AS
SELECT
    push_type,
    COUNT(*) AS total_count,
    COUNT(*) FILTER (WHERE processing_status = 'processed') AS processed_count,
    COUNT(*) FILTER (WHERE processing_status = 'failed') AS failed_count,
    COUNT(*) FILTER (WHERE processing_status = 'pending') AS pending_count,
    AVG(EXTRACT(EPOCH FROM (processed_at - received_at)) * 1000) AS avg_processing_time_ms,
    MIN(received_at) AS first_received,
    MAX(received_at) AS last_received
FROM push_messages
WHERE received_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
  AND is_deleted = FALSE
GROUP BY push_type;

-- Active-device statistics, one row per device_type.
-- Only devices flagged is_active are counted; the active_* columns count those
-- whose last_seen_at falls within the last hour, 24 hours and 7 days.
CREATE OR REPLACE VIEW active_devices_stats AS
SELECT
    device_type,
    COUNT(*) AS total_devices,
    COUNT(*) FILTER (WHERE last_seen_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour') AS active_1h,
    COUNT(*) FILTER (WHERE last_seen_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours') AS active_24h,
    COUNT(*) FILTER (WHERE last_seen_at >= CURRENT_TIMESTAMP - INTERVAL '7 days') AS active_7d
FROM devices
WHERE is_active = TRUE
GROUP BY device_type;
-- 14. Functions

-- Soft-deletes messages older than the retention window.
--
-- Parameters:
--   days_to_keep  retention window in days (default 30); must not be negative.
-- Returns:
--   the number of messages marked as deleted by this call.
-- Side effects:
--   sets is_deleted / deleted_at on matching push_messages rows and writes one
--   summary row (with a NULL message_id) to message_processing_logs.
-- Raises:
--   invalid_parameter_value when days_to_keep is negative.
CREATE OR REPLACE FUNCTION cleanup_old_messages(days_to_keep INTEGER DEFAULT 30)
RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
BEGIN
    -- A negative window would put the cutoff in the future and soft-delete
    -- every message; reject it instead of silently wiping the table.
    IF days_to_keep < 0 THEN
        RAISE EXCEPTION 'days_to_keep must not be negative (got %)', days_to_keep
            USING ERRCODE = 'invalid_parameter_value';
    END IF;

    -- Soft-delete messages older than the given number of days.
    UPDATE push_messages
    SET is_deleted = TRUE,
        deleted_at = CURRENT_TIMESTAMP
    WHERE received_at < CURRENT_TIMESTAMP - make_interval(days => days_to_keep)
      AND is_deleted = FALSE;

    GET DIAGNOSTICS deleted_count = ROW_COUNT;

    -- Record the cleanup run. details is a JSONB column, so build JSONB directly.
    INSERT INTO message_processing_logs (message_id, processing_step, status, details)
    VALUES (
        NULL,
        'cleanup_old_messages',
        'completed',
        jsonb_build_object('days_to_keep', days_to_keep, 'deleted_count', deleted_count)
    );

    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;

-- Returns per-push_type message statistics for the trailing window.
--
-- Parameters:
--   hours_back  size of the window in hours (default 24).
-- Returns:
--   one row per push_type with total / processed / failed / pending counts and
--   the average (processed_at - received_at) in milliseconds, ordered by
--   total_count descending. Soft-deleted messages are excluded.
CREATE OR REPLACE FUNCTION get_message_stats(hours_back INTEGER DEFAULT 24)
RETURNS TABLE (
    push_type VARCHAR,
    total_count BIGINT,
    processed_count BIGINT,
    failed_count BIGINT,
    pending_count BIGINT,
    avg_processing_time_ms NUMERIC
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        pm.push_type,
        COUNT(*) AS total_count,
        COUNT(*) FILTER (WHERE pm.processing_status = 'processed') AS processed_count,
        COUNT(*) FILTER (WHERE pm.processing_status = 'failed') AS failed_count,
        COUNT(*) FILTER (WHERE pm.processing_status = 'pending') AS pending_count,
        -- Explicit cast: before PostgreSQL 14, EXTRACT yields double precision,
        -- which would not match the declared NUMERIC result column and would
        -- make RETURN QUERY fail at run time.
        AVG(EXTRACT(EPOCH FROM (pm.processed_at - pm.received_at)) * 1000)::NUMERIC
            AS avg_processing_time_ms
    FROM push_messages AS pm
    WHERE pm.received_at >= CURRENT_TIMESTAMP - make_interval(hours => hours_back)
      AND pm.is_deleted = FALSE
    GROUP BY pm.push_type
    ORDER BY total_count DESC;
END;
$$ LANGUAGE plpgsql;

-- 15. Scheduled job (requires the pg_cron extension)
-- Every day at 02:00, clean up data older than 30 days.
-- SELECT cron.schedule('cleanup-old-messages', '0 2 * * *', 'SELECT cleanup_old_messages(30);');

-- 16. Privileges
-- The push_service role must already exist; its CREATE USER statement at the
-- top of this script is commented out and has to be run by an administrator.
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO push_service;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO push_service;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO push_service;
-- 17. Initialization-complete marker
-- Records a zero-count 'SYSTEM' bucket for the current date and hour; if that
-- bucket already exists the insert is skipped.
INSERT INTO system_stats (stat_date, stat_hour, push_type, message_count)
SELECT
    CURRENT_DATE,
    EXTRACT(HOUR FROM CURRENT_TIMESTAMP),
    'SYSTEM',
    0
ON CONFLICT (stat_date, stat_hour, push_type) DO NOTHING;

-- Catalog comments for the database and each table.
COMMENT ON TABLE system_stats
    IS '系统统计表 - 存储系统运行统计数据';
COMMENT ON TABLE users
    IS '用户信息表 - 存储用户基本信息';
COMMENT ON TABLE devices
    IS '设备信息表 - 存储推送消息来源设备的信息';
COMMENT ON TABLE message_processing_logs
    IS '消息处理日志表 - 记录消息处理过程的详细日志';
COMMENT ON TABLE push_types
    IS '推送类型配置表 - 定义各种推送消息类型及其验证规则';
COMMENT ON TABLE push_messages
    IS '推送消息主表 - 存储所有接收到的推送消息';
COMMENT ON DATABASE push_messages
    IS '推送消息接收服务数据库 - 存储所有接收到的推送消息及相关数据';