Files
akmon/doc_chat/emqx配置.md
2026-01-20 08:04:15 +08:00

7.1 KiB
Raw Permalink Blame History

EMQX 配置与端口规划(配合当前微通信网关)

本文给出在 IP 119.146.131.237 上,以您提供的端口规划,如何配置 EMQX 以最小代价配合我们当前的功能(聊天上行 → Webhook 入库;设备下发 → 网关发布ACK 标记)。

端口/NAT 规划

  • 外网 → 内部端口:
    • 8105 → 1883MQTT/TCP移动端/设备、网关 MQTT 直连使用
    • 8083 → 8083MQTT over WebSocket/WSWeb 端非 TLS 调试
    • 8084 → 8084MQTT over WebSocket/WSSWeb 端 TLS需证书
    • 8104 → 18083EMQX Dashboard运维面板建议仅内网或白名单
    • 9560 → 3000网关 HTTP WebhookEMQX 推送 Webhook 到网关 http://119.146.131.237:9560/webhooks/mqtt
    • 3306MySQL仅当使用 MySQL 作为 EMQX 的认证/ACL 后端时需要暴露;否则建议只内网放通

网关server/gateway-mqtt-node默认 HTTP Webhook 端口为 3000已在 .env 可改;若和 EMQX 不同机房,则通过 9560 做端口映射公开给 EMQX 调用。

网关 .env 对应配置

位于 server/gateway-mqtt-node/.env

MQTT_URL=mqtt://119.146.131.237:8105
HTTP_PORT=3000
WEBHOOK_TOKEN=<自定义强密码>

# ACK 与下发已启用(代码已支持)
CHAT_DOWNLINK_ENABLE=true
ACK_ENABLE=true
ACK_TOPIC_PATTERN=device/+/ack

说明:

  • EMQX Webhook 需携带 x-webhook-token: <同上> 以通过网关校验。
  • 下发主题约定:device/{userId}/downACK 主题:device/{userId}/ack

EMQX 与网关的功能对接

我们当前的主题规范:

  • 客户端上行(发消息):chat/send/{conversationId}EMQX → Webhook → 网关 → Supabase
  • 网关回送确认(可选 echochat/recv/{conversationId}(网关 → MQTT
  • 设备下发:device/{userId}/down(网关 → MQTT
  • 设备 ACKdevice/{userId}/ack(设备 → MQTT → 网关订阅标记 ack

方案 AEMQX 5.x 推荐):规则引擎 + HTTP 数据桥Webhook

  1. 创建 HTTP 服务器资源Data Bridge → HTTP Server

    • 目标 URL: http://119.146.131.237:9560/webhooks/mqtt
    • Header: x-webhook-token: <与网关 .env 一致>
    • Payload 编码:开启 Base64以兼容任意负载
  2. 创建规则Rules

    • 规则 SQL仅转发我们关心的主题
      SELECT
          payload as payload,
          topic as topic,
          clientid as clientid,
          username as username,
          timestamp as ts
      FROM
          "message.publish"
      WHERE
          topic LIKE 'chat/send/%'
      
    • 选择上一步的 HTTP 资源作为动作Action
    • 在动作的请求体中,使用 EMQX 模板自定义为:
      {
          "event": "message.publish",
          "topic": "${topic}",
          "clientid": "${clientid}",
          "username": "${username}",
          "payload_base64": "${payload}"
      }
      
    • 保存并启用规则。

规则生效后,任意客户端向 chat/send/xxx 发布消息EMQX 会将事件通过 HTTP POST 投递到网关;网关会校验 x-webhook-token,解析 payload写入 Supabase。

方案 BEMQX 4.x 可用):启用 WebHook 插件

emqx.conf / plugins 中启用 emqx_web_hook,并配置:

web.hook.api.url = http://119.146.131.237:9560/webhooks/mqtt
web.hook.encode_payload = base64
web.hook.headers = x-webhook-token=<与网关 .env 一致>

# 只打开需要的事件message.publish 即可)
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}

重启生效后,与方案 A 类似生效路径。

认证与 ACL 建议

最小可用(开发):可先允许匿名或统一账号连接,靠网关与数据库 RLS 做细粒度校验。生产建议:

  1. 认证(任选其一)

    • JWT 认证:使用 EMQX JWT Auth设置 HS256 密钥为 Supabase 项目的 JWT Secret使客户端将 Supabase 的 JWT 作为 password或 username提交由 EMQX 验签。
    • 用户名/密码:内置用户数据库,按用户维度下发 ACL。
    • MySQL/HTTP 认证:若必须使用 3306 的 MySQL 为后端,建议仅内网放通 3306并在 EMQX 配 MySQL 认证与 ACL 表。
  2. ACL示例规则

    • 普通客户端(以用户名等于 userId 为例):
      • 允许 Publish 到 chat/send/#
      • 允许 Subscribe chat/recv/#
      • 允许 Subscribe device/%u/down%u 为用户名)
      • 允许 Publish device/%u/ack
    • 网关客户端clientid= gateway-* 或特定用户名):
      • 允许 Subscribe chat/send/#device/+/ack
      • 允许 Publish chat/recv/#device/+/down

EMQX 5.x 可在 Authorization 中添加 Built-in Database 规则EMQX 4.x 可用 emqx_authz.conf

{allow, {user, "<gateway-user>"}, subscribe, ["chat/send/#", "device/+/ack"]}.
{allow, {user, "<gateway-user>"}, publish,   ["chat/recv/#", "device/+/down"]}.

{allow, all, publish,   ["chat/send/#", "device/%u/ack"]}.
{allow, all, subscribe, ["chat/recv/#", "device/%u/down"]}.

{deny, all, all, ["#"]}.

说明:严格的会话参与者校验在网关/数据库侧完成EMQX 侧 ACL 采用“粗粒度放行、最小必要”的方式,降低规则复杂度。

客户端接入地址

  • MQTT/TCPmqtt://119.146.131.237:8105
  • WebSocket (WS)ws://119.146.131.237:8083/mqtt
  • WebSocket (WSS)wss://119.146.131.237:8084/mqtt(需部署证书)
  • Dashboardhttp://119.146.131.237:8104(建议加 IP 白名单或 VPN

验证流程(端到端)

  1. 启动网关(确保 .env 已正确配置 MQTT_URL/WEBHOOK_TOKEN 等):

    • server/gateway-mqtt-node 目录启动:npm run dev
  2. 验证上行(客户端 → EMQX → Webhook → 网关 → DB

    • 用任意 MQTT 客户端向 chat/send/<conversationId> 发布 JSON 负载,例如:
      {"sender_id":"<userId>","content":"hello","content_type":"text"}
      
    • 观察网关日志应有 persist ok并在 Supabase 的 chat_messages 有记录。
  3. 验证下发DB → 网关 → EMQX → 设备)

    • 运行仓库中的脚本插入下发表:
      • npm run simulate:chat:downlink(需设置 SIM_CHAT_CONVERSATION_ID / SIM_TARGET_USER_ID
    • 设备(或 MQTT 客户端)订阅 device/<userId>/down 应收到消息。
  4. 验证 ACK设备 → EMQX → 网关 → DB 标记)

    • 设备向 device/<userId>/ack 发布包含 correlation_id 的 JSON或使用脚本
      • npm run simulate:ack(设置 SIM_ACK_TARGET / SIM_CORRELATION_ID
    • 网关日志显示 ack applied,数据库下发记录状态变为 acked

安全与运维建议

  • 关闭不必要的外网端口:如非必须,请勿对公网开放 3306Dashboard 18083 建议仅内网或加白名单。
  • 配置 TLS为 8084/WSS及可选 8883/MQTTS部署证书强制 Web 客户端走 WSS。
  • Webhook 验证:务必设置强随机 WEBHOOK_TOKEN 并在 EMQX 配相同 Header。
  • 账号与 ACL生产环境务必启用认证JWT/用户名密码/HTTP/MySQL与最小权限 ACL网关使用独立账户。
  • 监控:可将 EMQX 指标Prometheus与网关心跳gateway_heartbeats接入统一监控。

如需,我可以根据实际 EMQX 版本4.x/5.x提供对应的导出/导入 JSON 或 CLI 命令,或远程协助完成配置。