Files
akmon/doc_chat/emqx配置.md
2026-01-20 08:04:15 +08:00

169 lines
7.1 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# EMQX 配置与端口规划(配合当前微通信网关)
本文给出在 IP 119.146.131.237 上,以您提供的端口规划,如何配置 EMQX 以最小代价配合我们当前的功能(聊天上行 → Webhook 入库;设备下发 → 网关发布ACK 标记)。
## 端口/NAT 规划
- 外网 → 内部端口:
- 8105 → 1883MQTT/TCP移动端/设备、网关 MQTT 直连使用
- 8083 → 8083MQTT over WebSocket/WSWeb 端非 TLS 调试
- 8084 → 8084MQTT over WebSocket/WSSWeb 端 TLS需证书
- 8104 → 18083EMQX Dashboard运维面板建议仅内网或白名单
- 9560 → 3000网关 HTTP WebhookEMQX 推送 Webhook 到网关 `http://119.146.131.237:9560/webhooks/mqtt`
- 3306MySQL仅当使用 MySQL 作为 EMQX 的认证/ACL 后端时需要暴露;否则建议只内网放通
网关server/gateway-mqtt-node默认 HTTP Webhook 端口为 3000已在 .env 可改;若和 EMQX 不同机房,则通过 9560 做端口映射公开给 EMQX 调用。
## 网关 .env 对应配置
位于 `server/gateway-mqtt-node/.env`
```
MQTT_URL=mqtt://119.146.131.237:8105
HTTP_PORT=3000
WEBHOOK_TOKEN=<自定义强密码>
# ACK 与下发已启用(代码已支持)
CHAT_DOWNLINK_ENABLE=true
ACK_ENABLE=true
ACK_TOPIC_PATTERN=device/+/ack
```
说明:
- EMQX Webhook 需携带 `x-webhook-token: <同上>` 以通过网关校验。
- 下发主题约定:`device/{userId}/down`ACK 主题:`device/{userId}/ack`
## EMQX 与网关的功能对接
我们当前的主题规范:
- 客户端上行(发消息):`chat/send/{conversationId}`EMQX → Webhook → 网关 → Supabase
- 网关回送确认(可选 echo`chat/recv/{conversationId}`(网关 → MQTT
- 设备下发:`device/{userId}/down`(网关 → MQTT
- 设备 ACK`device/{userId}/ack`(设备 → MQTT → 网关订阅标记 ack
### 方案 AEMQX 5.x 推荐):规则引擎 + HTTP 数据桥Webhook
1) 创建 HTTP 服务器资源Data Bridge → HTTP Server
- 目标 URL: `http://119.146.131.237:9560/webhooks/mqtt`
- Header: `x-webhook-token: <与网关 .env 一致>`
- Payload 编码:开启 Base64以兼容任意负载
2) 创建规则Rules
- 规则 SQL仅转发我们关心的主题
```sql
SELECT
payload as payload,
topic as topic,
clientid as clientid,
username as username,
timestamp as ts
FROM
"message.publish"
WHERE
topic LIKE 'chat/send/%'
```
- 选择上一步的 HTTP 资源作为动作Action
- 在动作的请求体中,使用 EMQX 模板自定义为:
```json
{
"event": "message.publish",
"topic": "${topic}",
"clientid": "${clientid}",
"username": "${username}",
"payload_base64": "${payload}"
}
```
- 保存并启用规则。
规则生效后,任意客户端向 `chat/send/xxx` 发布消息EMQX 会将事件通过 HTTP POST 投递到网关;网关会校验 `x-webhook-token`,解析 payload写入 Supabase。
### 方案 BEMQX 4.x 可用):启用 WebHook 插件
在 `emqx.conf` / `plugins` 中启用 `emqx_web_hook`,并配置:
```
web.hook.api.url = http://119.146.131.237:9560/webhooks/mqtt
web.hook.encode_payload = base64
web.hook.headers = x-webhook-token=<与网关 .env 一致>
# 只打开需要的事件message.publish 即可)
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
```
重启生效后,与方案 A 类似生效路径。
## 认证与 ACL 建议
最小可用(开发):可先允许匿名或统一账号连接,靠网关与数据库 RLS 做细粒度校验。生产建议:
1) 认证(任选其一)
- JWT 认证:使用 EMQX JWT Auth设置 HS256 密钥为 Supabase 项目的 JWT Secret使客户端将 Supabase 的 JWT 作为 password或 username提交由 EMQX 验签。
- 用户名/密码:内置用户数据库,按用户维度下发 ACL。
- MySQL/HTTP 认证:若必须使用 3306 的 MySQL 为后端,建议仅内网放通 3306并在 EMQX 配 MySQL 认证与 ACL 表。
2) ACL示例规则
- 普通客户端(以用户名等于 userId 为例):
- 允许 Publish 到 `chat/send/#`
- 允许 Subscribe `chat/recv/#`
- 允许 Subscribe `device/%u/down`%u 为用户名)
- 允许 Publish `device/%u/ack`
- 网关客户端clientid= gateway-* 或特定用户名):
- 允许 Subscribe `chat/send/#`、`device/+/ack`
- 允许 Publish `chat/recv/#`、`device/+/down`
EMQX 5.x 可在 Authorization 中添加 Built-in Database 规则EMQX 4.x 可用 `emqx_authz.conf`
```
{allow, {user, "<gateway-user>"}, subscribe, ["chat/send/#", "device/+/ack"]}.
{allow, {user, "<gateway-user>"}, publish, ["chat/recv/#", "device/+/down"]}.
{allow, all, publish, ["chat/send/#", "device/%u/ack"]}.
{allow, all, subscribe, ["chat/recv/#", "device/%u/down"]}.
{deny, all, all, ["#"]}.
```
> 说明:严格的会话参与者校验在网关/数据库侧完成EMQX 侧 ACL 采用“粗粒度放行、最小必要”的方式,降低规则复杂度。
## 客户端接入地址
- MQTT/TCP`mqtt://119.146.131.237:8105`
- WebSocket (WS)`ws://119.146.131.237:8083/mqtt`
- WebSocket (WSS)`wss://119.146.131.237:8084/mqtt`(需部署证书)
- Dashboard`http://119.146.131.237:8104`(建议加 IP 白名单或 VPN
## 验证流程(端到端)
1) 启动网关(确保 .env 已正确配置 MQTT_URL/WEBHOOK_TOKEN 等):
- 在 `server/gateway-mqtt-node` 目录启动:`npm run dev`
2) 验证上行(客户端 → EMQX → Webhook → 网关 → DB
- 用任意 MQTT 客户端向 `chat/send/<conversationId>` 发布 JSON 负载,例如:
```json
{"sender_id":"<userId>","content":"hello","content_type":"text"}
```
- 观察网关日志应有 persist ok并在 Supabase 的 chat_messages 有记录。
3) 验证下发DB → 网关 → EMQX → 设备)
- 运行仓库中的脚本插入下发表:
- `npm run simulate:chat:downlink`(需设置 SIM_CHAT_CONVERSATION_ID / SIM_TARGET_USER_ID
- 设备(或 MQTT 客户端)订阅 `device/<userId>/down` 应收到消息。
4) 验证 ACK设备 → EMQX → 网关 → DB 标记)
- 设备向 `device/<userId>/ack` 发布包含 `correlation_id` 的 JSON或使用脚本
- `npm run simulate:ack`(设置 SIM_ACK_TARGET / SIM_CORRELATION_ID
- 网关日志显示 `ack applied`,数据库下发记录状态变为 `acked`。
## 安全与运维建议
- 关闭不必要的外网端口:如非必须,请勿对公网开放 3306Dashboard 18083 建议仅内网或加白名单。
- 配置 TLS为 8084/WSS及可选 8883/MQTTS部署证书强制 Web 客户端走 WSS。
- Webhook 验证:务必设置强随机 `WEBHOOK_TOKEN` 并在 EMQX 配相同 Header。
- 账号与 ACL生产环境务必启用认证JWT/用户名密码/HTTP/MySQL与最小权限 ACL网关使用独立账户。
- 监控:可将 EMQX 指标Prometheus与网关心跳gateway_heartbeats接入统一监控。
---
如需,我可以根据实际 EMQX 版本4.x/5.x提供对应的导出/导入 JSON 或 CLI 命令,或远程协助完成配置。