# EMQX 配置与端口规划(配合当前微通信网关) 本文给出在 IP 119.146.131.237 上,以您提供的端口规划,如何配置 EMQX 以最小代价配合我们当前的功能(聊天上行 → Webhook 入库;设备下发 → 网关发布;ACK 标记)。 ## 端口/NAT 规划 - 外网 → 内部端口: - 8105 → 1883(MQTT/TCP):移动端/设备、网关 MQTT 直连使用 - 8083 → 8083(MQTT over WebSocket/WS):Web 端非 TLS 调试 - 8084 → 8084(MQTT over WebSocket/WSS):Web 端 TLS(需证书) - 8104 → 18083(EMQX Dashboard):运维面板(建议仅内网或白名单) - 9560 → 3000(网关 HTTP Webhook):EMQX 推送 Webhook 到网关 `http://119.146.131.237:9560/webhooks/mqtt` - 3306(MySQL):仅当使用 MySQL 作为 EMQX 的认证/ACL 后端时需要暴露;否则建议只内网放通 网关(server/gateway-mqtt-node)默认 HTTP Webhook 端口为 3000,已在 .env 可改;若和 EMQX 不同机房,则通过 9560 做端口映射公开给 EMQX 调用。 ## 网关 .env 对应配置 位于 `server/gateway-mqtt-node/.env`: ``` MQTT_URL=mqtt://119.146.131.237:8105 HTTP_PORT=3000 WEBHOOK_TOKEN=<自定义强密码> # ACK 与下发已启用(代码已支持) CHAT_DOWNLINK_ENABLE=true ACK_ENABLE=true ACK_TOPIC_PATTERN=device/+/ack ``` 说明: - EMQX Webhook 需携带 `x-webhook-token: <同上>` 以通过网关校验。 - 下发主题约定:`device/{userId}/down`;ACK 主题:`device/{userId}/ack`。 ## EMQX 与网关的功能对接 我们当前的主题规范: - 客户端上行(发消息):`chat/send/{conversationId}`(EMQX → Webhook → 网关 → Supabase) - 网关回送确认(可选 echo):`chat/recv/{conversationId}`(网关 → MQTT) - 设备下发:`device/{userId}/down`(网关 → MQTT) - 设备 ACK:`device/{userId}/ack`(设备 → MQTT → 网关订阅标记 ack) ### 方案 A(EMQX 5.x 推荐):规则引擎 + HTTP 数据桥(Webhook) 1) 创建 HTTP 服务器资源(Data Bridge → HTTP Server) - 目标 URL: `http://119.146.131.237:9560/webhooks/mqtt` - Header: `x-webhook-token: <与网关 .env 一致>` - Payload 编码:开启 Base64(以兼容任意负载) 2) 创建规则(Rules) - 规则 SQL(仅转发我们关心的主题): ```sql SELECT payload as payload, topic as topic, clientid as clientid, username as username, timestamp as ts FROM "message.publish" WHERE topic LIKE 'chat/send/%' ``` - 选择上一步的 HTTP 资源作为动作(Action); - 在动作的请求体中,使用 EMQX 模板自定义为: ```json { "event": "message.publish", "topic": "${topic}", "clientid": "${clientid}", "username": "${username}", "payload_base64": "${payload}" } ``` - 保存并启用规则。 规则生效后,任意客户端向 `chat/send/xxx` 发布消息,EMQX 会将事件通过 HTTP POST 投递到网关;网关会校验 `x-webhook-token`,解析 payload,写入 Supabase。 ### 方案 B(EMQX 4.x 可用):启用 WebHook 插件 在 `emqx.conf` / `plugins` 中启用 `emqx_web_hook`,并配置: ``` web.hook.api.url = http://119.146.131.237:9560/webhooks/mqtt web.hook.encode_payload = base64 web.hook.headers = x-webhook-token=<与网关 .env 一致> # 只打开需要的事件(message.publish 即可) web.hook.rule.message.publish.1 = {"action": "on_message_publish"} ``` 重启生效后,与方案 A 类似生效路径。 ## 认证与 ACL 建议 最小可用(开发):可先允许匿名或统一账号连接,靠网关与数据库 RLS 做细粒度校验。生产建议: 1) 认证(任选其一) - JWT 认证:使用 EMQX JWT Auth,设置 HS256 密钥为 Supabase 项目的 JWT Secret,使客户端将 Supabase 的 JWT 作为 password(或 username)提交,由 EMQX 验签。 - 用户名/密码:内置用户数据库,按用户维度下发 ACL。 - MySQL/HTTP 认证:若必须使用 3306 的 MySQL 为后端,建议仅内网放通 3306,并在 EMQX 配 MySQL 认证与 ACL 表。 2) ACL(示例规则) - 普通客户端(以用户名等于 userId 为例): - 允许 Publish 到 `chat/send/#` - 允许 Subscribe `chat/recv/#` - 允许 Subscribe `device/%u/down`(%u 为用户名) - 允许 Publish `device/%u/ack` - 网关客户端(clientid= gateway-* 或特定用户名): - 允许 Subscribe `chat/send/#`、`device/+/ack` - 允许 Publish `chat/recv/#`、`device/+/down` EMQX 5.x 可在 Authorization 中添加 Built-in Database 规则;EMQX 4.x 可用 `emqx_authz.conf`: ``` {allow, {user, ""}, subscribe, ["chat/send/#", "device/+/ack"]}. {allow, {user, ""}, publish, ["chat/recv/#", "device/+/down"]}. {allow, all, publish, ["chat/send/#", "device/%u/ack"]}. {allow, all, subscribe, ["chat/recv/#", "device/%u/down"]}. {deny, all, all, ["#"]}. ``` > 说明:严格的会话参与者校验在网关/数据库侧完成,EMQX 侧 ACL 采用“粗粒度放行、最小必要”的方式,降低规则复杂度。 ## 客户端接入地址 - MQTT/TCP:`mqtt://119.146.131.237:8105` - WebSocket (WS):`ws://119.146.131.237:8083/mqtt` - WebSocket (WSS):`wss://119.146.131.237:8084/mqtt`(需部署证书) - Dashboard:`http://119.146.131.237:8104`(建议加 IP 白名单或 VPN) ## 验证流程(端到端) 1) 启动网关(确保 .env 已正确配置 MQTT_URL/WEBHOOK_TOKEN 等): - 在 `server/gateway-mqtt-node` 目录启动:`npm run dev` 2) 验证上行(客户端 → EMQX → Webhook → 网关 → DB) - 用任意 MQTT 客户端向 `chat/send/` 发布 JSON 负载,例如: ```json {"sender_id":"","content":"hello","content_type":"text"} ``` - 观察网关日志应有 persist ok,并在 Supabase 的 chat_messages 有记录。 3) 验证下发(DB → 网关 → EMQX → 设备) - 运行仓库中的脚本插入下发表: - `npm run simulate:chat:downlink`(需设置 SIM_CHAT_CONVERSATION_ID / SIM_TARGET_USER_ID) - 设备(或 MQTT 客户端)订阅 `device//down` 应收到消息。 4) 验证 ACK(设备 → EMQX → 网关 → DB 标记) - 设备向 `device//ack` 发布包含 `correlation_id` 的 JSON,或使用脚本: - `npm run simulate:ack`(设置 SIM_ACK_TARGET / SIM_CORRELATION_ID) - 网关日志显示 `ack applied`,数据库下发记录状态变为 `acked`。 ## 安全与运维建议 - 关闭不必要的外网端口:如非必须,请勿对公网开放 3306;Dashboard 18083 建议仅内网或加白名单。 - 配置 TLS:为 8084/WSS(及可选 8883/MQTTS)部署证书,强制 Web 客户端走 WSS。 - Webhook 验证:务必设置强随机 `WEBHOOK_TOKEN` 并在 EMQX 配相同 Header。 - 账号与 ACL:生产环境务必启用认证(JWT/用户名密码/HTTP/MySQL)与最小权限 ACL;网关使用独立账户。 - 监控:可将 EMQX 指标(Prometheus)与网关心跳(gateway_heartbeats)接入统一监控。 --- 如需,我可以根据实际 EMQX 版本(4.x/5.x)提供对应的导出/导入 JSON 或 CLI 命令,或远程协助完成配置。