# 微通信架构与端到端流程设计(MQTT/Kafka/Supabase/ClickHouse + 个推) 本方案围绕当前既有逻辑“设备/客户端通过 MQTT 上报 →(Kafka 或)直接写入 Supabase → 前端 chat/index 接收 → 前端发送消息至 MQTT 并写入 Supabase”进行系统化设计,覆盖后端组件、数据流、推送、可靠性与前端实现要点。 ## 1. 总览 - 目标 - 低延迟、多终端的聊天与事件通知,既支持 App/Web,也支持物联设备/边缘端。 - 统一消息落库(PostgreSQL on Supabase)+ 实时分发(Supabase Realtime / MQTT)。 - 推送采用“个推”在 App 后台或离线时补充通知到达率。 - 核心组件 - MQTT Broker(EMQX/Mosquitto):收消息/发消息的设备级/轻端入口,QoS≥1。 - 网关服务 gateway-mqtt(可选:内置 Kafka Bridge): - 认证/ACL、协议转换、消息校验与标准化。 - 转发到 Kafka(chat.inbound)或直接调用 Supabase(REST/RPC/pg)落库。 - Kafka(可选/增强): - 主题:chat.inbound(入口)、chat.persisted(落库成功后)、chat.push(通知任务)。 - 解耦高峰、可回放、做扩展处理(统计、风控、审计)。 - Supabase(Postgres + Realtime + Edge Functions): - 表:chat_conversations / chat_participants / chat_messages / chat_notifications(已建表,见 `doc_chat/create_chat_tables.sql`)。 - 触发器:chat_on_message_insert(更新 last_message_at + 写通知)。 - RLS:按参与者隔离访问(已配置)。 - Realtime:前端订阅消息/通知的实时流。 - ClickHouse(列存/OLAP/时序聚合): - 承载健康/传感类高频时序数据(大跨度查询、长保留、低成本)。 - 与 Kafka/Telegraf 对接,建立明细表 + 物化视图做分钟/小时聚合。 - 网关运行报表(Supabase 内): - 表:`gateway_nodes`、`gateway_heartbeats`(见 `doc_chat/create_gateway_reporting.sql`)。 - Node 网关周期写入心跳与计数,供看板与告警使用。 - push-notify-worker(个推): - 通过 Supabase Webhook/Edge Function 或 Kafka chat.push 触发,调用个推 API 下发推送。 ## 2. 事件/消息模型 采用统一 envelope(参考 CloudEvents 思想): ```json { "id": "", // 客户端生成/网关补齐(用于幂等) "ts": "2025-09-23T10:00:00Z", // ISO 时间 "type": "chat.message", // 事件类型 "source": "mqtt|web|server", // 来源 "conversation_id": "...", "sender_id": "...", "content": "...", // 文本或 URL(音频/文件) "content_type": "text|audio|image|file|markdown|json", "metadata": {"duration_ms": 1234, "mime": "audio/mpeg", "size": 102400} } ``` - 客户端发送到 MQTT 主题时携带此结构; - 网关校验/补齐后落库为 chat_messages 的一行(content/metadata 等字段对应)。 另:健康/传感时序(Telemetry)建议采用“轻 envelope + 列存模型”的二段式: ```json { "id": "", "ts": "2025-09-23T10:00:00Z", "type": "ts.metric", "user_id": "...", "device_id": "...", "metric": "hr|spo2|temp|bp_systolic|bp_diastolic|steps", "value": 72.0, "meta": { "unit": "bpm", "src": "ble" } } ``` 由网关投递至 Kafka `ts.health`,并落地 ClickHouse `health_raw`(示例见 `doc_chat/kafka_mqtt_clickhouse.md`)。 ## 3. 主题与分区规划(MQTT/Kafka) - MQTT 主题 - chat/send/{conversationId}:客户端→服务端,发送消息入口。 - chat/recv/{conversationId}:服务端→客户端,转发已确认写库的消息(可选)。 - presence/{userId}:在线状态(使用 Last Will 设置离线告警)。 - ack/{messageId}/{userId}(可选):消息已达/已读 ACK 路由。 - 建议 QoS1(至少一次),禁用 retain(聊天消息不保留),presence 可保留。 - ACL: - 仅参与者可发布 chat/send/{conversationId}; - 仅参与者可订阅 chat/recv/{conversationId}; - 用户仅能发布/订阅自身 presence/{userId} 与 ack 自己路由。 - Kafka 主题(可选) - chat.inbound(key=conversationId):保证同会话有序; - chat.persisted:写库成功后的广播,用于下游计算或转发; - chat.push:推送任务。 - ts.health(key=user_id 或 device_id):健康/传感数据入口,供 ClickHouse 消费。 ## 4. 后端流程设计 ```text [Client/Device] | MQTT publish chat/send/{conversationId} v [gateway-mqtt] | 校验/标准化/幂等 |--(A) 直写 Supabase PostgREST/Edge Function --> [Postgres] |--(B) 投递 Kafka chat.inbound ---------------> [persist-worker -> Postgres] |(trigger) -> chat_notifications |(optional) -> MQTT chat/recv/{conversationId} | -> Kafka chat.push v [push-notify-worker -> 个推] ---- Telemetry 时序流 ---- |--(T1) 网关将时序上报写入 Kafka ts.health ------> [ClickHouse 明细表 health_raw] |--(T2) 物化/降采样视图做分钟/小时聚合 ----------> [Grafana 可视化] |--(T3) 最近快照/告警回写 Supabase(可选) ------> [user_metrics_last / alerts] ``` ### 4.1 MQTT → Supabase(入口) 1) 客户端/设备 发布到 chat/send/{conversationId}(QoS1)。 2) gateway-mqtt 收到消息: - 校验 JWT/设备证书,鉴权是否会话参与者; - 解析 envelope,若无 id 则补 uuid; - 幂等检查(可选:Redis setNx(messageId) 窗口 24h)。 3) 落库: - 方案 A:直写 Supabase PostgREST(chat_messages.insert)或 RPC。 - 方案 B:写入 Kafka chat.inbound → persist-worker 落库(高峰更稳)。 4) Postgres 触发器 chat_on_message_insert: - 更新 chat_conversations.last_message_at; - 插入 chat_notifications(除发送者外所有参与者)。 5) 可选:服务端再发布到 MQTT chat/recv/{conversationId}(供仅连 MQTT 的轻端即时收到),或依赖 Supabase Realtime 给连 H5/App 的用户推送。 ### 4.2 Supabase → 个推(通知) - 触发方式: - Edge Function 订阅 Postgres Changes(或 Supabase Webhook)监听 chat_notifications insert; - 或 Kafka chat.push 消费者。 - push-notify-worker: - 查询 userId 的个推 cid; - 生成通知文案(可带会话标题、消息摘要、角标); - 调用个推 API 下发;记录投递结果(可回写到一个 delivery_logs 表)。 ### 4.3 前端发送(chat/index) 1) 音频/附件:先 S3 预签名上传,获得 URL 与 metadata。 2) 生成消息 id(uuid),组装 envelope。 3) 并行/串行两路: - 发布到 MQTT chat/send/{conversationId}(低延迟通知网关/其他 MQTT 客户端)。 - 调用 Supabase 插入 chat_messages(确保落库)。 4) UI 乐观更新;若落库失败则回滚/重试。 ### 4.4 前端接收 - H5/App(连 Supabase Realtime): - 订阅 chat_messages(INSERT)与 chat_notifications(INSERT)。 - 房间内做去重(根据 id,已在 room.uvue 实现)。 - 仅 MQTT 客户端: - 订阅 chat/recv/{conversationId} 主题获取已确认消息(可由网关回发)。 ### 4.5 Telemetry → ClickHouse(健康/传感数据) 1) 设备/网关 MQTT 上报(可以复用同一 broker,不同主题前缀如 `ts/send/{deviceId}`)。 2) gateway-mqtt 将清洗后的数据写入 Kafka `ts.health`(key 使用 user_id 或 device_id 保证局部有序)。 3) ClickHouse 落地:明细表 `health_raw`(MergeTree,按月份分区,TTL 保留),并建立 1m/1h 物化视图。 4) 前端时序图:Grafana 直连 ClickHouse;应用内查询可通过后端代理(统一鉴权,参见 `kafka_mqtt_clickhouse.md`)。 5) 最近快照/告警回写 Supabase(`user_metrics_last`/`alerts`),与业务数据/权限体系对齐。 ## 5. 推送(个推)策略 - 触发:插入 chat_notifications 后对目标用户触发推送。 - 去重:个推透传携带 message_id/conversation_id,客户端去重。 - 点击动作:深链到 pages/sport/chat/room?conversation_id=xxx。 - 频率限制:同会话短时多条合并为“你有 X 条新消息”。 - 前台/后台判定:前台不推或用本地通知;后台/离线才用个推。 ## 6. 安全与权限 - MQTT 认证:JWT(携带 user_id、过期时间)或 TLS 证书;ACL 映射会话参与者。 - Supabase RLS:已基于 chat_participants;确保 Edge Function 走 service role 仅内部调用。 - S3 附件:仅用预签名上传;公有读或 CDN 鉴权按需配置。 - Idempotency:message_id 唯一约束(Postgres 可加唯一索引),网关 Redis 幂等窗。 - Telemetry:ClickHouse 作为原始明细库不直接暴露到前端;前端通过后端代理或预聚合接口访问,鉴权仍以 Supabase 用户为中心。 ## 7. 可靠性与一致性 - 顺序:以 conversationId 为 key 的 Kafka 分区(或直接依赖单库顺序 + created_at 排序)。 - 重试: - 前端 AkReq 已支持网络重试; - 网关→Supabase 写库失败使用指数退避重试; - push 失败重试并退避; - 断线: - MQTT QoS1 + Clean Session; - 前端 Realtime 自动重连(建议在 AkSupaRealtime 增加重连与订阅恢复)。 - 未读计数: - 客户端进入会话更新 chat_participants.last_read_at; - 会话列表通过 last_message_at 与 last_read_at 计算角标(服务端或客户端)。 - 网关运行健康: - Node 网关定期上报 `gateway_heartbeats`(CPU/内存/连接状态/消息计数/错误等)。 - 提供 `gateway_status_latest` 视图与 `gateway_daily_stats` 物化视图做看板/告警。 ## 8. 与现有表结构的映射 - chat_messages.content 存文本或 S3 URL;content_type 包含 "audio"(已支持)。 - chat_on_message_insert 已生成 chat_notifications,供 Realtime 与个推使用。 - 参与者列表通过 chat_participants 查询;权限与 RLS 已覆盖。 - 网关运行报表: - `gateway_nodes`:注册网关(以 `mqtt_client_id` 作为唯一标识)。 - `gateway_heartbeats`:周期心跳与计数;`gateway_status_latest` 展示最近一次状态。 - SQL 见 `doc_chat/create_gateway_reporting.sql`。 ## 9. 前端实现要点(结合现有代码) - 单 WS 连接多订阅:`ChatDataService.ensureRealtime()` 已实现。 - 房间消息去重:`room.uvue` 已按 id 去重。 - 录音与上传:`AudioUploadService` 预签名 + multipart,返回 URL 后调用 `sendAudioMessage()`。 - 发送路径:保持 MQTT 发布 + Supabase 插入的“双写”,若担心两者不一致,可切换为“仅插库 → 后端转发到 MQTT”。 - 订阅释放:路由切换时调用 `dispose()` 与 `closeRealtime()` 清理。 - Telemetry 展示:趋势/历史直接走 ClickHouse(Grafana 或后端代理 API);业务侧角标/卡片展示用 Supabase 快照表(低延迟)。 ## 10. 运维与监控 - 网关与推送服务加 Prometheus 指标:QPS、失败率、延时、重试数。 - Kafka Lag 监控(若使用 Kafka)。 - Postgres 指标与 Realtime 通道数监控。 - ClickHouse:插入延迟、分区大小、查询耗时、物化视图滞后;Grafana 有现成数据源。 - 个推调用成功率,退避与熔断策略。 - 统一 compose 部署(见 `doc_chat/docker-compose.yml`):`kafka`、`redis`、`clickhouse`、`grafana`、`mqtt` 一体化联调。 ## 11. 最小可行落地顺序 1) 网关直写 Supabase(跳过 Kafka),完成 MQTT → DB → Realtime 闭环; 2) 接入个推:从 chat_notifications 插入触发; 3) 加上 MQTT 回发(chat/recv)供仅 MQTT 客户端; 4) 引入 Kafka 做解耦与扩展(如需要); 5) 增强 Realtime 重连与订阅恢复;完善未读角标与已读回执。 6) Telemetry 引入 ClickHouse:接入 Kafka `ts.health` → 明细表 + 物化视图;前端改走 ClickHouse/Grafana。 ## 12. 配置与文件索引(本仓库) - Docker 一体化:`doc_chat/docker-compose.yml` - MQTT Broker 配置:`doc_chat/mosquitto/config/mosquitto.conf` - 网关服务: - 代码:`server/gateway-mqtt-node/src/index.js`(MQTT→Kafka/Supabase、心跳上报) - Worker:`server/gateway-mqtt-node/src/worker.js`(Kafka chat.inbound → Supabase 持久化) - 环境模板:`server/gateway-mqtt-node/.env.example` - 聊天表结构与触发器:`doc_chat/create_chat_tables.sql` - 网关运行报表:`doc_chat/create_gateway_reporting.sql` - 时序选型与 ClickHouse 方案:`doc_chat/kafka_mqtt_clickhouse.md` --- 如需我生成网关示例(Node.js/Go,鉴权+插库)或 Supabase Edge Function 模板(监听 notifications → 个推),告诉我技术栈偏好即可。 --- ## 附录:按会话划分 Supabase Realtime 通道方案 ### 背景 - 现状:前端通过单一 `chat_messages` 订阅获取所有对话的 INSERT 事件,再由客户端根据 `conversation_id` 做过滤。 - 问题: - 参加多个会话的用户会收到不相关会话的通知,虽然 UI 再过滤,但仍产生额外的网络和序列化开销。 - 如果后续需要对同一个 Supabase Realtime 连接做权限隔离(例如服务端使用 service key 代理订阅),需要显式限定查询范围。 ### 设计目标 1. 每个会话使用独立的 Realtime channel,通过 Postgres Changes 的 `filter` 或 `event` 参数仅推送该会话的数据。 2. 在保持单连接多订阅的基础上,动态创建/关闭频道,避免过多 WebSocket 连接。 3. 兼容移动端(uni-app X)与 Web,同步维护订阅生命周期。 ### 频道命名与过滤 | 场景 | Supabase channel 名称 | Postgres Changes 配置 | |------|------------------------|------------------------| | 会话消息 | `chat:msg:` | `{ event: "INSERT", schema: "public", table: "chat_messages", filter: "conversation_id=eq." }` | | 会话通知 | `chat:notify:`(可选) | `{ event: "INSERT", schema: "public", table: "chat_notifications", filter: "conversation_id=eq." }` | > 命名规则保留 `chat:` 前缀,方便统一管理;`conversationId` 建议做 URL encode 避免特殊字符。 ### 前端改造步骤 1. **订阅管理器**:在 `ChatDataService.ensureRealtime()` 基础上,增加 `ensureConversationChannel(conversationId)`: - 若已有 `convChannels[conversationId]` 直接返回。 - 否则调用 `realtime.channel(name)`,并使用 `channel.on('postgres_changes', {...})` 设置监听。 - 调用 `channel.subscribe()`,成功后写入 Map;失败时重试或回退为 table 级订阅。 2. **进入会话**:`room.uvue` 调用 `ChatDataService.subscribeMessages(conversationId)` 时,改为: - `ensureConversationChannel(conversationId)`。 - 在 channel 回调里只派发该会话的消息(无需再过滤)。 3. **退出会话**:在 `onUnload/onHide` 调用 `ChatDataService.releaseConversationChannel(conversationId)`: - 取消监听,调用 `channel.unsubscribe()` 并从 Map 移除。 - 可设置闲置超时,避免频繁退出/进入导致频繁订阅。 4. **多会话同时打开**(例如侧边栏预览):允许同时保持多个频道订阅,根据 UI 激活状态决定是否真正渲染进入消息队列。 ### 权限与安全 - 使用匿名/终端用户 token 连接 Realtime 时,RLS 会自动限制仅返回参与者可访问的 `conversation_id`。即便如此,按会话过滤可以进一步降低服务端负载。 - 若使用 service key 统一代理订阅(例如后端服务转发消息),务必在 channel 过滤项中带上目标会话 ID,避免泄露。 ### 服务器端同步策略 - 若网关或中间层也监听 Realtime,可采用同样的 channel 命名规范,或通过 `joinTopicIfNeeded('chat:msg:', payload)` 方式统一管理。 - Channel 生命周期可和应用级订阅保持一致,并在服务器端维护引用计数,最后一个消费者离开时再真正 `unsubscribe()`。 ### 回退策略 - 如果遇到 Supabase 版本限制无法使用 `filter`,可以退回到表级订阅,但保留 channel 命名(仍使用 `chat:msg:`),在客户端回调内做过滤。这样可以平滑升级。 - 同理,在网络不佳时可侦测订阅失败并自动切换到现有的“全量监听 + 客户端过滤”。 --- 该方案不会改变已有聊天流程,只是优化 Realtime 的订阅粒度,实现“每个会话一个 channel”。配合现有的缓存/去重逻辑可直接落地,下一步可在 `ChatDataService` 中实现上述方法并编写单元测试覆盖多订阅场景。