302 lines
16 KiB
Markdown
302 lines
16 KiB
Markdown
# 微通信架构与端到端流程设计(MQTT/Kafka/Supabase/ClickHouse + 个推)
|
||
|
||
本方案围绕当前既有逻辑“设备/客户端通过 MQTT 上报 →(Kafka 或)直接写入 Supabase → 前端 chat/index 接收 → 前端发送消息至 MQTT 并写入 Supabase”进行系统化设计,覆盖后端组件、数据流、推送、可靠性与前端实现要点。
|
||
|
||
## 1. 总览
|
||
|
||
- 目标
|
||
- 低延迟、多终端的聊天与事件通知,既支持 App/Web,也支持物联设备/边缘端。
|
||
- 统一消息落库(PostgreSQL on Supabase)+ 实时分发(Supabase Realtime / MQTT)。
|
||
- 推送采用“个推”在 App 后台或离线时补充通知到达率。
|
||
- 核心组件
|
||
- MQTT Broker(EMQX/Mosquitto):收消息/发消息的设备级/轻端入口,QoS≥1。
|
||
- 网关服务 gateway-mqtt(可选:内置 Kafka Bridge):
|
||
- 认证/ACL、协议转换、消息校验与标准化。
|
||
- 转发到 Kafka(chat.inbound)或直接调用 Supabase(REST/RPC/pg)落库。
|
||
- Kafka(可选/增强):
|
||
- 主题:chat.inbound(入口)、chat.persisted(落库成功后)、chat.push(通知任务)。
|
||
- 解耦高峰、可回放、做扩展处理(统计、风控、审计)。
|
||
- Supabase(Postgres + Realtime + Edge Functions):
|
||
- 表:chat_conversations / chat_participants / chat_messages / chat_notifications(已建表,见 `doc_chat/create_chat_tables.sql`)。
|
||
- 触发器:chat_on_message_insert(更新 last_message_at + 写通知)。
|
||
- RLS:按参与者隔离访问(已配置)。
|
||
- Realtime:前端订阅消息/通知的实时流。
|
||
- ClickHouse(列存/OLAP/时序聚合):
|
||
- 承载健康/传感类高频时序数据(大跨度查询、长保留、低成本)。
|
||
- 与 Kafka/Telegraf 对接,建立明细表 + 物化视图做分钟/小时聚合。
|
||
- 网关运行报表(Supabase 内):
|
||
- 表:`gateway_nodes`、`gateway_heartbeats`(见 `doc_chat/create_gateway_reporting.sql`)。
|
||
- Node 网关周期写入心跳与计数,供看板与告警使用。
|
||
- push-notify-worker(个推):
|
||
- 通过 Supabase Webhook/Edge Function 或 Kafka chat.push 触发,调用个推 API 下发推送。
|
||
|
||
## 2. 事件/消息模型
|
||
|
||
采用统一 envelope(参考 CloudEvents 思想):
|
||
|
||
```json
|
||
{
|
||
"id": "<uuid>", // 客户端生成/网关补齐(用于幂等)
|
||
"ts": "2025-09-23T10:00:00Z", // ISO 时间
|
||
"type": "chat.message", // 事件类型
|
||
"source": "mqtt|web|server", // 来源
|
||
"conversation_id": "...",
|
||
"sender_id": "...",
|
||
"content": "...", // 文本或 URL(音频/文件)
|
||
"content_type": "text|audio|image|file|markdown|json",
|
||
"metadata": {"duration_ms": 1234, "mime": "audio/mpeg", "size": 102400}
|
||
}
|
||
```
|
||
|
||
- 客户端发送到 MQTT 主题时携带此结构;
|
||
- 网关校验/补齐后落库为 chat_messages 的一行(content/metadata 等字段对应)。
|
||
|
||
另:健康/传感时序(Telemetry)建议采用“轻 envelope + 列存模型”的二段式:
|
||
|
||
```json
|
||
{
|
||
"id": "<uuid>",
|
||
"ts": "2025-09-23T10:00:00Z",
|
||
"type": "ts.metric",
|
||
"user_id": "...",
|
||
"device_id": "...",
|
||
"metric": "hr|spo2|temp|bp_systolic|bp_diastolic|steps",
|
||
"value": 72.0,
|
||
"meta": { "unit": "bpm", "src": "ble" }
|
||
}
|
||
```
|
||
由网关投递至 Kafka `ts.health`,并落地 ClickHouse `health_raw`(示例见 `doc_chat/kafka_mqtt_clickhouse.md`)。
|
||
|
||
## 3. 主题与分区规划(MQTT/Kafka)
|
||
|
||
- MQTT 主题
|
||
- chat/send/{conversationId}:客户端→服务端,发送消息入口。
|
||
- chat/recv/{conversationId}:服务端→客户端,转发已确认写库的消息(可选)。
|
||
- presence/{userId}:在线状态(使用 Last Will 设置离线告警)。
|
||
- ack/{messageId}/{userId}(可选):消息已达/已读 ACK 路由。
|
||
- 建议 QoS1(至少一次),禁用 retain(聊天消息不保留),presence 可保留。
|
||
- ACL:
|
||
- 仅参与者可发布 chat/send/{conversationId};
|
||
- 仅参与者可订阅 chat/recv/{conversationId};
|
||
- 用户仅能发布/订阅自身 presence/{userId} 与 ack 自己路由。
|
||
- Kafka 主题(可选)
|
||
- chat.inbound(key=conversationId):保证同会话有序;
|
||
- chat.persisted:写库成功后的广播,用于下游计算或转发;
|
||
- chat.push:推送任务。
|
||
- ts.health(key=user_id 或 device_id):健康/传感数据入口,供 ClickHouse 消费。
|
||
|
||
## 4. 后端流程设计
|
||
|
||
```text
|
||
[Client/Device]
|
||
| MQTT publish chat/send/{conversationId}
|
||
v
|
||
[gateway-mqtt]
|
||
| 校验/标准化/幂等
|
||
|--(A) 直写 Supabase PostgREST/Edge Function --> [Postgres]
|
||
|--(B) 投递 Kafka chat.inbound ---------------> [persist-worker -> Postgres]
|
||
|(trigger) -> chat_notifications
|
||
|(optional) -> MQTT chat/recv/{conversationId}
|
||
| -> Kafka chat.push
|
||
v
|
||
[push-notify-worker -> 个推]
|
||
|
||
---- Telemetry 时序流 ----
|
||
|--(T1) 网关将时序上报写入 Kafka ts.health ------> [ClickHouse 明细表 health_raw]
|
||
|--(T2) 物化/降采样视图做分钟/小时聚合 ----------> [Grafana 可视化]
|
||
|--(T3) 最近快照/告警回写 Supabase(可选) ------> [user_metrics_last / alerts]
|
||
```
|
||
|
||
### 4.1 MQTT → Supabase(入口)
|
||
|
||
1) 客户端/设备 发布到 chat/send/{conversationId}(QoS1)。
|
||
2) gateway-mqtt 收到消息:
|
||
- 校验 JWT/设备证书,鉴权是否会话参与者;
|
||
- 解析 envelope,若无 id 则补 uuid;
|
||
- 幂等检查(可选:Redis setNx(messageId) 窗口 24h)。
|
||
3) 落库:
|
||
- 方案 A:直写 Supabase PostgREST(chat_messages.insert)或 RPC。
|
||
- 方案 B:写入 Kafka chat.inbound → persist-worker 落库(高峰更稳)。
|
||
4) Postgres 触发器 chat_on_message_insert:
|
||
- 更新 chat_conversations.last_message_at;
|
||
- 插入 chat_notifications(除发送者外所有参与者)。
|
||
5) 可选:服务端再发布到 MQTT chat/recv/{conversationId}(供仅连 MQTT 的轻端即时收到),或依赖 Supabase Realtime 给连 H5/App 的用户推送。
|
||
|
||
### 4.2 Supabase → 个推(通知)
|
||
|
||
- 触发方式:
|
||
- Edge Function 订阅 Postgres Changes(或 Supabase Webhook)监听 chat_notifications insert;
|
||
- 或 Kafka chat.push 消费者。
|
||
- push-notify-worker:
|
||
- 查询 userId 的个推 cid;
|
||
- 生成通知文案(可带会话标题、消息摘要、角标);
|
||
- 调用个推 API 下发;记录投递结果(可回写到一个 delivery_logs 表)。
|
||
|
||
### 4.3 前端发送(chat/index)
|
||
|
||
1) 音频/附件:先 S3 预签名上传,获得 URL 与 metadata。
|
||
2) 生成消息 id(uuid),组装 envelope。
|
||
3) 并行/串行两路:
|
||
- 发布到 MQTT chat/send/{conversationId}(低延迟通知网关/其他 MQTT 客户端)。
|
||
- 调用 Supabase 插入 chat_messages(确保落库)。
|
||
4) UI 乐观更新;若落库失败则回滚/重试。
|
||
|
||
### 4.4 前端接收
|
||
|
||
- H5/App(连 Supabase Realtime):
|
||
- 订阅 chat_messages(INSERT)与 chat_notifications(INSERT)。
|
||
- 房间内做去重(根据 id,已在 room.uvue 实现)。
|
||
- 仅 MQTT 客户端:
|
||
- 订阅 chat/recv/{conversationId} 主题获取已确认消息(可由网关回发)。
|
||
|
||
### 4.5 Telemetry → ClickHouse(健康/传感数据)
|
||
|
||
1) 设备/网关 MQTT 上报(可以复用同一 broker,不同主题前缀如 `ts/send/{deviceId}`)。
|
||
2) gateway-mqtt 将清洗后的数据写入 Kafka `ts.health`(key 使用 user_id 或 device_id 保证局部有序)。
|
||
3) ClickHouse 落地:明细表 `health_raw`(MergeTree,按月份分区,TTL 保留),并建立 1m/1h 物化视图。
|
||
4) 前端时序图:Grafana 直连 ClickHouse;应用内查询可通过后端代理(统一鉴权,参见 `kafka_mqtt_clickhouse.md`)。
|
||
5) 最近快照/告警回写 Supabase(`user_metrics_last`/`alerts`),与业务数据/权限体系对齐。
|
||
|
||
## 5. 推送(个推)策略
|
||
|
||
- 触发:插入 chat_notifications 后对目标用户触发推送。
|
||
- 去重:个推透传携带 message_id/conversation_id,客户端去重。
|
||
- 点击动作:深链到 pages/sport/chat/room?conversation_id=xxx。
|
||
- 频率限制:同会话短时多条合并为“你有 X 条新消息”。
|
||
- 前台/后台判定:前台不推或用本地通知;后台/离线才用个推。
|
||
|
||
## 6. 安全与权限
|
||
|
||
- MQTT 认证:JWT(携带 user_id、过期时间)或 TLS 证书;ACL 映射会话参与者。
|
||
- Supabase RLS:已基于 chat_participants;确保 Edge Function 走 service role 仅内部调用。
|
||
- S3 附件:仅用预签名上传;公有读或 CDN 鉴权按需配置。
|
||
- Idempotency:message_id 唯一约束(Postgres 可加唯一索引),网关 Redis 幂等窗。
|
||
- Telemetry:ClickHouse 作为原始明细库不直接暴露到前端;前端通过后端代理或预聚合接口访问,鉴权仍以 Supabase 用户为中心。
|
||
|
||
## 7. 可靠性与一致性
|
||
|
||
- 顺序:以 conversationId 为 key 的 Kafka 分区(或直接依赖单库顺序 + created_at 排序)。
|
||
- 重试:
|
||
- 前端 AkReq 已支持网络重试;
|
||
- 网关→Supabase 写库失败使用指数退避重试;
|
||
- push 失败重试并退避;
|
||
- 断线:
|
||
- MQTT QoS1 + Clean Session;
|
||
- 前端 Realtime 自动重连(建议在 AkSupaRealtime 增加重连与订阅恢复)。
|
||
- 未读计数:
|
||
- 客户端进入会话更新 chat_participants.last_read_at;
|
||
- 会话列表通过 last_message_at 与 last_read_at 计算角标(服务端或客户端)。
|
||
- 网关运行健康:
|
||
- Node 网关定期上报 `gateway_heartbeats`(CPU/内存/连接状态/消息计数/错误等)。
|
||
- 提供 `gateway_status_latest` 视图与 `gateway_daily_stats` 物化视图做看板/告警。
|
||
|
||
## 8. 与现有表结构的映射
|
||
|
||
- chat_messages.content 存文本或 S3 URL;content_type 包含 "audio"(已支持)。
|
||
- chat_on_message_insert 已生成 chat_notifications,供 Realtime 与个推使用。
|
||
- 参与者列表通过 chat_participants 查询;权限与 RLS 已覆盖。
|
||
- 网关运行报表:
|
||
- `gateway_nodes`:注册网关(以 `mqtt_client_id` 作为唯一标识)。
|
||
- `gateway_heartbeats`:周期心跳与计数;`gateway_status_latest` 展示最近一次状态。
|
||
- SQL 见 `doc_chat/create_gateway_reporting.sql`。
|
||
|
||
## 9. 前端实现要点(结合现有代码)
|
||
|
||
- 单 WS 连接多订阅:`ChatDataService.ensureRealtime()` 已实现。
|
||
- 房间消息去重:`room.uvue` 已按 id 去重。
|
||
- 录音与上传:`AudioUploadService` 预签名 + multipart,返回 URL 后调用 `sendAudioMessage()`。
|
||
- 发送路径:保持 MQTT 发布 + Supabase 插入的“双写”,若担心两者不一致,可切换为“仅插库 → 后端转发到 MQTT”。
|
||
- 订阅释放:路由切换时调用 `dispose()` 与 `closeRealtime()` 清理。
|
||
- Telemetry 展示:趋势/历史直接走 ClickHouse(Grafana 或后端代理 API);业务侧角标/卡片展示用 Supabase 快照表(低延迟)。
|
||
|
||
## 10. 运维与监控
|
||
|
||
- 网关与推送服务加 Prometheus 指标:QPS、失败率、延时、重试数。
|
||
- Kafka Lag 监控(若使用 Kafka)。
|
||
- Postgres 指标与 Realtime 通道数监控。
|
||
- ClickHouse:插入延迟、分区大小、查询耗时、物化视图滞后;Grafana 有现成数据源。
|
||
- 个推调用成功率,退避与熔断策略。
|
||
- 统一 compose 部署(见 `doc_chat/docker-compose.yml`):`kafka`、`redis`、`clickhouse`、`grafana`、`mqtt` 一体化联调。
|
||
|
||
## 11. 最小可行落地顺序
|
||
|
||
1) 网关直写 Supabase(跳过 Kafka),完成 MQTT → DB → Realtime 闭环;
|
||
2) 接入个推:从 chat_notifications 插入触发;
|
||
3) 加上 MQTT 回发(chat/recv)供仅 MQTT 客户端;
|
||
4) 引入 Kafka 做解耦与扩展(如需要);
|
||
5) 增强 Realtime 重连与订阅恢复;完善未读角标与已读回执。
|
||
6) Telemetry 引入 ClickHouse:接入 Kafka `ts.health` → 明细表 + 物化视图;前端改走 ClickHouse/Grafana。
|
||
|
||
## 12. 配置与文件索引(本仓库)
|
||
|
||
- Docker 一体化:`doc_chat/docker-compose.yml`
|
||
- MQTT Broker 配置:`doc_chat/mosquitto/config/mosquitto.conf`
|
||
- 网关服务:
|
||
- 代码:`server/gateway-mqtt-node/src/index.js`(MQTT→Kafka/Supabase、心跳上报)
|
||
- Worker:`server/gateway-mqtt-node/src/worker.js`(Kafka chat.inbound → Supabase 持久化)
|
||
- 环境模板:`server/gateway-mqtt-node/.env.example`
|
||
- 聊天表结构与触发器:`doc_chat/create_chat_tables.sql`
|
||
- 网关运行报表:`doc_chat/create_gateway_reporting.sql`
|
||
- 时序选型与 ClickHouse 方案:`doc_chat/kafka_mqtt_clickhouse.md`
|
||
|
||
---
|
||
如需我生成网关示例(Node.js/Go,鉴权+插库)或 Supabase Edge Function 模板(监听 notifications → 个推),告诉我技术栈偏好即可。
|
||
|
||
---
|
||
|
||
## 附录:按会话划分 Supabase Realtime 通道方案
|
||
|
||
### 背景
|
||
|
||
- 现状:前端通过单一 `chat_messages` 订阅获取所有对话的 INSERT 事件,再由客户端根据 `conversation_id` 做过滤。
|
||
- 问题:
|
||
- 参加多个会话的用户会收到不相关会话的通知,虽然 UI 再过滤,但仍产生额外的网络和序列化开销。
|
||
- 如果后续需要对同一个 Supabase Realtime 连接做权限隔离(例如服务端使用 service key 代理订阅),需要显式限定查询范围。
|
||
|
||
### 设计目标
|
||
|
||
1. 每个会话使用独立的 Realtime channel,通过 Postgres Changes 的 `filter` 或 `event` 参数仅推送该会话的数据。
|
||
2. 在保持单连接多订阅的基础上,动态创建/关闭频道,避免过多 WebSocket 连接。
|
||
3. 兼容移动端(uni-app X)与 Web,同步维护订阅生命周期。
|
||
|
||
### 频道命名与过滤
|
||
|
||
| 场景 | Supabase channel 名称 | Postgres Changes 配置 |
|
||
|------|------------------------|------------------------|
|
||
| 会话消息 | `chat:msg:<conversationId>` | `{ event: "INSERT", schema: "public", table: "chat_messages", filter: "conversation_id=eq.<conversationId>" }` |
|
||
| 会话通知 | `chat:notify:<conversationId>`(可选) | `{ event: "INSERT", schema: "public", table: "chat_notifications", filter: "conversation_id=eq.<conversationId>" }` |
|
||
|
||
> 命名规则保留 `chat:` 前缀,方便统一管理;`conversationId` 建议做 URL encode 避免特殊字符。
|
||
|
||
### 前端改造步骤
|
||
|
||
1. **订阅管理器**:在 `ChatDataService.ensureRealtime()` 基础上,增加 `ensureConversationChannel(conversationId)`:
|
||
- 若已有 `convChannels[conversationId]` 直接返回。
|
||
- 否则调用 `realtime.channel(name)`,并使用 `channel.on('postgres_changes', {...})` 设置监听。
|
||
- 调用 `channel.subscribe()`,成功后写入 Map;失败时重试或回退为 table 级订阅。
|
||
2. **进入会话**:`room.uvue` 调用 `ChatDataService.subscribeMessages(conversationId)` 时,改为:
|
||
- `ensureConversationChannel(conversationId)`。
|
||
- 在 channel 回调里只派发该会话的消息(无需再过滤)。
|
||
3. **退出会话**:在 `onUnload/onHide` 调用 `ChatDataService.releaseConversationChannel(conversationId)`:
|
||
- 取消监听,调用 `channel.unsubscribe()` 并从 Map 移除。
|
||
- 可设置闲置超时,避免频繁退出/进入导致频繁订阅。
|
||
4. **多会话同时打开**(例如侧边栏预览):允许同时保持多个频道订阅,根据 UI 激活状态决定是否真正渲染进入消息队列。
|
||
|
||
### 权限与安全
|
||
|
||
- 使用匿名/终端用户 token 连接 Realtime 时,RLS 会自动限制仅返回参与者可访问的 `conversation_id`。即便如此,按会话过滤可以进一步降低服务端负载。
|
||
- 若使用 service key 统一代理订阅(例如后端服务转发消息),务必在 channel 过滤项中带上目标会话 ID,避免泄露。
|
||
|
||
### 服务器端同步策略
|
||
|
||
- 若网关或中间层也监听 Realtime,可采用同样的 channel 命名规范,或通过 `joinTopicIfNeeded('chat:msg:<id>', payload)` 方式统一管理。
|
||
- Channel 生命周期可和应用级订阅保持一致,并在服务器端维护引用计数,最后一个消费者离开时再真正 `unsubscribe()`。
|
||
|
||
### 回退策略
|
||
|
||
- 如果遇到 Supabase 版本限制无法使用 `filter`,可以退回到表级订阅,但保留 channel 命名(仍使用 `chat:msg:<id>`),在客户端回调内做过滤。这样可以平滑升级。
|
||
- 同理,在网络不佳时可侦测订阅失败并自动切换到现有的“全量监听 + 客户端过滤”。
|
||
|
||
---
|
||
该方案不会改变已有聊天流程,只是优化 Realtime 的订阅粒度,实现“每个会话一个 channel”。配合现有的缓存/去重逻辑可直接落地,下一步可在 `ChatDataService` 中实现上述方法并编写单元测试覆盖多订阅场景。
|