169 lines
7.1 KiB
Markdown
169 lines
7.1 KiB
Markdown
# EMQX 配置与端口规划(配合当前微通信网关)
|
||
|
||
本文给出在 IP 119.146.131.237 上,以您提供的端口规划,如何配置 EMQX 以最小代价配合我们当前的功能(聊天上行 → Webhook 入库;设备下发 → 网关发布;ACK 标记)。
|
||
|
||
## 端口/NAT 规划
|
||
|
||
- 外网 → 内部端口:
|
||
- 8105 → 1883(MQTT/TCP):移动端/设备、网关 MQTT 直连使用
|
||
- 8083 → 8083(MQTT over WebSocket/WS):Web 端非 TLS 调试
|
||
- 8084 → 8084(MQTT over WebSocket/WSS):Web 端 TLS(需证书)
|
||
- 8104 → 18083(EMQX Dashboard):运维面板(建议仅内网或白名单)
|
||
- 9560 → 3000(网关 HTTP Webhook):EMQX 推送 Webhook 到网关 `http://119.146.131.237:9560/webhooks/mqtt`
|
||
- 3306(MySQL):仅当使用 MySQL 作为 EMQX 的认证/ACL 后端时需要暴露;否则建议只内网放通
|
||
|
||
网关(server/gateway-mqtt-node)默认 HTTP Webhook 端口为 3000,已在 .env 可改;若和 EMQX 不同机房,则通过 9560 做端口映射公开给 EMQX 调用。
|
||
|
||
## 网关 .env 对应配置
|
||
|
||
位于 `server/gateway-mqtt-node/.env`:
|
||
|
||
```
|
||
MQTT_URL=mqtt://119.146.131.237:8105
|
||
HTTP_PORT=3000
|
||
WEBHOOK_TOKEN=<自定义强密码>
|
||
|
||
# ACK 与下发已启用(代码已支持)
|
||
CHAT_DOWNLINK_ENABLE=true
|
||
ACK_ENABLE=true
|
||
ACK_TOPIC_PATTERN=device/+/ack
|
||
```
|
||
|
||
说明:
|
||
- EMQX Webhook 需携带 `x-webhook-token: <同上>` 以通过网关校验。
|
||
- 下发主题约定:`device/{userId}/down`;ACK 主题:`device/{userId}/ack`。
|
||
|
||
## EMQX 与网关的功能对接
|
||
|
||
我们当前的主题规范:
|
||
- 客户端上行(发消息):`chat/send/{conversationId}`(EMQX → Webhook → 网关 → Supabase)
|
||
- 网关回送确认(可选 echo):`chat/recv/{conversationId}`(网关 → MQTT)
|
||
- 设备下发:`device/{userId}/down`(网关 → MQTT)
|
||
- 设备 ACK:`device/{userId}/ack`(设备 → MQTT → 网关订阅标记 ack)
|
||
|
||
### 方案 A(EMQX 5.x 推荐):规则引擎 + HTTP 数据桥(Webhook)
|
||
|
||
1) 创建 HTTP 服务器资源(Data Bridge → HTTP Server)
|
||
- 目标 URL: `http://119.146.131.237:9560/webhooks/mqtt`
|
||
- Header: `x-webhook-token: <与网关 .env 一致>`
|
||
- Payload 编码:开启 Base64(以兼容任意负载)
|
||
|
||
2) 创建规则(Rules)
|
||
- 规则 SQL(仅转发我们关心的主题):
|
||
```sql
|
||
SELECT
|
||
payload as payload,
|
||
topic as topic,
|
||
clientid as clientid,
|
||
username as username,
|
||
timestamp as ts
|
||
FROM
|
||
"message.publish"
|
||
WHERE
|
||
topic LIKE 'chat/send/%'
|
||
```
|
||
- 选择上一步的 HTTP 资源作为动作(Action);
|
||
- 在动作的请求体中,使用 EMQX 模板自定义为:
|
||
```json
|
||
{
|
||
"event": "message.publish",
|
||
"topic": "${topic}",
|
||
"clientid": "${clientid}",
|
||
"username": "${username}",
|
||
"payload_base64": "${payload}"
|
||
}
|
||
```
|
||
- 保存并启用规则。
|
||
|
||
规则生效后,任意客户端向 `chat/send/xxx` 发布消息,EMQX 会将事件通过 HTTP POST 投递到网关;网关会校验 `x-webhook-token`,解析 payload,写入 Supabase。
|
||
|
||
### 方案 B(EMQX 4.x 可用):启用 WebHook 插件
|
||
|
||
在 `emqx.conf` / `plugins` 中启用 `emqx_web_hook`,并配置:
|
||
|
||
```
|
||
web.hook.api.url = http://119.146.131.237:9560/webhooks/mqtt
|
||
web.hook.encode_payload = base64
|
||
web.hook.headers = x-webhook-token=<与网关 .env 一致>
|
||
|
||
# 只打开需要的事件(message.publish 即可)
|
||
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
|
||
```
|
||
|
||
重启生效后,与方案 A 类似生效路径。
|
||
|
||
## 认证与 ACL 建议
|
||
|
||
最小可用(开发):可先允许匿名或统一账号连接,靠网关与数据库 RLS 做细粒度校验。生产建议:
|
||
|
||
1) 认证(任选其一)
|
||
- JWT 认证:使用 EMQX JWT Auth,设置 HS256 密钥为 Supabase 项目的 JWT Secret,使客户端将 Supabase 的 JWT 作为 password(或 username)提交,由 EMQX 验签。
|
||
- 用户名/密码:内置用户数据库,按用户维度下发 ACL。
|
||
- MySQL/HTTP 认证:若必须使用 3306 的 MySQL 为后端,建议仅内网放通 3306,并在 EMQX 配 MySQL 认证与 ACL 表。
|
||
|
||
2) ACL(示例规则)
|
||
- 普通客户端(以用户名等于 userId 为例):
|
||
- 允许 Publish 到 `chat/send/#`
|
||
- 允许 Subscribe `chat/recv/#`
|
||
- 允许 Subscribe `device/%u/down`(%u 为用户名)
|
||
- 允许 Publish `device/%u/ack`
|
||
- 网关客户端(clientid= gateway-* 或特定用户名):
|
||
- 允许 Subscribe `chat/send/#`、`device/+/ack`
|
||
- 允许 Publish `chat/recv/#`、`device/+/down`
|
||
|
||
EMQX 5.x 可在 Authorization 中添加 Built-in Database 规则;EMQX 4.x 可用 `emqx_authz.conf`:
|
||
|
||
```
|
||
{allow, {user, "<gateway-user>"}, subscribe, ["chat/send/#", "device/+/ack"]}.
|
||
{allow, {user, "<gateway-user>"}, publish, ["chat/recv/#", "device/+/down"]}.
|
||
|
||
{allow, all, publish, ["chat/send/#", "device/%u/ack"]}.
|
||
{allow, all, subscribe, ["chat/recv/#", "device/%u/down"]}.
|
||
|
||
{deny, all, all, ["#"]}.
|
||
```
|
||
|
||
> 说明:严格的会话参与者校验在网关/数据库侧完成,EMQX 侧 ACL 采用“粗粒度放行、最小必要”的方式,降低规则复杂度。
|
||
|
||
## 客户端接入地址
|
||
|
||
- MQTT/TCP:`mqtt://119.146.131.237:8105`
|
||
- WebSocket (WS):`ws://119.146.131.237:8083/mqtt`
|
||
- WebSocket (WSS):`wss://119.146.131.237:8084/mqtt`(需部署证书)
|
||
- Dashboard:`http://119.146.131.237:8104`(建议加 IP 白名单或 VPN)
|
||
|
||
## 验证流程(端到端)
|
||
|
||
1) 启动网关(确保 .env 已正确配置 MQTT_URL/WEBHOOK_TOKEN 等):
|
||
- 在 `server/gateway-mqtt-node` 目录启动:`npm run dev`
|
||
|
||
2) 验证上行(客户端 → EMQX → Webhook → 网关 → DB)
|
||
- 用任意 MQTT 客户端向 `chat/send/<conversationId>` 发布 JSON 负载,例如:
|
||
```json
|
||
{"sender_id":"<userId>","content":"hello","content_type":"text"}
|
||
```
|
||
- 观察网关日志应有 persist ok,并在 Supabase 的 chat_messages 有记录。
|
||
|
||
3) 验证下发(DB → 网关 → EMQX → 设备)
|
||
- 运行仓库中的脚本插入下发表:
|
||
- `npm run simulate:chat:downlink`(需设置 SIM_CHAT_CONVERSATION_ID / SIM_TARGET_USER_ID)
|
||
- 设备(或 MQTT 客户端)订阅 `device/<userId>/down` 应收到消息。
|
||
|
||
4) 验证 ACK(设备 → EMQX → 网关 → DB 标记)
|
||
- 设备向 `device/<userId>/ack` 发布包含 `correlation_id` 的 JSON,或使用脚本:
|
||
- `npm run simulate:ack`(设置 SIM_ACK_TARGET / SIM_CORRELATION_ID)
|
||
- 网关日志显示 `ack applied`,数据库下发记录状态变为 `acked`。
|
||
|
||
## 安全与运维建议
|
||
|
||
- 关闭不必要的外网端口:如非必须,请勿对公网开放 3306;Dashboard 18083 建议仅内网或加白名单。
|
||
- 配置 TLS:为 8084/WSS(及可选 8883/MQTTS)部署证书,强制 Web 客户端走 WSS。
|
||
- Webhook 验证:务必设置强随机 `WEBHOOK_TOKEN` 并在 EMQX 配相同 Header。
|
||
- 账号与 ACL:生产环境务必启用认证(JWT/用户名密码/HTTP/MySQL)与最小权限 ACL;网关使用独立账户。
|
||
- 监控:可将 EMQX 指标(Prometheus)与网关心跳(gateway_heartbeats)接入统一监控。
|
||
|
||
---
|
||
|
||
如需,我可以根据实际 EMQX 版本(4.x/5.x)提供对应的导出/导入 JSON 或 CLI 命令,或远程协助完成配置。
|
||
|