-- 推送消息接收服务 Supabase 数据库设计 -- 在 Supabase Dashboard 的 SQL Editor 中执行此脚本 -- 适配现有的 ak_users 和 ak_devices 表结构 -- 启用必要的扩展 CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; CREATE EXTENSION IF NOT EXISTS "pg_trgm"; CREATE EXTENSION IF NOT EXISTS "pgcrypto"; -- 1. 推送消息主表 CREATE TABLE IF NOT EXISTS ps_push_messages ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), message_id VARCHAR(255), -- 外部消息ID(如果有) push_type VARCHAR(50) NOT NULL, -- 推送类型:SOS, HEALTH, LOCATION, ALERT 等 user_id UUID REFERENCES public.ak_users(id) ON DELETE SET NULL, -- 关联到现有用户表 device_id UUID REFERENCES public.ak_devices(id) ON DELETE SET NULL, -- 关联到现有设备表 source_ip INET, -- 来源IP地址 user_agent TEXT, -- 用户代理 -- 消息内容(JSON格式存储原始数据) raw_data JSONB NOT NULL, -- 原始接收到的完整数据 parsed_data JSONB, -- 解析后的结构化数据 -- 时间戳 received_at TIMESTAMPTZ DEFAULT NOW(), created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW(), -- 处理状态 processing_status VARCHAR(20) DEFAULT 'pending', -- pending, processed, failed, ignored processed_at TIMESTAMPTZ, error_message TEXT, retry_count INTEGER DEFAULT 0, -- 优先级和分类 priority INTEGER DEFAULT 5, -- 1-10,数字越小优先级越高 category VARCHAR(100), -- 消息分类 tags TEXT[], -- 标签数组 -- 验证和重复检查 checksum VARCHAR(64), -- 消息校验和,用于去重 is_duplicate BOOLEAN DEFAULT FALSE, original_message_id UUID, -- 如果是重复消息,指向原始消息ID -- 地理位置信息(如果有) latitude DECIMAL(10, 8), longitude DECIMAL(11, 8), location_accuracy FLOAT, location_timestamp TIMESTAMPTZ, -- 索引和搜索 search_vector TSVECTOR, -- 全文搜索向量 -- 软删除 is_deleted BOOLEAN DEFAULT FALSE, deleted_at TIMESTAMPTZ, -- 约束 CONSTRAINT unique_checksum UNIQUE (checksum), CONSTRAINT fk_original_message FOREIGN KEY (original_message_id) REFERENCES ps_push_messages(id) ); -- 2. 推送类型配置表 CREATE TABLE IF NOT EXISTS ps_push_types ( id SERIAL PRIMARY KEY, type_code VARCHAR(50) UNIQUE NOT NULL, type_name VARCHAR(100) NOT NULL, description TEXT, default_priority INTEGER DEFAULT 5, validation_schema JSONB, -- JSON Schema 用于验证消息格式 is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); -- 3. 消息处理日志表 CREATE TABLE IF NOT EXISTS ps_message_processing_logs ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), message_id UUID REFERENCES ps_push_messages(id) ON DELETE CASCADE, processing_step VARCHAR(100) NOT NULL, status VARCHAR(20) NOT NULL, -- started, completed, failed started_at TIMESTAMPTZ DEFAULT NOW(), completed_at TIMESTAMPTZ, duration_ms INTEGER, details JSONB, error_details TEXT ); -- 4. 设备信息表 - 已存在,使用现有的 ak_devices 表 -- CREATE TABLE public.ak_devices ( -- id UUID PRIMARY KEY, -- user_id UUID REFERENCES public.ak_users(id), -- device_type VARCHAR(32) NOT NULL, -- device_name VARCHAR(64), -- device_mac VARCHAR(64), -- bind_time TIMESTAMPTZ DEFAULT now(), -- status VARCHAR(16) DEFAULT 'active', -- extra JSONB -- ); -- 5. 用户信息表 - 已存在,使用现有的 ak_users 表 -- CREATE TABLE public.ak_users ( -- id UUID PRIMARY KEY, -- username VARCHAR(64) UNIQUE NOT NULL, -- email VARCHAR(128) UNIQUE NOT NULL, -- role VARCHAR(32) DEFAULT 'student', -- created_at TIMESTAMPTZ DEFAULT now(), -- ...其他字段 -- ); -- 6. 系统统计表 CREATE TABLE IF NOT EXISTS ps_system_stats ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), stat_date DATE DEFAULT CURRENT_DATE, stat_hour INTEGER DEFAULT EXTRACT(HOUR FROM NOW()), push_type VARCHAR(50), message_count INTEGER DEFAULT 0, success_count INTEGER DEFAULT 0, error_count INTEGER DEFAULT 0, avg_processing_time_ms FLOAT, created_at TIMESTAMPTZ DEFAULT NOW(), UNIQUE(stat_date, stat_hour, push_type) ); -- 7. 创建索引 CREATE INDEX IF NOT EXISTS idx_ps_push_messages_received_at ON ps_push_messages(received_at DESC); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_push_type ON ps_push_messages(push_type); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_user_id ON ps_push_messages(user_id); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_device_id ON ps_push_messages(device_id); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_processing_status ON ps_push_messages(processing_status); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_priority ON ps_push_messages(priority); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_checksum ON ps_push_messages(checksum); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_search_vector ON ps_push_messages USING GIN(search_vector); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_raw_data ON ps_push_messages USING GIN(raw_data); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_parsed_data ON ps_push_messages USING GIN(parsed_data); CREATE INDEX IF NOT EXISTS idx_ps_push_messages_location ON ps_push_messages(latitude, longitude) WHERE latitude IS NOT NULL AND longitude IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_ps_processing_logs_message_id ON ps_message_processing_logs(message_id); CREATE INDEX IF NOT EXISTS idx_ps_processing_logs_started_at ON ps_message_processing_logs(started_at DESC); -- 现有 ak_devices 和 ak_users 表的索引已存在,不需要重复创建 CREATE INDEX IF NOT EXISTS idx_ps_system_stats_date_hour ON ps_system_stats(stat_date DESC, stat_hour DESC); CREATE INDEX IF NOT EXISTS idx_ps_system_stats_push_type ON ps_system_stats(push_type); -- 8. 创建触发器函数 CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = NOW(); RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION update_search_vector() RETURNS TRIGGER AS $$ BEGIN NEW.search_vector := to_tsvector('english', COALESCE(NEW.push_type, '') || ' ' || COALESCE(NEW.user_id, '') || ' ' || COALESCE(NEW.device_id, '') || ' ' || COALESCE(NEW.category, '') || ' ' || COALESCE(array_to_string(NEW.tags, ' '), '') || ' ' || COALESCE(NEW.raw_data::text, '') ); RETURN NEW; END; $$ LANGUAGE plpgsql; -- 9. 创建触发器 DROP TRIGGER IF EXISTS update_ps_push_messages_updated_at ON ps_push_messages; CREATE TRIGGER update_ps_push_messages_updated_at BEFORE UPDATE ON ps_push_messages FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); DROP TRIGGER IF EXISTS update_ps_push_messages_search_vector ON ps_push_messages; CREATE TRIGGER update_ps_push_messages_search_vector BEFORE INSERT OR UPDATE ON ps_push_messages FOR EACH ROW EXECUTE FUNCTION update_search_vector(); DROP TRIGGER IF EXISTS update_ps_push_types_updated_at ON ps_push_types; CREATE TRIGGER update_ps_push_types_updated_at BEFORE UPDATE ON ps_push_types FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); -- 现有的 ak_devices 和 ak_users 表可能已有更新时间触发器,不需要重复创建 -- 10. 插入基础推送类型配置 INSERT INTO ps_push_types (type_code, type_name, description, default_priority, validation_schema) VALUES ('SOS', '紧急求救', 'SOS紧急求救信号', 1, '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "emergencyLevel": {"type": "string", "enum": ["LOW", "MEDIUM", "HIGH", "CRITICAL"]}}}'), ('HEALTH', '健康数据', '健康监测数据推送', 3, '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "H": {"type": "number"}, "O": {"type": "number"}, "T": {"type": "number"}}}'), ('LOCATION', '位置信息', '位置定位数据推送', 4, '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "lat": {"type": "number"}, "lng": {"type": "number"}, "accuracy": {"type": "number"}}}'), ('ALERT', '告警信息', '各类告警信息推送', 2, '{"type": "object", "required": ["userId", "alertType"], "properties": {"userId": {"type": "string"}, "alertType": {"type": "string"}, "severity": {"type": "string"}}}'), ('ACTIVITY', '活动数据', '运动和活动数据推送', 5, '{"type": "object", "required": ["userId"], "properties": {"userId": {"type": "string"}, "activityType": {"type": "string"}, "duration": {"type": "number"}, "calories": {"type": "number"}}}'), ('DEVICE_STATUS', '设备状态', '设备状态信息推送', 4, '{"type": "object", "required": ["deviceId"], "properties": {"deviceId": {"type": "string"}, "status": {"type": "string"}, "batteryLevel": {"type": "number"}}}') ON CONFLICT (type_code) DO NOTHING; -- 11. 创建视图 CREATE OR REPLACE VIEW ps_recent_message_stats AS SELECT push_type, COUNT(*) as total_count, COUNT(*) FILTER (WHERE processing_status = 'processed') as processed_count, COUNT(*) FILTER (WHERE processing_status = 'failed') as failed_count, COUNT(*) FILTER (WHERE processing_status = 'pending') as pending_count, AVG(EXTRACT(EPOCH FROM (processed_at - received_at)) * 1000) as avg_processing_time_ms, MIN(received_at) as first_received, MAX(received_at) as last_received FROM ps_push_messages WHERE received_at >= NOW() - INTERVAL '24 hours' AND is_deleted = FALSE GROUP BY push_type; CREATE OR REPLACE VIEW ps_active_devices_stats AS SELECT d.device_type, COUNT(*) as total_devices, COUNT(*) FILTER (WHERE d.bind_time >= NOW() - INTERVAL '1 hour') as active_1h, COUNT(*) FILTER (WHERE d.bind_time >= NOW() - INTERVAL '24 hours') as active_24h, COUNT(*) FILTER (WHERE d.bind_time >= NOW() - INTERVAL '7 days') as active_7d FROM public.ak_devices d WHERE d.status = 'active' GROUP BY d.device_type; -- 12. 创建函数 CREATE OR REPLACE FUNCTION ps_cleanup_old_messages(days_to_keep INTEGER DEFAULT 30) RETURNS INTEGER AS $$ DECLARE deleted_count INTEGER; BEGIN -- 软删除超过指定天数的消息 UPDATE ps_push_messages SET is_deleted = TRUE, deleted_at = NOW() WHERE received_at < NOW() - (days_to_keep || ' days')::INTERVAL AND is_deleted = FALSE; GET DIAGNOSTICS deleted_count = ROW_COUNT; -- 记录清理日志 INSERT INTO ps_message_processing_logs (message_id, processing_step, status, details) VALUES (NULL, 'cleanup_old_messages', 'completed', json_build_object('days_to_keep', days_to_keep, 'deleted_count', deleted_count)); RETURN deleted_count; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION ps_get_message_stats(hours_back INTEGER DEFAULT 24) RETURNS TABLE( push_type VARCHAR, total_count BIGINT, processed_count BIGINT, failed_count BIGINT, pending_count BIGINT, avg_processing_time_ms NUMERIC ) AS $$ BEGIN RETURN QUERY SELECT pm.push_type, COUNT(*) as total_count, COUNT(*) FILTER (WHERE pm.processing_status = 'processed') as processed_count, COUNT(*) FILTER (WHERE pm.processing_status = 'failed') as failed_count, COUNT(*) FILTER (WHERE pm.processing_status = 'pending') as pending_count, AVG(EXTRACT(EPOCH FROM (pm.processed_at - pm.received_at)) * 1000) as avg_processing_time_ms FROM ps_push_messages pm WHERE pm.received_at >= NOW() - (hours_back || ' hours')::INTERVAL AND pm.is_deleted = FALSE GROUP BY pm.push_type ORDER BY total_count DESC; END; $$ LANGUAGE plpgsql; -- 13. 启用 Row Level Security (RLS) ALTER TABLE ps_push_messages ENABLE ROW LEVEL SECURITY; ALTER TABLE ps_push_types ENABLE ROW LEVEL SECURITY; ALTER TABLE ps_message_processing_logs ENABLE ROW LEVEL SECURITY; ALTER TABLE ps_system_stats ENABLE ROW LEVEL SECURITY; -- 现有的 ak_devices 和 ak_users 表可能已有 RLS 设置 -- 14. 创建 RLS 策略(Service Role 可以访问所有数据) CREATE POLICY "Enable all access for service role" ON ps_push_messages FOR ALL USING (auth.role() = 'service_role'); CREATE POLICY "Enable all access for service role" ON ps_push_types FOR ALL USING (auth.role() = 'service_role'); CREATE POLICY "Enable all access for service role" ON ps_message_processing_logs FOR ALL USING (auth.role() = 'service_role'); CREATE POLICY "Enable all access for service role" ON ps_system_stats FOR ALL USING (auth.role() = 'service_role'); -- 现有的 ak_devices 和 ak_users 表可能已有 RLS 策略 -- 15. 创建 API 访问策略(允许匿名用户插入数据) CREATE POLICY "Enable insert for anon users" ON ps_push_messages FOR INSERT WITH CHECK (true); CREATE POLICY "Enable read for anon users" ON ps_push_types FOR SELECT USING (is_active = true); -- 16. 初始化完成标记 INSERT INTO ps_system_stats (stat_date, stat_hour, push_type, message_count) VALUES (CURRENT_DATE, EXTRACT(HOUR FROM NOW()), 'SYSTEM', 0) ON CONFLICT (stat_date, stat_hour, push_type) DO NOTHING; -- 17. 创建实时订阅(用于监控推送消息) -- 在 Supabase Dashboard 的 Database > Replication 中启用 ps_push_messages 表的实时功能 -- 注释 COMMENT ON TABLE ps_push_messages IS '推送消息主表 - 存储所有接收到的推送消息'; COMMENT ON TABLE ps_push_types IS '推送类型配置表 - 定义各种推送消息类型及其验证规则'; COMMENT ON TABLE ps_message_processing_logs IS '消息处理日志表 - 记录消息处理过程的详细日志'; COMMENT ON TABLE ps_system_stats IS '系统统计表 - 存储系统运行统计数据'; -- 现有表的注释: -- public.ak_devices: 设备表 - 存储用户绑定的各种设备信息 -- public.ak_users: 用户表 - 存储系统用户的基本信息和配置