7.1 KiB
EMQX 配置与端口规划(配合当前微通信网关)
本文给出在 IP 119.146.131.237 上,以您提供的端口规划,如何配置 EMQX 以最小代价配合我们当前的功能(聊天上行 → Webhook 入库;设备下发 → 网关发布;ACK 标记)。
端口/NAT 规划
- 外网 → 内部端口:
- 8105 → 1883(MQTT/TCP):移动端/设备、网关 MQTT 直连使用
- 8083 → 8083(MQTT over WebSocket/WS):Web 端非 TLS 调试
- 8084 → 8084(MQTT over WebSocket/WSS):Web 端 TLS(需证书)
- 8104 → 18083(EMQX Dashboard):运维面板(建议仅内网或白名单)
- 9560 → 3000(网关 HTTP Webhook):EMQX 推送 Webhook 到网关
http://119.146.131.237:9560/webhooks/mqtt - 3306(MySQL):仅当使用 MySQL 作为 EMQX 的认证/ACL 后端时需要暴露;否则建议只内网放通
网关(server/gateway-mqtt-node)默认 HTTP Webhook 端口为 3000,已在 .env 可改;若和 EMQX 不同机房,则通过 9560 做端口映射公开给 EMQX 调用。
网关 .env 对应配置
位于 server/gateway-mqtt-node/.env:
MQTT_URL=mqtt://119.146.131.237:8105
HTTP_PORT=3000
WEBHOOK_TOKEN=<自定义强密码>
# ACK 与下发已启用(代码已支持)
CHAT_DOWNLINK_ENABLE=true
ACK_ENABLE=true
ACK_TOPIC_PATTERN=device/+/ack
说明:
- EMQX Webhook 需携带
x-webhook-token: <同上>以通过网关校验。 - 下发主题约定:
device/{userId}/down;ACK 主题:device/{userId}/ack。
EMQX 与网关的功能对接
我们当前的主题规范:
- 客户端上行(发消息):
chat/send/{conversationId}(EMQX → Webhook → 网关 → Supabase) - 网关回送确认(可选 echo):
chat/recv/{conversationId}(网关 → MQTT) - 设备下发:
device/{userId}/down(网关 → MQTT) - 设备 ACK:
device/{userId}/ack(设备 → MQTT → 网关订阅标记 ack)
方案 A(EMQX 5.x 推荐):规则引擎 + HTTP 数据桥(Webhook)
-
创建 HTTP 服务器资源(Data Bridge → HTTP Server)
- 目标 URL:
http://119.146.131.237:9560/webhooks/mqtt - Header:
x-webhook-token: <与网关 .env 一致> - Payload 编码:开启 Base64(以兼容任意负载)
- 目标 URL:
-
创建规则(Rules)
- 规则 SQL(仅转发我们关心的主题):
SELECT payload as payload, topic as topic, clientid as clientid, username as username, timestamp as ts FROM "message.publish" WHERE topic LIKE 'chat/send/%' - 选择上一步的 HTTP 资源作为动作(Action);
- 在动作的请求体中,使用 EMQX 模板自定义为:
{ "event": "message.publish", "topic": "${topic}", "clientid": "${clientid}", "username": "${username}", "payload_base64": "${payload}" } - 保存并启用规则。
- 规则 SQL(仅转发我们关心的主题):
规则生效后,任意客户端向 chat/send/xxx 发布消息,EMQX 会将事件通过 HTTP POST 投递到网关;网关会校验 x-webhook-token,解析 payload,写入 Supabase。
方案 B(EMQX 4.x 可用):启用 WebHook 插件
在 emqx.conf / plugins 中启用 emqx_web_hook,并配置:
web.hook.api.url = http://119.146.131.237:9560/webhooks/mqtt
web.hook.encode_payload = base64
web.hook.headers = x-webhook-token=<与网关 .env 一致>
# 只打开需要的事件(message.publish 即可)
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
重启生效后,与方案 A 类似生效路径。
认证与 ACL 建议
最小可用(开发):可先允许匿名或统一账号连接,靠网关与数据库 RLS 做细粒度校验。生产建议:
-
认证(任选其一)
- JWT 认证:使用 EMQX JWT Auth,设置 HS256 密钥为 Supabase 项目的 JWT Secret,使客户端将 Supabase 的 JWT 作为 password(或 username)提交,由 EMQX 验签。
- 用户名/密码:内置用户数据库,按用户维度下发 ACL。
- MySQL/HTTP 认证:若必须使用 3306 的 MySQL 为后端,建议仅内网放通 3306,并在 EMQX 配 MySQL 认证与 ACL 表。
-
ACL(示例规则)
- 普通客户端(以用户名等于 userId 为例):
- 允许 Publish 到
chat/send/# - 允许 Subscribe
chat/recv/# - 允许 Subscribe
device/%u/down(%u 为用户名) - 允许 Publish
device/%u/ack
- 允许 Publish 到
- 网关客户端(clientid= gateway-* 或特定用户名):
- 允许 Subscribe
chat/send/#、device/+/ack - 允许 Publish
chat/recv/#、device/+/down
- 允许 Subscribe
- 普通客户端(以用户名等于 userId 为例):
EMQX 5.x 可在 Authorization 中添加 Built-in Database 规则;EMQX 4.x 可用 emqx_authz.conf:
{allow, {user, "<gateway-user>"}, subscribe, ["chat/send/#", "device/+/ack"]}.
{allow, {user, "<gateway-user>"}, publish, ["chat/recv/#", "device/+/down"]}.
{allow, all, publish, ["chat/send/#", "device/%u/ack"]}.
{allow, all, subscribe, ["chat/recv/#", "device/%u/down"]}.
{deny, all, all, ["#"]}.
说明:严格的会话参与者校验在网关/数据库侧完成,EMQX 侧 ACL 采用“粗粒度放行、最小必要”的方式,降低规则复杂度。
客户端接入地址
- MQTT/TCP:
mqtt://119.146.131.237:8105 - WebSocket (WS):
ws://119.146.131.237:8083/mqtt - WebSocket (WSS):
wss://119.146.131.237:8084/mqtt(需部署证书) - Dashboard:
http://119.146.131.237:8104(建议加 IP 白名单或 VPN)
验证流程(端到端)
-
启动网关(确保 .env 已正确配置 MQTT_URL/WEBHOOK_TOKEN 等):
- 在
server/gateway-mqtt-node目录启动:npm run dev
- 在
-
验证上行(客户端 → EMQX → Webhook → 网关 → DB)
- 用任意 MQTT 客户端向
chat/send/<conversationId>发布 JSON 负载,例如:{"sender_id":"<userId>","content":"hello","content_type":"text"} - 观察网关日志应有 persist ok,并在 Supabase 的 chat_messages 有记录。
- 用任意 MQTT 客户端向
-
验证下发(DB → 网关 → EMQX → 设备)
- 运行仓库中的脚本插入下发表:
npm run simulate:chat:downlink(需设置 SIM_CHAT_CONVERSATION_ID / SIM_TARGET_USER_ID)
- 设备(或 MQTT 客户端)订阅
device/<userId>/down应收到消息。
- 运行仓库中的脚本插入下发表:
-
验证 ACK(设备 → EMQX → 网关 → DB 标记)
- 设备向
device/<userId>/ack发布包含correlation_id的 JSON,或使用脚本:npm run simulate:ack(设置 SIM_ACK_TARGET / SIM_CORRELATION_ID)
- 网关日志显示
ack applied,数据库下发记录状态变为acked。
- 设备向
安全与运维建议
- 关闭不必要的外网端口:如非必须,请勿对公网开放 3306;Dashboard 18083 建议仅内网或加白名单。
- 配置 TLS:为 8084/WSS(及可选 8883/MQTTS)部署证书,强制 Web 客户端走 WSS。
- Webhook 验证:务必设置强随机
WEBHOOK_TOKEN并在 EMQX 配相同 Header。 - 账号与 ACL:生产环境务必启用认证(JWT/用户名密码/HTTP/MySQL)与最小权限 ACL;网关使用独立账户。
- 监控:可将 EMQX 指标(Prometheus)与网关心跳(gateway_heartbeats)接入统一监控。
如需,我可以根据实际 EMQX 版本(4.x/5.x)提供对应的导出/导入 JSON 或 CLI 命令,或远程协助完成配置。