继上一篇vue2集成mqtt之后,我们的需求又进行了升级,需要使用vue3和pinia来集成mqtt,并且支持开发环境使用ws协议,而生产环境使用wss协议。
具体思路如下:
- 在pinia中增加mqtt模块,统一管理前端的mqtt客户端连接、主题订阅以及业务回调
- 提供mqtt-subscribe-client组件,在mounted方法中,订阅传入的主题
- 在app启动时,调用客户端的连接方法,创建mqtt客户端实例
- 开启emqx服务器的wss端口监听,并配置证书
- 在服务端配置nginx代理,将前端的wss协议报文,转发给emqx服务器
- 在index.html页面增加CSP(Content-Security-Policy)标签,并在环境变量中配置CSP的值
环境准备
# 前端
"vite": "5.0.4",
"vue": "3.3.9",
"pinia": "2.1.7",
"mqtt.js": "5.6.0"
# 服务端
nginx
ssl证书
话不多说,上代码
mqtt前端代码
mqtt.js
在/store/modules目录下创建mqtt.js文件
import { guid } from '@/utils/common'
import mqtt from "mqtt"
const useMqttStore = defineStore(
'mqtt',
{
state: () => ({
url: undefined,
client: undefined,
// 订阅主题的集合,key为topic, value为接收到该topic时需要执行的回调
subscribeMembers: {}
}),
actions: {
// 创建mqtt连接
connect(url) {
let client = mqtt.connect(url, clientOptions)
client.on("connect", onConnect)
client.on("reconnect", onReconnect)
client.on("error", onError)
client.on("message", onMessage)
this.client = client
this.url = url
},
disconnect() {
this.client.end()
this.client = undefined
this.subscribeMembers = {}
console.log(`服务器已断开连接!`)
},
/**
* 订阅
* @param topic 消息主题
* @param subscribeOption 订阅设置
* @param callback 接收消息的回调
*/
subscribe({topic, callback, subscribeOption}) {
if (!this.client && this.url) {
this.connect(this.url)
}
this.client.subscribe(topic,
subscribeOption || {qos: clientOptions.qos},
(error, granted) => {
if (error) {
console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `,
error)
} else {
console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`)
}
})
this.subscribeMembers[topic] = callback
},
/**
* 取消订阅
* @param topic 消息主题
*/
unsubscribe(topic) {
if (!this.client) {
return
}
this.client.unsubscribe(topic,
{},
(error, granted) => {
if (error) {
console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `,
error)
} else {
console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`)
}
})
this.subscribeMembers[topic] = undefined
},
publish({ topic, message }) {
if ((!this.client || !this.client.connected) && this.url) {
this.connect(this.url)
}
this.client.publish(topic, message, {qos: clientOptions.qos}, (e) => {
if(e) {
console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e)
}
})
}
}
})
const clientOptions = {
clean: true, // true: 清除会话, false: 保留会话
connectTimeout: 4000, // 超时时间
keepAlive:60,
clientId: 'pc_mqtt_client',
// 认证信息
username: 'yourUserName',
password: 'yourPassword',
// 开启log后,控制台会答应mqtt的调试日志
// log: console.log.bind(console),
qos: 1,
protocolVersion: 5,
rejectUnauthorized: false
}
const onConnect = (e) => {
console.log(`客户端: ${clientOptions.clientId}, 连接emqx服务器成功:`, e)
}
const onReconnect = (error) => {
console.log(`客户端: ${clientOptions.clientId}, 正在重连:`, error)
}
const onError = (error) => {
console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error)
}
const onMessage = (topic, message) => {
console.log(`客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `, message?.toString())
let callback = useMqttStore().subscribeMembers?.[topic]
callback?.(topic, message?.toString())
}
export default useMqttStore
订阅组件封装
订阅组件与上一篇vue2集成mqtt中的基本一致
<script>
import useUserStore from '@/store/modules/user'
import useMqttStore from '@/store/modules/mqtt'
export default {
name: 'MqttSubscribeClient',
props: {
topic: {
type: String,
required: true
},
global: {
type: Boolean,
default: false
}
},
setup() {
const mqttStore = useMqttStore()
const userStore = useUserStore()
return {
mqttStore,
userStore
}
},
computed: {
subscribeTopic() {
return this.global ? this.topic : `${this.topic}/${this.userStore.name}`
}
},
mounted() {
this.mqttStore.subscribe({topic: this.subscribeTopic, callback: this.subscribeCallback})
},
beforeDestroy() {
this.mqttStore.unsubscribe(this.subscribeTopic)
},
data() {
return {
}
},
methods: {
subscribeCallback(topic, message) {
this.$emit("receive", topic, message ? JSON.parse(message) : message)
}
}
}
</script>
创建连接
在App.vue中,即程序的入口中,统一管理mqtt连接的创建和销毁。
<script setup>
import useMqttStore from '@/store/modules/mqtt'
// 创建mqtt连接
const mqttStore = useMqttStore()
mqttStore.connect(import.meta.env.VITE_BASE_EMQX_SERVER_HOST)
onUnmounted(() => {
mqttStore.disconnect()
})
</script>
由于使用VITE打包,所以之前的配置
VUE_APP_BASE_EMQX_SERVER_HOST需要改成
VITE_BASE_EMQX_SERVER_HOST,至于为什么需要改成VITE_开头的配置,别问我为什么知道的,自行百度。
使用订阅组件
<script>
receive(topic, message) {
// 处理mqtt消息
}
</script>
至此,vue3集成mqtt客户端就完成了。
emqx服务器配置
emqx服务器的配置中需要增加wss协议的配置
listeners {
tcp.default {
enabled = true
bind = "0.0.0.0:63081"
}
ssl.default {
enabled = true
bind = "0.0.0.0:63082"
ssl_options {
keyfile = "etc/certs/your.key"
certfile = "etc/certs/your.pem"
cacertfile = "etc/certs/your_root.crt"
# 开启对端验证
verify = verify_peer
# 强制开启双向认证,如果客户端无法提供证书,则 SSL/TLS 连接将被拒绝
fail_if_no_peer_cert = true
}
}
ws.default {
enabled = true
bind = "0.0.0.0:63084"
}
wss.default {
enabled = true
bind = "0.0.0.0:63085"
ssl_options {
keyfile = "etc/certs/your.key"
certfile = "etc/certs/your.pem"
cacertfile = "etc/certs/your_root.crt"
# 开启对端验证
verify = verify_peer
# 强制开启双向认证,如果客户端无法提供证书,则 SSL/TLS 连接将被拒绝
fail_if_no_peer_cert = false
}
}
}
配置中需要注意的有以下几点:
- 端口号:注意各个协议绑定的端口号要不一致,否则启动会有问题
- 证书信息:key为密钥文件,certfile为签发的证书文件,cacaertfile为根证书,证书绑定的ip或者域名需要与emqx部署的服务器一致。
- verify:是否开启对端验证,verify_peer为开启,verify_none为不开启
- fail_if_no_peer_cert:true则客户端必须提供证书信息,false则客户端无需提供证书,此处ssl协议和wss协议分别配置成不同的值,方便测试
nginx配置
现代浏览器如果开启了https协议,那么都会强制要求开启wss协议。不过庆幸的是,他们都可以通过nginx代理,方便的进行证书配置。
# 后端服务转发
server {
listen 60081 ssl;
server_name your.domain.com;
ssl_certificate path/to/your.pem;
ssl_certificate_key path/to/your.key;
ssl_session_timeout 5m;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
ssl_prefer_server_ciphers on;
root /path/to/your/dist;
index index.html index.htm;
# 文件上传405问题处理
error_page 405 =200 $uri;
location / {
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS';
add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';
if ($request_method = 'OPTIONS') {
return 204;
}
# 此处的 @router 实际上是引用下面的转发,否则在 Vue 路由刷新时可能会抛出 404
try_files $uri $uri/ @router;
# 请求指向的首页
index index.html;
}
# 由于路由的资源不一定是真实的路径,无法找到具体文件
# 所以需要将请求重写到 index.html 中,然后交给真正的 Vue 路由处理请求资源
location @router {
rewrite ^.*$ /index.html last;
}
location /prod-api/ {
proxy_pass https://localhost:18080/;
proxy_set_header x-forwarded-for $remote_addr;
}
}
# mqtt wss协议转发,前端使用63083端口,nginx转发到emqx服务器的wss协议绑定的63085端口
server {
listen 63083 ssl;
server_name your.domain.com;
ssl_certificate path/to/your.pem;
ssl_certificate_key path/to/your.key;
ssl_session_timeout 5m;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
ssl_prefer_server_ciphers on;
location /mqtt {
proxy_pass https://localhost:63085;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# 默认60s断开连接
proxy_read_timeout 60s;
}
}
配置中需要注意的是:
- listen 端口后面需要增加ssl
- server_name:需要使用与证书绑定的ip或者域名
- mqtt的wss转发配置监听63083端口,转发到emqx服务器wss协议监听的63085端口,proxy_pass需要使用https协议。
会有人问为什么需要使用nginx来转发,而不是使用前端直连emqx的63085端口,个人猜测是因为我们的客户端是浏览器,mqtt.js库对于浏览器做了特殊处理,丢弃了证书相关的配置内容。 其实也很好理解,如果浏览器端直连,那么首先用户需要将证书下载到本地,并且每次连接时都需要从本地系统读取证书,让浏览器有自动读取本地文件的权限,很显然是不被允许的。所以只能在服务端做这件事情。nginx就是很好的实现手段。
前端动态配置协议
进入下一个环节,根据环境变量动态切换ws和wss协议。ws和wss的关系类似于http和https。前端开启安全协议的方式是在根目录index.html文件中增加meta信息:
为了不影响开发环境,所以我们通过加载环境变量的方式,动态切换是否开启安全协议。
首先在env文件中增加相应的配置
env.development
# 开发环境配置
ENV = 'development'
VITE_BASE_EMQX_SERVER_HOST='ws://your.domain.com:63084/mqtt'
VITE_BASE_CSP = ""
env.production
# 开发环境配置
ENV = 'production'
VITE_BASE_EMQX_SERVER_HOST='wss://your.domain.com:63083/mqtt'
VITE_BASE_CSP = "upgrade-insecure-requests;"
注意2个文件中的区别:
- 首先是协议:dev环境使用ws,prod环境使用wss
- 其次是端口号,dev使用emqx服务器ws协议绑定的端口号63084,但prod使用的并不是emqx服务器wss协议绑定的端口号63085,因为需要通过nginx转发来携带证书信息,此处是重点,重点,重点!!!(手动加粗)
然后将index.html中的CSP meta信息修改一下:
至此,大功告成。
总结
经历了几天的折腾,总算把整个流程打通,网上关于mqtt over wss的文章很少,有的也是只言片语,mqtt.js源码中提供的示例也是让人摸不着头脑,官方文档也没有解释为什么浏览器端无法传递证书信息(可能是觉得这是基础知识,或者觉得这是服务端的事,关我一个前端库屁事)。写下这篇文章,希望更多的人在使用mqtt时,可以少踩一些坑。