Files
akmon/doc_chat/微通信.md
2026-01-20 08:04:15 +08:00

302 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 微通信架构与端到端流程设计MQTT/Kafka/Supabase/ClickHouse + 个推)
本方案围绕当前既有逻辑“设备/客户端通过 MQTT 上报 →Kafka 或)直接写入 Supabase → 前端 chat/index 接收 → 前端发送消息至 MQTT 并写入 Supabase”进行系统化设计覆盖后端组件、数据流、推送、可靠性与前端实现要点。
## 1. 总览
- 目标
- 低延迟、多终端的聊天与事件通知,既支持 App/Web也支持物联设备/边缘端。
- 统一消息落库PostgreSQL on Supabase+ 实时分发Supabase Realtime / MQTT
- 推送采用“个推”在 App 后台或离线时补充通知到达率。
- 核心组件
- MQTT BrokerEMQX/Mosquitto收消息/发消息的设备级/轻端入口QoS≥1。
- 网关服务 gateway-mqtt可选内置 Kafka Bridge
- 认证/ACL、协议转换、消息校验与标准化。
- 转发到 Kafkachat.inbound或直接调用 SupabaseREST/RPC/pg落库。
- Kafka可选/增强):
- 主题chat.inbound入口、chat.persisted落库成功后、chat.push通知任务
- 解耦高峰、可回放、做扩展处理(统计、风控、审计)。
- SupabasePostgres + Realtime + Edge Functions
-chat_conversations / chat_participants / chat_messages / chat_notifications已建表`doc_chat/create_chat_tables.sql`)。
- 触发器chat_on_message_insert更新 last_message_at + 写通知)。
- RLS按参与者隔离访问已配置
- Realtime前端订阅消息/通知的实时流。
- ClickHouse列存/OLAP/时序聚合):
- 承载健康/传感类高频时序数据(大跨度查询、长保留、低成本)。
- 与 Kafka/Telegraf 对接,建立明细表 + 物化视图做分钟/小时聚合。
- 网关运行报表Supabase 内):
- 表:`gateway_nodes``gateway_heartbeats`(见 `doc_chat/create_gateway_reporting.sql`)。
- Node 网关周期写入心跳与计数,供看板与告警使用。
- push-notify-worker个推
- 通过 Supabase Webhook/Edge Function 或 Kafka chat.push 触发,调用个推 API 下发推送。
## 2. 事件/消息模型
采用统一 envelope参考 CloudEvents 思想):
```json
{
"id": "<uuid>", // 客户端生成/网关补齐(用于幂等)
"ts": "2025-09-23T10:00:00Z", // ISO 时间
"type": "chat.message", // 事件类型
"source": "mqtt|web|server", // 来源
"conversation_id": "...",
"sender_id": "...",
"content": "...", // 文本或 URL音频/文件)
"content_type": "text|audio|image|file|markdown|json",
"metadata": {"duration_ms": 1234, "mime": "audio/mpeg", "size": 102400}
}
```
- 客户端发送到 MQTT 主题时携带此结构;
- 网关校验/补齐后落库为 chat_messages 的一行content/metadata 等字段对应)。
另:健康/传感时序Telemetry建议采用“轻 envelope + 列存模型”的二段式:
```json
{
"id": "<uuid>",
"ts": "2025-09-23T10:00:00Z",
"type": "ts.metric",
"user_id": "...",
"device_id": "...",
"metric": "hr|spo2|temp|bp_systolic|bp_diastolic|steps",
"value": 72.0,
"meta": { "unit": "bpm", "src": "ble" }
}
```
由网关投递至 Kafka `ts.health`,并落地 ClickHouse `health_raw`(示例见 `doc_chat/kafka_mqtt_clickhouse.md`)。
## 3. 主题与分区规划MQTT/Kafka
- MQTT 主题
- chat/send/{conversationId}:客户端→服务端,发送消息入口。
- chat/recv/{conversationId}:服务端→客户端,转发已确认写库的消息(可选)。
- presence/{userId}:在线状态(使用 Last Will 设置离线告警)。
- ack/{messageId}/{userId}(可选):消息已达/已读 ACK 路由。
- 建议 QoS1至少一次禁用 retain聊天消息不保留presence 可保留。
- ACL
- 仅参与者可发布 chat/send/{conversationId}
- 仅参与者可订阅 chat/recv/{conversationId}
- 用户仅能发布/订阅自身 presence/{userId} 与 ack 自己路由。
- Kafka 主题(可选)
- chat.inboundkey=conversationId保证同会话有序
- chat.persisted写库成功后的广播用于下游计算或转发
- chat.push推送任务。
- ts.healthkey=user_id 或 device_id健康/传感数据入口,供 ClickHouse 消费。
## 4. 后端流程设计
```text
[Client/Device]
| MQTT publish chat/send/{conversationId}
v
[gateway-mqtt]
| 校验/标准化/幂等
|--(A) 直写 Supabase PostgREST/Edge Function --> [Postgres]
|--(B) 投递 Kafka chat.inbound ---------------> [persist-worker -> Postgres]
|(trigger) -> chat_notifications
|(optional) -> MQTT chat/recv/{conversationId}
| -> Kafka chat.push
v
[push-notify-worker -> 个推]
---- Telemetry 时序流 ----
|--(T1) 网关将时序上报写入 Kafka ts.health ------> [ClickHouse 明细表 health_raw]
|--(T2) 物化/降采样视图做分钟/小时聚合 ----------> [Grafana 可视化]
|--(T3) 最近快照/告警回写 Supabase可选 ------> [user_metrics_last / alerts]
```
### 4.1 MQTT → Supabase入口
1) 客户端/设备 发布到 chat/send/{conversationId}QoS1
2) gateway-mqtt 收到消息:
- 校验 JWT/设备证书,鉴权是否会话参与者;
- 解析 envelope若无 id 则补 uuid
- 幂等检查可选Redis setNx(messageId) 窗口 24h
3) 落库:
- 方案 A直写 Supabase PostgRESTchat_messages.insert或 RPC。
- 方案 B写入 Kafka chat.inbound → persist-worker 落库(高峰更稳)。
4) Postgres 触发器 chat_on_message_insert
- 更新 chat_conversations.last_message_at
- 插入 chat_notifications除发送者外所有参与者
5) 可选:服务端再发布到 MQTT chat/recv/{conversationId}(供仅连 MQTT 的轻端即时收到),或依赖 Supabase Realtime 给连 H5/App 的用户推送。
### 4.2 Supabase → 个推(通知)
- 触发方式:
- Edge Function 订阅 Postgres Changes或 Supabase Webhook监听 chat_notifications insert
- 或 Kafka chat.push 消费者。
- push-notify-worker
- 查询 userId 的个推 cid
- 生成通知文案(可带会话标题、消息摘要、角标);
- 调用个推 API 下发;记录投递结果(可回写到一个 delivery_logs 表)。
### 4.3 前端发送chat/index
1) 音频/附件:先 S3 预签名上传,获得 URL 与 metadata。
2) 生成消息 iduuid组装 envelope。
3) 并行/串行两路:
- 发布到 MQTT chat/send/{conversationId}(低延迟通知网关/其他 MQTT 客户端)。
- 调用 Supabase 插入 chat_messages确保落库
4) UI 乐观更新;若落库失败则回滚/重试。
### 4.4 前端接收
- H5/App连 Supabase Realtime
- 订阅 chat_messagesINSERT与 chat_notificationsINSERT
- 房间内做去重(根据 id已在 room.uvue 实现)。
- 仅 MQTT 客户端:
- 订阅 chat/recv/{conversationId} 主题获取已确认消息(可由网关回发)。
### 4.5 Telemetry → ClickHouse健康/传感数据)
1) 设备/网关 MQTT 上报(可以复用同一 broker不同主题前缀如 `ts/send/{deviceId}`)。
2) gateway-mqtt 将清洗后的数据写入 Kafka `ts.health`key 使用 user_id 或 device_id 保证局部有序)。
3) ClickHouse 落地:明细表 `health_raw`MergeTree按月份分区TTL 保留),并建立 1m/1h 物化视图。
4) 前端时序图Grafana 直连 ClickHouse应用内查询可通过后端代理统一鉴权参见 `kafka_mqtt_clickhouse.md`)。
5) 最近快照/告警回写 Supabase`user_metrics_last`/`alerts`),与业务数据/权限体系对齐。
## 5. 推送(个推)策略
- 触发:插入 chat_notifications 后对目标用户触发推送。
- 去重:个推透传携带 message_id/conversation_id客户端去重。
- 点击动作:深链到 pages/sport/chat/room?conversation_id=xxx。
- 频率限制:同会话短时多条合并为“你有 X 条新消息”。
- 前台/后台判定:前台不推或用本地通知;后台/离线才用个推。
## 6. 安全与权限
- MQTT 认证JWT携带 user_id、过期时间或 TLS 证书ACL 映射会话参与者。
- Supabase RLS已基于 chat_participants确保 Edge Function 走 service role 仅内部调用。
- S3 附件:仅用预签名上传;公有读或 CDN 鉴权按需配置。
- Idempotencymessage_id 唯一约束Postgres 可加唯一索引),网关 Redis 幂等窗。
- TelemetryClickHouse 作为原始明细库不直接暴露到前端;前端通过后端代理或预聚合接口访问,鉴权仍以 Supabase 用户为中心。
## 7. 可靠性与一致性
- 顺序:以 conversationId 为 key 的 Kafka 分区(或直接依赖单库顺序 + created_at 排序)。
- 重试:
- 前端 AkReq 已支持网络重试;
- 网关→Supabase 写库失败使用指数退避重试;
- push 失败重试并退避;
- 断线:
- MQTT QoS1 + Clean Session
- 前端 Realtime 自动重连(建议在 AkSupaRealtime 增加重连与订阅恢复)。
- 未读计数:
- 客户端进入会话更新 chat_participants.last_read_at
- 会话列表通过 last_message_at 与 last_read_at 计算角标(服务端或客户端)。
- 网关运行健康:
- Node 网关定期上报 `gateway_heartbeats`CPU/内存/连接状态/消息计数/错误等)。
- 提供 `gateway_status_latest` 视图与 `gateway_daily_stats` 物化视图做看板/告警。
## 8. 与现有表结构的映射
- chat_messages.content 存文本或 S3 URLcontent_type 包含 "audio"(已支持)。
- chat_on_message_insert 已生成 chat_notifications供 Realtime 与个推使用。
- 参与者列表通过 chat_participants 查询;权限与 RLS 已覆盖。
- 网关运行报表:
- `gateway_nodes`:注册网关(以 `mqtt_client_id` 作为唯一标识)。
- `gateway_heartbeats`:周期心跳与计数;`gateway_status_latest` 展示最近一次状态。
- SQL 见 `doc_chat/create_gateway_reporting.sql`
## 9. 前端实现要点(结合现有代码)
- 单 WS 连接多订阅:`ChatDataService.ensureRealtime()` 已实现。
- 房间消息去重:`room.uvue` 已按 id 去重。
- 录音与上传:`AudioUploadService` 预签名 + multipart返回 URL 后调用 `sendAudioMessage()`
- 发送路径:保持 MQTT 发布 + Supabase 插入的“双写”,若担心两者不一致,可切换为“仅插库 → 后端转发到 MQTT”。
- 订阅释放:路由切换时调用 `dispose()``closeRealtime()` 清理。
- Telemetry 展示:趋势/历史直接走 ClickHouseGrafana 或后端代理 API业务侧角标/卡片展示用 Supabase 快照表(低延迟)。
## 10. 运维与监控
- 网关与推送服务加 Prometheus 指标QPS、失败率、延时、重试数。
- Kafka Lag 监控(若使用 Kafka
- Postgres 指标与 Realtime 通道数监控。
- ClickHouse插入延迟、分区大小、查询耗时、物化视图滞后Grafana 有现成数据源。
- 个推调用成功率,退避与熔断策略。
- 统一 compose 部署(见 `doc_chat/docker-compose.yml``kafka``redis``clickhouse``grafana``mqtt` 一体化联调。
## 11. 最小可行落地顺序
1) 网关直写 Supabase跳过 Kafka完成 MQTT → DB → Realtime 闭环;
2) 接入个推:从 chat_notifications 插入触发;
3) 加上 MQTT 回发chat/recv供仅 MQTT 客户端;
4) 引入 Kafka 做解耦与扩展(如需要);
5) 增强 Realtime 重连与订阅恢复;完善未读角标与已读回执。
6) Telemetry 引入 ClickHouse接入 Kafka `ts.health` → 明细表 + 物化视图;前端改走 ClickHouse/Grafana。
## 12. 配置与文件索引(本仓库)
- Docker 一体化:`doc_chat/docker-compose.yml`
- MQTT Broker 配置:`doc_chat/mosquitto/config/mosquitto.conf`
- 网关服务:
- 代码:`server/gateway-mqtt-node/src/index.js`MQTT→Kafka/Supabase、心跳上报
- Worker`server/gateway-mqtt-node/src/worker.js`Kafka chat.inbound → Supabase 持久化)
- 环境模板:`server/gateway-mqtt-node/.env.example`
- 聊天表结构与触发器:`doc_chat/create_chat_tables.sql`
- 网关运行报表:`doc_chat/create_gateway_reporting.sql`
- 时序选型与 ClickHouse 方案:`doc_chat/kafka_mqtt_clickhouse.md`
---
如需我生成网关示例Node.js/Go鉴权+插库)或 Supabase Edge Function 模板(监听 notifications → 个推),告诉我技术栈偏好即可。
---
## 附录:按会话划分 Supabase Realtime 通道方案
### 背景
- 现状:前端通过单一 `chat_messages` 订阅获取所有对话的 INSERT 事件,再由客户端根据 `conversation_id` 做过滤。
- 问题:
- 参加多个会话的用户会收到不相关会话的通知,虽然 UI 再过滤,但仍产生额外的网络和序列化开销。
- 如果后续需要对同一个 Supabase Realtime 连接做权限隔离(例如服务端使用 service key 代理订阅),需要显式限定查询范围。
### 设计目标
1. 每个会话使用独立的 Realtime channel通过 Postgres Changes 的 `filter``event` 参数仅推送该会话的数据。
2. 在保持单连接多订阅的基础上,动态创建/关闭频道,避免过多 WebSocket 连接。
3. 兼容移动端uni-app X与 Web同步维护订阅生命周期。
### 频道命名与过滤
| 场景 | Supabase channel 名称 | Postgres Changes 配置 |
|------|------------------------|------------------------|
| 会话消息 | `chat:msg:<conversationId>` | `{ event: "INSERT", schema: "public", table: "chat_messages", filter: "conversation_id=eq.<conversationId>" }` |
| 会话通知 | `chat:notify:<conversationId>`(可选) | `{ event: "INSERT", schema: "public", table: "chat_notifications", filter: "conversation_id=eq.<conversationId>" }` |
> 命名规则保留 `chat:` 前缀,方便统一管理;`conversationId` 建议做 URL encode 避免特殊字符。
### 前端改造步骤
1. **订阅管理器**:在 `ChatDataService.ensureRealtime()` 基础上,增加 `ensureConversationChannel(conversationId)`
- 若已有 `convChannels[conversationId]` 直接返回。
- 否则调用 `realtime.channel(name)`,并使用 `channel.on('postgres_changes', {...})` 设置监听。
- 调用 `channel.subscribe()`,成功后写入 Map失败时重试或回退为 table 级订阅。
2. **进入会话**`room.uvue` 调用 `ChatDataService.subscribeMessages(conversationId)` 时,改为:
- `ensureConversationChannel(conversationId)`
- 在 channel 回调里只派发该会话的消息(无需再过滤)。
3. **退出会话**:在 `onUnload/onHide` 调用 `ChatDataService.releaseConversationChannel(conversationId)`
- 取消监听,调用 `channel.unsubscribe()` 并从 Map 移除。
- 可设置闲置超时,避免频繁退出/进入导致频繁订阅。
4. **多会话同时打开**(例如侧边栏预览):允许同时保持多个频道订阅,根据 UI 激活状态决定是否真正渲染进入消息队列。
### 权限与安全
- 使用匿名/终端用户 token 连接 Realtime 时RLS 会自动限制仅返回参与者可访问的 `conversation_id`。即便如此,按会话过滤可以进一步降低服务端负载。
- 若使用 service key 统一代理订阅(例如后端服务转发消息),务必在 channel 过滤项中带上目标会话 ID避免泄露。
### 服务器端同步策略
- 若网关或中间层也监听 Realtime可采用同样的 channel 命名规范,或通过 `joinTopicIfNeeded('chat:msg:<id>', payload)` 方式统一管理。
- Channel 生命周期可和应用级订阅保持一致,并在服务器端维护引用计数,最后一个消费者离开时再真正 `unsubscribe()`
### 回退策略
- 如果遇到 Supabase 版本限制无法使用 `filter`,可以退回到表级订阅,但保留 channel 命名(仍使用 `chat:msg:<id>`),在客户端回调内做过滤。这样可以平滑升级。
- 同理,在网络不佳时可侦测订阅失败并自动切换到现有的“全量监听 + 客户端过滤”。
---
该方案不会改变已有聊天流程,只是优化 Realtime 的订阅粒度,实现“每个会话一个 channel”。配合现有的缓存/去重逻辑可直接落地,下一步可在 `ChatDataService` 中实现上述方法并编写单元测试覆盖多订阅场景。