Files
akmon/doc_chat/微通信.md
2026-01-20 08:04:15 +08:00

16 KiB
Raw Permalink Blame History

微通信架构与端到端流程设计MQTT/Kafka/Supabase/ClickHouse + 个推)

本方案围绕当前既有逻辑“设备/客户端通过 MQTT 上报 →Kafka 或)直接写入 Supabase → 前端 chat/index 接收 → 前端发送消息至 MQTT 并写入 Supabase”进行系统化设计覆盖后端组件、数据流、推送、可靠性与前端实现要点。

1. 总览

  • 目标
    • 低延迟、多终端的聊天与事件通知,既支持 App/Web也支持物联设备/边缘端。
    • 统一消息落库PostgreSQL on Supabase+ 实时分发Supabase Realtime / MQTT
    • 推送采用“个推”在 App 后台或离线时补充通知到达率。
  • 核心组件
    • MQTT BrokerEMQX/Mosquitto收消息/发消息的设备级/轻端入口QoS≥1。
    • 网关服务 gateway-mqtt可选内置 Kafka Bridge
      • 认证/ACL、协议转换、消息校验与标准化。
      • 转发到 Kafkachat.inbound或直接调用 SupabaseREST/RPC/pg落库。
    • Kafka可选/增强):
      • 主题chat.inbound入口、chat.persisted落库成功后、chat.push通知任务
      • 解耦高峰、可回放、做扩展处理(统计、风控、审计)。
    • SupabasePostgres + Realtime + Edge Functions
      • chat_conversations / chat_participants / chat_messages / chat_notifications已建表doc_chat/create_chat_tables.sql)。
      • 触发器chat_on_message_insert更新 last_message_at + 写通知)。
      • RLS按参与者隔离访问已配置
      • Realtime前端订阅消息/通知的实时流。
    • ClickHouse列存/OLAP/时序聚合):
      • 承载健康/传感类高频时序数据(大跨度查询、长保留、低成本)。
      • 与 Kafka/Telegraf 对接,建立明细表 + 物化视图做分钟/小时聚合。
    • 网关运行报表Supabase 内):
      • 表:gateway_nodesgateway_heartbeats(见 doc_chat/create_gateway_reporting.sql)。
      • Node 网关周期写入心跳与计数,供看板与告警使用。
    • push-notify-worker个推
      • 通过 Supabase Webhook/Edge Function 或 Kafka chat.push 触发,调用个推 API 下发推送。

2. 事件/消息模型

采用统一 envelope参考 CloudEvents 思想):

{
  "id": "<uuid>",              // 客户端生成/网关补齐(用于幂等)
  "ts": "2025-09-23T10:00:00Z", // ISO 时间
  "type": "chat.message",       // 事件类型
  "source": "mqtt|web|server",  // 来源
  "conversation_id": "...",
  "sender_id": "...",
  "content": "...",            // 文本或 URL音频/文件)
  "content_type": "text|audio|image|file|markdown|json",
  "metadata": {"duration_ms": 1234, "mime": "audio/mpeg", "size": 102400}
}
  • 客户端发送到 MQTT 主题时携带此结构;
  • 网关校验/补齐后落库为 chat_messages 的一行content/metadata 等字段对应)。

另:健康/传感时序Telemetry建议采用“轻 envelope + 列存模型”的二段式:

{
  "id": "<uuid>",
  "ts": "2025-09-23T10:00:00Z",
  "type": "ts.metric",
  "user_id": "...",
  "device_id": "...",
  "metric": "hr|spo2|temp|bp_systolic|bp_diastolic|steps",
  "value": 72.0,
  "meta": { "unit": "bpm", "src": "ble" }
}

由网关投递至 Kafka ts.health,并落地 ClickHouse health_raw(示例见 doc_chat/kafka_mqtt_clickhouse.md)。

3. 主题与分区规划MQTT/Kafka

  • MQTT 主题
    • chat/send/{conversationId}:客户端→服务端,发送消息入口。
    • chat/recv/{conversationId}:服务端→客户端,转发已确认写库的消息(可选)。
    • presence/{userId}:在线状态(使用 Last Will 设置离线告警)。
    • ack/{messageId}/{userId}(可选):消息已达/已读 ACK 路由。
    • 建议 QoS1至少一次禁用 retain聊天消息不保留presence 可保留。
    • ACL
      • 仅参与者可发布 chat/send/{conversationId}
      • 仅参与者可订阅 chat/recv/{conversationId}
      • 用户仅能发布/订阅自身 presence/{userId} 与 ack 自己路由。
  • Kafka 主题(可选)
    • chat.inboundkey=conversationId保证同会话有序
    • chat.persisted写库成功后的广播用于下游计算或转发
    • chat.push推送任务。
    • ts.healthkey=user_id 或 device_id健康/传感数据入口,供 ClickHouse 消费。

4. 后端流程设计

[Client/Device]
   |  MQTT publish chat/send/{conversationId}
   v
[gateway-mqtt]
   |  校验/标准化/幂等
   |--(A) 直写 Supabase PostgREST/Edge Function --> [Postgres]
   |--(B) 投递 Kafka chat.inbound ---------------> [persist-worker -> Postgres]
                                      |(trigger)  -> chat_notifications
                                      |(optional) -> MQTT chat/recv/{conversationId}
                                      |            -> Kafka chat.push
                                      v
                                  [push-notify-worker -> 个推]

---- Telemetry 时序流 ----
  |--(T1) 网关将时序上报写入 Kafka ts.health ------> [ClickHouse 明细表 health_raw]
  |--(T2) 物化/降采样视图做分钟/小时聚合 ----------> [Grafana 可视化]
  |--(T3) 最近快照/告警回写 Supabase可选 ------> [user_metrics_last / alerts]

4.1 MQTT → Supabase入口

  1. 客户端/设备 发布到 chat/send/{conversationId}QoS1
  2. gateway-mqtt 收到消息:
    • 校验 JWT/设备证书,鉴权是否会话参与者;
    • 解析 envelope若无 id 则补 uuid
    • 幂等检查可选Redis setNx(messageId) 窗口 24h
  3. 落库:
    • 方案 A直写 Supabase PostgRESTchat_messages.insert或 RPC。
    • 方案 B写入 Kafka chat.inbound → persist-worker 落库(高峰更稳)。
  4. Postgres 触发器 chat_on_message_insert
    • 更新 chat_conversations.last_message_at
    • 插入 chat_notifications除发送者外所有参与者
  5. 可选:服务端再发布到 MQTT chat/recv/{conversationId}(供仅连 MQTT 的轻端即时收到),或依赖 Supabase Realtime 给连 H5/App 的用户推送。

4.2 Supabase → 个推(通知)

  • 触发方式:
    • Edge Function 订阅 Postgres Changes或 Supabase Webhook监听 chat_notifications insert
    • 或 Kafka chat.push 消费者。
  • push-notify-worker
    • 查询 userId 的个推 cid
    • 生成通知文案(可带会话标题、消息摘要、角标);
    • 调用个推 API 下发;记录投递结果(可回写到一个 delivery_logs 表)。

4.3 前端发送chat/index

  1. 音频/附件:先 S3 预签名上传,获得 URL 与 metadata。
  2. 生成消息 iduuid组装 envelope。
  3. 并行/串行两路:
    • 发布到 MQTT chat/send/{conversationId}(低延迟通知网关/其他 MQTT 客户端)。
    • 调用 Supabase 插入 chat_messages确保落库
  4. UI 乐观更新;若落库失败则回滚/重试。

4.4 前端接收

  • H5/App连 Supabase Realtime
    • 订阅 chat_messagesINSERT与 chat_notificationsINSERT
    • 房间内做去重(根据 id已在 room.uvue 实现)。
  • 仅 MQTT 客户端:
    • 订阅 chat/recv/{conversationId} 主题获取已确认消息(可由网关回发)。

4.5 Telemetry → ClickHouse健康/传感数据)

  1. 设备/网关 MQTT 上报(可以复用同一 broker不同主题前缀如 ts/send/{deviceId})。
  2. gateway-mqtt 将清洗后的数据写入 Kafka ts.healthkey 使用 user_id 或 device_id 保证局部有序)。
  3. ClickHouse 落地:明细表 health_rawMergeTree按月份分区TTL 保留),并建立 1m/1h 物化视图。
  4. 前端时序图Grafana 直连 ClickHouse应用内查询可通过后端代理统一鉴权参见 kafka_mqtt_clickhouse.md)。
  5. 最近快照/告警回写 Supabaseuser_metrics_last/alerts),与业务数据/权限体系对齐。

5. 推送(个推)策略

  • 触发:插入 chat_notifications 后对目标用户触发推送。
  • 去重:个推透传携带 message_id/conversation_id客户端去重。
  • 点击动作:深链到 pages/sport/chat/room?conversation_id=xxx。
  • 频率限制:同会话短时多条合并为“你有 X 条新消息”。
  • 前台/后台判定:前台不推或用本地通知;后台/离线才用个推。

6. 安全与权限

  • MQTT 认证JWT携带 user_id、过期时间或 TLS 证书ACL 映射会话参与者。
  • Supabase RLS已基于 chat_participants确保 Edge Function 走 service role 仅内部调用。
  • S3 附件:仅用预签名上传;公有读或 CDN 鉴权按需配置。
  • Idempotencymessage_id 唯一约束Postgres 可加唯一索引),网关 Redis 幂等窗。
  • TelemetryClickHouse 作为原始明细库不直接暴露到前端;前端通过后端代理或预聚合接口访问,鉴权仍以 Supabase 用户为中心。

7. 可靠性与一致性

  • 顺序:以 conversationId 为 key 的 Kafka 分区(或直接依赖单库顺序 + created_at 排序)。
  • 重试:
    • 前端 AkReq 已支持网络重试;
    • 网关→Supabase 写库失败使用指数退避重试;
    • push 失败重试并退避;
  • 断线:
    • MQTT QoS1 + Clean Session
    • 前端 Realtime 自动重连(建议在 AkSupaRealtime 增加重连与订阅恢复)。
  • 未读计数:
    • 客户端进入会话更新 chat_participants.last_read_at
    • 会话列表通过 last_message_at 与 last_read_at 计算角标(服务端或客户端)。
  • 网关运行健康:
    • Node 网关定期上报 gateway_heartbeatsCPU/内存/连接状态/消息计数/错误等)。
    • 提供 gateway_status_latest 视图与 gateway_daily_stats 物化视图做看板/告警。

8. 与现有表结构的映射

  • chat_messages.content 存文本或 S3 URLcontent_type 包含 "audio"(已支持)。
  • chat_on_message_insert 已生成 chat_notifications供 Realtime 与个推使用。
  • 参与者列表通过 chat_participants 查询;权限与 RLS 已覆盖。
  • 网关运行报表:
    • gateway_nodes:注册网关(以 mqtt_client_id 作为唯一标识)。
    • gateway_heartbeats:周期心跳与计数;gateway_status_latest 展示最近一次状态。
    • SQL 见 doc_chat/create_gateway_reporting.sql

9. 前端实现要点(结合现有代码)

  • 单 WS 连接多订阅:ChatDataService.ensureRealtime() 已实现。
  • 房间消息去重:room.uvue 已按 id 去重。
  • 录音与上传:AudioUploadService 预签名 + multipart返回 URL 后调用 sendAudioMessage()
  • 发送路径:保持 MQTT 发布 + Supabase 插入的“双写”,若担心两者不一致,可切换为“仅插库 → 后端转发到 MQTT”。
  • 订阅释放:路由切换时调用 dispose()closeRealtime() 清理。
  • Telemetry 展示:趋势/历史直接走 ClickHouseGrafana 或后端代理 API业务侧角标/卡片展示用 Supabase 快照表(低延迟)。

10. 运维与监控

  • 网关与推送服务加 Prometheus 指标QPS、失败率、延时、重试数。
  • Kafka Lag 监控(若使用 Kafka
  • Postgres 指标与 Realtime 通道数监控。
  • ClickHouse插入延迟、分区大小、查询耗时、物化视图滞后Grafana 有现成数据源。
  • 个推调用成功率,退避与熔断策略。
  • 统一 compose 部署(见 doc_chat/docker-compose.ymlkafkaredisclickhousegrafanamqtt 一体化联调。

11. 最小可行落地顺序

  1. 网关直写 Supabase跳过 Kafka完成 MQTT → DB → Realtime 闭环;
  2. 接入个推:从 chat_notifications 插入触发;
  3. 加上 MQTT 回发chat/recv供仅 MQTT 客户端;
  4. 引入 Kafka 做解耦与扩展(如需要);
  5. 增强 Realtime 重连与订阅恢复;完善未读角标与已读回执。
  6. Telemetry 引入 ClickHouse接入 Kafka ts.health → 明细表 + 物化视图;前端改走 ClickHouse/Grafana。

12. 配置与文件索引(本仓库)

  • Docker 一体化:doc_chat/docker-compose.yml
  • MQTT Broker 配置:doc_chat/mosquitto/config/mosquitto.conf
  • 网关服务:
    • 代码:server/gateway-mqtt-node/src/index.jsMQTT→Kafka/Supabase、心跳上报
    • Workerserver/gateway-mqtt-node/src/worker.jsKafka chat.inbound → Supabase 持久化)
    • 环境模板:server/gateway-mqtt-node/.env.example
  • 聊天表结构与触发器:doc_chat/create_chat_tables.sql
  • 网关运行报表:doc_chat/create_gateway_reporting.sql
  • 时序选型与 ClickHouse 方案:doc_chat/kafka_mqtt_clickhouse.md

如需我生成网关示例Node.js/Go鉴权+插库)或 Supabase Edge Function 模板(监听 notifications → 个推),告诉我技术栈偏好即可。


附录:按会话划分 Supabase Realtime 通道方案

背景

  • 现状:前端通过单一 chat_messages 订阅获取所有对话的 INSERT 事件,再由客户端根据 conversation_id 做过滤。
  • 问题:
    • 参加多个会话的用户会收到不相关会话的通知,虽然 UI 再过滤,但仍产生额外的网络和序列化开销。
    • 如果后续需要对同一个 Supabase Realtime 连接做权限隔离(例如服务端使用 service key 代理订阅),需要显式限定查询范围。

设计目标

  1. 每个会话使用独立的 Realtime channel通过 Postgres Changes 的 filterevent 参数仅推送该会话的数据。
  2. 在保持单连接多订阅的基础上,动态创建/关闭频道,避免过多 WebSocket 连接。
  3. 兼容移动端uni-app X与 Web同步维护订阅生命周期。

频道命名与过滤

场景 Supabase channel 名称 Postgres Changes 配置
会话消息 chat:msg:<conversationId> { event: "INSERT", schema: "public", table: "chat_messages", filter: "conversation_id=eq.<conversationId>" }
会话通知 chat:notify:<conversationId>(可选) { event: "INSERT", schema: "public", table: "chat_notifications", filter: "conversation_id=eq.<conversationId>" }

命名规则保留 chat: 前缀,方便统一管理;conversationId 建议做 URL encode 避免特殊字符。

前端改造步骤

  1. 订阅管理器:在 ChatDataService.ensureRealtime() 基础上,增加 ensureConversationChannel(conversationId)
    • 若已有 convChannels[conversationId] 直接返回。
    • 否则调用 realtime.channel(name),并使用 channel.on('postgres_changes', {...}) 设置监听。
    • 调用 channel.subscribe(),成功后写入 Map失败时重试或回退为 table 级订阅。
  2. 进入会话room.uvue 调用 ChatDataService.subscribeMessages(conversationId) 时,改为:
    • ensureConversationChannel(conversationId)
    • 在 channel 回调里只派发该会话的消息(无需再过滤)。
  3. 退出会话:在 onUnload/onHide 调用 ChatDataService.releaseConversationChannel(conversationId)
    • 取消监听,调用 channel.unsubscribe() 并从 Map 移除。
    • 可设置闲置超时,避免频繁退出/进入导致频繁订阅。
  4. 多会话同时打开(例如侧边栏预览):允许同时保持多个频道订阅,根据 UI 激活状态决定是否真正渲染进入消息队列。

权限与安全

  • 使用匿名/终端用户 token 连接 Realtime 时RLS 会自动限制仅返回参与者可访问的 conversation_id。即便如此,按会话过滤可以进一步降低服务端负载。
  • 若使用 service key 统一代理订阅(例如后端服务转发消息),务必在 channel 过滤项中带上目标会话 ID避免泄露。

服务器端同步策略

  • 若网关或中间层也监听 Realtime可采用同样的 channel 命名规范,或通过 joinTopicIfNeeded('chat:msg:<id>', payload) 方式统一管理。
  • Channel 生命周期可和应用级订阅保持一致,并在服务器端维护引用计数,最后一个消费者离开时再真正 unsubscribe()

回退策略

  • 如果遇到 Supabase 版本限制无法使用 filter,可以退回到表级订阅,但保留 channel 命名(仍使用 chat:msg:<id>),在客户端回调内做过滤。这样可以平滑升级。
  • 同理,在网络不佳时可侦测订阅失败并自动切换到现有的“全量监听 + 客户端过滤”。

该方案不会改变已有聊天流程,只是优化 Realtime 的订阅粒度,实现“每个会话一个 channel”。配合现有的缓存/去重逻辑可直接落地,下一步可在 ChatDataService 中实现上述方法并编写单元测试覆盖多订阅场景。