16 KiB
16 KiB
微通信架构与端到端流程设计(MQTT/Kafka/Supabase/ClickHouse + 个推)
本方案围绕当前既有逻辑“设备/客户端通过 MQTT 上报 →(Kafka 或)直接写入 Supabase → 前端 chat/index 接收 → 前端发送消息至 MQTT 并写入 Supabase”进行系统化设计,覆盖后端组件、数据流、推送、可靠性与前端实现要点。
1. 总览
- 目标
- 低延迟、多终端的聊天与事件通知,既支持 App/Web,也支持物联设备/边缘端。
- 统一消息落库(PostgreSQL on Supabase)+ 实时分发(Supabase Realtime / MQTT)。
- 推送采用“个推”在 App 后台或离线时补充通知到达率。
- 核心组件
- MQTT Broker(EMQX/Mosquitto):收消息/发消息的设备级/轻端入口,QoS≥1。
- 网关服务 gateway-mqtt(可选:内置 Kafka Bridge):
- 认证/ACL、协议转换、消息校验与标准化。
- 转发到 Kafka(chat.inbound)或直接调用 Supabase(REST/RPC/pg)落库。
- Kafka(可选/增强):
- 主题:chat.inbound(入口)、chat.persisted(落库成功后)、chat.push(通知任务)。
- 解耦高峰、可回放、做扩展处理(统计、风控、审计)。
- Supabase(Postgres + Realtime + Edge Functions):
- 表:chat_conversations / chat_participants / chat_messages / chat_notifications(已建表,见
doc_chat/create_chat_tables.sql)。 - 触发器:chat_on_message_insert(更新 last_message_at + 写通知)。
- RLS:按参与者隔离访问(已配置)。
- Realtime:前端订阅消息/通知的实时流。
- 表:chat_conversations / chat_participants / chat_messages / chat_notifications(已建表,见
- ClickHouse(列存/OLAP/时序聚合):
- 承载健康/传感类高频时序数据(大跨度查询、长保留、低成本)。
- 与 Kafka/Telegraf 对接,建立明细表 + 物化视图做分钟/小时聚合。
- 网关运行报表(Supabase 内):
- 表:
gateway_nodes、gateway_heartbeats(见doc_chat/create_gateway_reporting.sql)。 - Node 网关周期写入心跳与计数,供看板与告警使用。
- 表:
- push-notify-worker(个推):
- 通过 Supabase Webhook/Edge Function 或 Kafka chat.push 触发,调用个推 API 下发推送。
2. 事件/消息模型
采用统一 envelope(参考 CloudEvents 思想):
{
"id": "<uuid>", // 客户端生成/网关补齐(用于幂等)
"ts": "2025-09-23T10:00:00Z", // ISO 时间
"type": "chat.message", // 事件类型
"source": "mqtt|web|server", // 来源
"conversation_id": "...",
"sender_id": "...",
"content": "...", // 文本或 URL(音频/文件)
"content_type": "text|audio|image|file|markdown|json",
"metadata": {"duration_ms": 1234, "mime": "audio/mpeg", "size": 102400}
}
- 客户端发送到 MQTT 主题时携带此结构;
- 网关校验/补齐后落库为 chat_messages 的一行(content/metadata 等字段对应)。
另:健康/传感时序(Telemetry)建议采用“轻 envelope + 列存模型”的二段式:
{
"id": "<uuid>",
"ts": "2025-09-23T10:00:00Z",
"type": "ts.metric",
"user_id": "...",
"device_id": "...",
"metric": "hr|spo2|temp|bp_systolic|bp_diastolic|steps",
"value": 72.0,
"meta": { "unit": "bpm", "src": "ble" }
}
由网关投递至 Kafka ts.health,并落地 ClickHouse health_raw(示例见 doc_chat/kafka_mqtt_clickhouse.md)。
3. 主题与分区规划(MQTT/Kafka)
- MQTT 主题
- chat/send/{conversationId}:客户端→服务端,发送消息入口。
- chat/recv/{conversationId}:服务端→客户端,转发已确认写库的消息(可选)。
- presence/{userId}:在线状态(使用 Last Will 设置离线告警)。
- ack/{messageId}/{userId}(可选):消息已达/已读 ACK 路由。
- 建议 QoS1(至少一次),禁用 retain(聊天消息不保留),presence 可保留。
- ACL:
- 仅参与者可发布 chat/send/{conversationId};
- 仅参与者可订阅 chat/recv/{conversationId};
- 用户仅能发布/订阅自身 presence/{userId} 与 ack 自己路由。
- Kafka 主题(可选)
- chat.inbound(key=conversationId):保证同会话有序;
- chat.persisted:写库成功后的广播,用于下游计算或转发;
- chat.push:推送任务。
- ts.health(key=user_id 或 device_id):健康/传感数据入口,供 ClickHouse 消费。
4. 后端流程设计
[Client/Device]
| MQTT publish chat/send/{conversationId}
v
[gateway-mqtt]
| 校验/标准化/幂等
|--(A) 直写 Supabase PostgREST/Edge Function --> [Postgres]
|--(B) 投递 Kafka chat.inbound ---------------> [persist-worker -> Postgres]
|(trigger) -> chat_notifications
|(optional) -> MQTT chat/recv/{conversationId}
| -> Kafka chat.push
v
[push-notify-worker -> 个推]
---- Telemetry 时序流 ----
|--(T1) 网关将时序上报写入 Kafka ts.health ------> [ClickHouse 明细表 health_raw]
|--(T2) 物化/降采样视图做分钟/小时聚合 ----------> [Grafana 可视化]
|--(T3) 最近快照/告警回写 Supabase(可选) ------> [user_metrics_last / alerts]
4.1 MQTT → Supabase(入口)
- 客户端/设备 发布到 chat/send/{conversationId}(QoS1)。
- gateway-mqtt 收到消息:
- 校验 JWT/设备证书,鉴权是否会话参与者;
- 解析 envelope,若无 id 则补 uuid;
- 幂等检查(可选:Redis setNx(messageId) 窗口 24h)。
- 落库:
- 方案 A:直写 Supabase PostgREST(chat_messages.insert)或 RPC。
- 方案 B:写入 Kafka chat.inbound → persist-worker 落库(高峰更稳)。
- Postgres 触发器 chat_on_message_insert:
- 更新 chat_conversations.last_message_at;
- 插入 chat_notifications(除发送者外所有参与者)。
- 可选:服务端再发布到 MQTT chat/recv/{conversationId}(供仅连 MQTT 的轻端即时收到),或依赖 Supabase Realtime 给连 H5/App 的用户推送。
4.2 Supabase → 个推(通知)
- 触发方式:
- Edge Function 订阅 Postgres Changes(或 Supabase Webhook)监听 chat_notifications insert;
- 或 Kafka chat.push 消费者。
- push-notify-worker:
- 查询 userId 的个推 cid;
- 生成通知文案(可带会话标题、消息摘要、角标);
- 调用个推 API 下发;记录投递结果(可回写到一个 delivery_logs 表)。
4.3 前端发送(chat/index)
- 音频/附件:先 S3 预签名上传,获得 URL 与 metadata。
- 生成消息 id(uuid),组装 envelope。
- 并行/串行两路:
- 发布到 MQTT chat/send/{conversationId}(低延迟通知网关/其他 MQTT 客户端)。
- 调用 Supabase 插入 chat_messages(确保落库)。
- UI 乐观更新;若落库失败则回滚/重试。
4.4 前端接收
- H5/App(连 Supabase Realtime):
- 订阅 chat_messages(INSERT)与 chat_notifications(INSERT)。
- 房间内做去重(根据 id,已在 room.uvue 实现)。
- 仅 MQTT 客户端:
- 订阅 chat/recv/{conversationId} 主题获取已确认消息(可由网关回发)。
4.5 Telemetry → ClickHouse(健康/传感数据)
- 设备/网关 MQTT 上报(可以复用同一 broker,不同主题前缀如
ts/send/{deviceId})。 - gateway-mqtt 将清洗后的数据写入 Kafka
ts.health(key 使用 user_id 或 device_id 保证局部有序)。 - ClickHouse 落地:明细表
health_raw(MergeTree,按月份分区,TTL 保留),并建立 1m/1h 物化视图。 - 前端时序图:Grafana 直连 ClickHouse;应用内查询可通过后端代理(统一鉴权,参见
kafka_mqtt_clickhouse.md)。 - 最近快照/告警回写 Supabase(
user_metrics_last/alerts),与业务数据/权限体系对齐。
5. 推送(个推)策略
- 触发:插入 chat_notifications 后对目标用户触发推送。
- 去重:个推透传携带 message_id/conversation_id,客户端去重。
- 点击动作:深链到 pages/sport/chat/room?conversation_id=xxx。
- 频率限制:同会话短时多条合并为“你有 X 条新消息”。
- 前台/后台判定:前台不推或用本地通知;后台/离线才用个推。
6. 安全与权限
- MQTT 认证:JWT(携带 user_id、过期时间)或 TLS 证书;ACL 映射会话参与者。
- Supabase RLS:已基于 chat_participants;确保 Edge Function 走 service role 仅内部调用。
- S3 附件:仅用预签名上传;公有读或 CDN 鉴权按需配置。
- Idempotency:message_id 唯一约束(Postgres 可加唯一索引),网关 Redis 幂等窗。
- Telemetry:ClickHouse 作为原始明细库不直接暴露到前端;前端通过后端代理或预聚合接口访问,鉴权仍以 Supabase 用户为中心。
7. 可靠性与一致性
- 顺序:以 conversationId 为 key 的 Kafka 分区(或直接依赖单库顺序 + created_at 排序)。
- 重试:
- 前端 AkReq 已支持网络重试;
- 网关→Supabase 写库失败使用指数退避重试;
- push 失败重试并退避;
- 断线:
- MQTT QoS1 + Clean Session;
- 前端 Realtime 自动重连(建议在 AkSupaRealtime 增加重连与订阅恢复)。
- 未读计数:
- 客户端进入会话更新 chat_participants.last_read_at;
- 会话列表通过 last_message_at 与 last_read_at 计算角标(服务端或客户端)。
- 网关运行健康:
- Node 网关定期上报
gateway_heartbeats(CPU/内存/连接状态/消息计数/错误等)。 - 提供
gateway_status_latest视图与gateway_daily_stats物化视图做看板/告警。
- Node 网关定期上报
8. 与现有表结构的映射
- chat_messages.content 存文本或 S3 URL;content_type 包含 "audio"(已支持)。
- chat_on_message_insert 已生成 chat_notifications,供 Realtime 与个推使用。
- 参与者列表通过 chat_participants 查询;权限与 RLS 已覆盖。
- 网关运行报表:
gateway_nodes:注册网关(以mqtt_client_id作为唯一标识)。gateway_heartbeats:周期心跳与计数;gateway_status_latest展示最近一次状态。- SQL 见
doc_chat/create_gateway_reporting.sql。
9. 前端实现要点(结合现有代码)
- 单 WS 连接多订阅:
ChatDataService.ensureRealtime()已实现。 - 房间消息去重:
room.uvue已按 id 去重。 - 录音与上传:
AudioUploadService预签名 + multipart,返回 URL 后调用sendAudioMessage()。 - 发送路径:保持 MQTT 发布 + Supabase 插入的“双写”,若担心两者不一致,可切换为“仅插库 → 后端转发到 MQTT”。
- 订阅释放:路由切换时调用
dispose()与closeRealtime()清理。 - Telemetry 展示:趋势/历史直接走 ClickHouse(Grafana 或后端代理 API);业务侧角标/卡片展示用 Supabase 快照表(低延迟)。
10. 运维与监控
- 网关与推送服务加 Prometheus 指标:QPS、失败率、延时、重试数。
- Kafka Lag 监控(若使用 Kafka)。
- Postgres 指标与 Realtime 通道数监控。
- ClickHouse:插入延迟、分区大小、查询耗时、物化视图滞后;Grafana 有现成数据源。
- 个推调用成功率,退避与熔断策略。
- 统一 compose 部署(见
doc_chat/docker-compose.yml):kafka、redis、clickhouse、grafana、mqtt一体化联调。
11. 最小可行落地顺序
- 网关直写 Supabase(跳过 Kafka),完成 MQTT → DB → Realtime 闭环;
- 接入个推:从 chat_notifications 插入触发;
- 加上 MQTT 回发(chat/recv)供仅 MQTT 客户端;
- 引入 Kafka 做解耦与扩展(如需要);
- 增强 Realtime 重连与订阅恢复;完善未读角标与已读回执。
- Telemetry 引入 ClickHouse:接入 Kafka
ts.health→ 明细表 + 物化视图;前端改走 ClickHouse/Grafana。
12. 配置与文件索引(本仓库)
- Docker 一体化:
doc_chat/docker-compose.yml - MQTT Broker 配置:
doc_chat/mosquitto/config/mosquitto.conf - 网关服务:
- 代码:
server/gateway-mqtt-node/src/index.js(MQTT→Kafka/Supabase、心跳上报) - Worker:
server/gateway-mqtt-node/src/worker.js(Kafka chat.inbound → Supabase 持久化) - 环境模板:
server/gateway-mqtt-node/.env.example
- 代码:
- 聊天表结构与触发器:
doc_chat/create_chat_tables.sql - 网关运行报表:
doc_chat/create_gateway_reporting.sql - 时序选型与 ClickHouse 方案:
doc_chat/kafka_mqtt_clickhouse.md
如需我生成网关示例(Node.js/Go,鉴权+插库)或 Supabase Edge Function 模板(监听 notifications → 个推),告诉我技术栈偏好即可。
附录:按会话划分 Supabase Realtime 通道方案
背景
- 现状:前端通过单一
chat_messages订阅获取所有对话的 INSERT 事件,再由客户端根据conversation_id做过滤。 - 问题:
- 参加多个会话的用户会收到不相关会话的通知,虽然 UI 再过滤,但仍产生额外的网络和序列化开销。
- 如果后续需要对同一个 Supabase Realtime 连接做权限隔离(例如服务端使用 service key 代理订阅),需要显式限定查询范围。
设计目标
- 每个会话使用独立的 Realtime channel,通过 Postgres Changes 的
filter或event参数仅推送该会话的数据。 - 在保持单连接多订阅的基础上,动态创建/关闭频道,避免过多 WebSocket 连接。
- 兼容移动端(uni-app X)与 Web,同步维护订阅生命周期。
频道命名与过滤
| 场景 | Supabase channel 名称 | Postgres Changes 配置 |
|---|---|---|
| 会话消息 | chat:msg:<conversationId> |
{ event: "INSERT", schema: "public", table: "chat_messages", filter: "conversation_id=eq.<conversationId>" } |
| 会话通知 | chat:notify:<conversationId>(可选) |
{ event: "INSERT", schema: "public", table: "chat_notifications", filter: "conversation_id=eq.<conversationId>" } |
命名规则保留
chat:前缀,方便统一管理;conversationId建议做 URL encode 避免特殊字符。
前端改造步骤
- 订阅管理器:在
ChatDataService.ensureRealtime()基础上,增加ensureConversationChannel(conversationId):- 若已有
convChannels[conversationId]直接返回。 - 否则调用
realtime.channel(name),并使用channel.on('postgres_changes', {...})设置监听。 - 调用
channel.subscribe(),成功后写入 Map;失败时重试或回退为 table 级订阅。
- 若已有
- 进入会话:
room.uvue调用ChatDataService.subscribeMessages(conversationId)时,改为:ensureConversationChannel(conversationId)。- 在 channel 回调里只派发该会话的消息(无需再过滤)。
- 退出会话:在
onUnload/onHide调用ChatDataService.releaseConversationChannel(conversationId):- 取消监听,调用
channel.unsubscribe()并从 Map 移除。 - 可设置闲置超时,避免频繁退出/进入导致频繁订阅。
- 取消监听,调用
- 多会话同时打开(例如侧边栏预览):允许同时保持多个频道订阅,根据 UI 激活状态决定是否真正渲染进入消息队列。
权限与安全
- 使用匿名/终端用户 token 连接 Realtime 时,RLS 会自动限制仅返回参与者可访问的
conversation_id。即便如此,按会话过滤可以进一步降低服务端负载。 - 若使用 service key 统一代理订阅(例如后端服务转发消息),务必在 channel 过滤项中带上目标会话 ID,避免泄露。
服务器端同步策略
- 若网关或中间层也监听 Realtime,可采用同样的 channel 命名规范,或通过
joinTopicIfNeeded('chat:msg:<id>', payload)方式统一管理。 - Channel 生命周期可和应用级订阅保持一致,并在服务器端维护引用计数,最后一个消费者离开时再真正
unsubscribe()。
回退策略
- 如果遇到 Supabase 版本限制无法使用
filter,可以退回到表级订阅,但保留 channel 命名(仍使用chat:msg:<id>),在客户端回调内做过滤。这样可以平滑升级。 - 同理,在网络不佳时可侦测订阅失败并自动切换到现有的“全量监听 + 客户端过滤”。
该方案不会改变已有聊天流程,只是优化 Realtime 的订阅粒度,实现“每个会话一个 channel”。配合现有的缓存/去重逻辑可直接落地,下一步可在 ChatDataService 中实现上述方法并编写单元测试覆盖多订阅场景。