MQTT客户端
简介
- MQTT全称为消息队列遥测传输(英语:Message Queuing Telemetry Transport),是ISO 标准(ISO/IEC PRF 20922)下基于发布 (Publish) /订阅 (Subscribe)范式的消息协议,工作在 TCP/IP协议族上。
- MQTT最大优点在于,可以用极少的数据和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
- MQTT 协议定义了两种网络实体:消息代理(message broker)与客户端(client)。其中,消息代理用于接收来自客户端的消息并转发至目标客户端,也被称为MQTT服务器。MQTT 客户端可以是任何运行有 MQTT 库并通过网络连接至消息代理的设备,例如微型控制器或大型服务器。
- 在MQTT 协议中,消息的传输是通过主题(topic)管理的,消息的发布者和订阅者均为客户端(client)。消息由主题(Topic)和负载(payload)两部分组成。当某个客户端有需要分发的数据时,会向连接的消息代理(MQTT服务器)发送指定主题的携带有负载的消息。服务器收到消息后,会向订阅此主题的客户端分发此消息。发布者不需要配置订阅者的相关信息和具体位置;同样,订阅者也不需要配置发布者的相关信息和具体位置。
驱动配置
一般情况下,如果实例配置、设备表配置、表记录配置中存在相同的字段,优先级依次升高,不填时默认继承上一级的配置。
打开设备监控-驱动管理, 点击添加驱动, 选择driver-mqtt-client
配置实例和设备表参数
配置项说明:
1. MQTT服务器:填写要连接的mqtt服务器的地址。
格式为 tcp://服务器IP或域名:服务器端口,开头的协议部分根据服务器不同,可填tcp,mqtt,mqtts,ssl,ws,wss
例如:tcp://127.0.0.1:1883
2. 用户名、密码:填写MQTT服务器的用户名和密码。
3. 只使用资产配置中的topic:勾选后,实际只订阅表记录中填写的topic
4. 客户端:mqtt客户端id,一般不填写,驱动使用随机字符串作为客户端id
5. 服务质量: 即mqtt协议中的QOS,可填0/1/2
6. SSL/TLS: 当服务器地址为ssl或mqtts时,需要选true,自认证服务器需要添加证书信息
7. 订阅消息主题:填写该模型要订阅的消息主题,可使用通配符#和+。
Topic是个utf-8编码的字符串,可由多个'/'分割开。
+:为单级通配符,如:check/+/baseline
#:为多级通配符,必须放在topic的最后,前一个字符必须是'/'
Topic中不要包含空格符。
8. 通讯监控参数:平台通用配置,当某个资产数据点中的最新上数时间距离当前时间超过此参数时,平台判定资产掉线。
9. 自定义脚本:由于mqtt协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于解析消息和组装要发送的消息内容。
注:实例配置和驱动配置基本相同,实际使用中,相同的字段按优先级覆盖。配置实例和设备表参数
设备匹配规则: 驱动优先匹配表记录配置里的设备标识,然后匹配资产编号
脚本
由于MQTT协议通过主题区分不同种类的消息,因此在使用时,不同的主题建议建立不同的设备表,下边介绍脚本的编写方法。
脚本语言采用的是JavaScript,一部分为固定格式:
// 解析接收数据
// 该服务器和主题收到的消息会调用此方法进行解析,消息主题是方法参数topic,消息内容是方法参数package.
// topic类型为字符串,考虑到部分设备的消息内容为二进制数据,package类型为字节数组.
ParseHandle = function (topic, package) {
let msg = JSON.parse(package.toString()) // 把收到的消息反序化为json对象
// 解析数据
let id = msg.id //id是资产编号,平台通过id判断数据和资产的对应关系,必需
let values = msg.values // values是一个对象,例如{"d1": 100},key为数据点标识,值就是采集的具体数值,必需
// let time = 1644290863000 // 数据点采集时间,UNIX毫秒时间戳,没有time时,默认采用服务器时间
return [ //可返回多个对象,对应多个资产
{id, values} //返回的对象中的key的名字必须为id,values,time, 有时间戳时, 返回{id, values, time}
]
}
// 构建发送数据
// topic为指令上配置的发送指令, id为发送指令时选择的资产的资产编号, op
CommandHandle = function (topic, id, op) {
// 构建发送数据
let sendTopic = topic
let sendData = {"d1": 200}
return {sendTopic, sendData} //返回的对象中的key的名字必须为sendTopic,sendData
}
脚本调试功能(测试功能)
最新版本的mqtt驱动增加了脚本调试功能, 可以分别调试在脚本输入框中编写的解析脚本和指令脚本
脚本示例
示例1-默认脚本
当新建驱动实例或新建设备表时,脚本中默认会有脚本,该脚本对应的报文格式为:
{"id":"表记录编号","values":{"数据点标识":数据点值}}
例如: {"id":"node01","values":{"d1":100,"d2":200}}
脚本:
// 解析接收数据
// 该服务器和主题收到的消息会调用此方法进行解析,消息主题是方法参数topic,消息内容是方法参数package.
// topic类型为字符串;考虑到部分设备的消息内容为二进制数据,package类型为字节数组,
// 当上传数据为json字符串时,首先使用JSON.parse(package)将收到的数据转为json对象.
ParseHandle = function (topic, package) {
let msg = JSON.parse(package.toString()) // 把收到的消息反序化为json对象
let id = msg.id
let values = msg.values
return [
{ id, values }
]
}
// 构建发送数据
CommandHandle = function (topic, id, op) {
// 构建发送数据
let sendTopic = topic
let c = JSON.stringify([{ "abc": op.value }])
let sendData = Buffer.from(c)
return { sendTopic, sendData } //返回的对象中的key的名字必须为sendTopic,sendData
}
设备表中的配置如下图所示:
数据点配置:
添加表记录:
未配置表记录中设备配置,驱动将按照表记录编号匹配资产.
平台配置完成后,点击重启驱动.
使用mqttx模拟设备,连接mqtt服务器,使用/test/01主题,发布消息{"id":"node01","values":{"d1":100,"d2":200}}
可在数据点上查看收到的数据
示例2-JSON类型1
消息格式
某数据采集网关发布的主题的为 /sys/网关id/up, 收到的消息内容如下:
{
"timeStamp": 1514764896,
"version": "5.0",
"messageId": 24,
"devices": [{
"deviceId": "Slave01",
"deviceState": 0,
"deviceData": {
"40001": 0
}
}]
}
其中,devices为数组类型,每个对象对应一个设备及其数据点值. 本示例中,设备id为Slave01,deviceData中为采集的数据
因为平台数据点表上不支持纯数字,因此在脚本中,将报文的中的数据点标识增加一个x字母作为前缀.如将40001改为x40001.
脚本如下:
脚本:
// 解析接收数据
let ParseHandle = function (topic, package) {
const msg = JSON.parse(package.toString());
const res = []
msg.devices.map(el => {
const id = el.deviceId
const values = {}
const entries = Object.entries(el.deviceData)
entries.map(([key, value]) => {
values["x" + key] = Number(value)
})
const time = msg.timeStamp * 1000
res.push({ id, values, time })
})
return res
}
// 构建发送数据
CommandHandle = function (topic, id, op) {
let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}
示例2-JSON类型2
消息格式
收到的topic
收到的消息内容如下:
{
"d":
[
{ "tag": "PLC1:风机未开", "value": 0 },
{ "tag": "PLC1:泵未开", "value": 0 },
{ "tag": "PLC1:阀未开", "value": 0 },
{ "tag": "PLC1:实时功率", "value": 300 },
{ "tag": "PLC2:风机未开", "value": 0 },
{ "tag": "PLC2:泵未开", "value": 1 },
{ "tag": "PLC2:阀未开", "value": 0 },
{ "tag": "PLC2:实时功率", "value": 500 }
],
"ts": "1691573552353"
}
其中,d为数组类型,每个对象对应一个设备的其数据点标识和值. 以{ "tag": "PLC1:实时功率", "value": 300 }为例,
表示设备PLC1的实时功率数据点的值为300.因此,经过脚本处理后,整理为以下平台接收的格式,分别匹配2个设备:
[
{
id: 'PLC1',
values: { '风机未开': 0, '泵未开': 0, '阀未开': 0, '实时功率': 300 },
time: 1691573552353
},
{
id: 'PLC2',
values: { '风机未开': 0, '泵未开': 1, '阀未开': 0, '实时功率': 500 },
time: 1691573552353
}
]
脚本:
// 解析接收数据
ParseHandle = function (topic, package) {
try {
let msg = JSON.parse(package.toString());
let res = {}
// 解析数据为一个对象,key为资产id,值为属于该资产的数据点
// res示例: {"WH_PLC300":{"LT3003":130,"LT5002":181,"LT5001":365}}
if (msg.d) {
for (let i = 0; i < msg.d.length; i++) {
const el = msg.d[i];
let splits = el.tag.split(":")
if (splits.length === 2) {
let did = splits[0]
let tagId = splits[1]
if (!res[did]) {
res[did] = {}
}
res[did][tagId] = Number(el.value)
}
}
}
console.log(JSON.stringify(res))
// 时间解析为毫秒时间戳
// let date = moment(msg.ts)
let time = Number(msg.ts)
// 将res转为平台要求的返回格式
let arr = []
for (const k in res) {
if (Object.hasOwnProperty.call(res, k)) {
const v = res[k];
arr.push({ id: k, values: v, time })
}
}
return arr;
} catch (e) {
console.error("解析脚本错误:", e)
}
return [];
}
// 构建发送数据
CommandHandle = function (topic, id, op) {
let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}
示例3-非json格式的消息解析
消息格式
收到的topic
收到的消息内容如下:
dt01:00000 04201 00077 -11467 00126 0.24 7.11 -76738
消息是一个字符串, 收到的topic是设备id, 冒号后为7个数据点的值,其中最后有一个值缩小100倍,
处理后的格式为
[{"id":"","values":{"d1":"04201","d2":"00077","d3":"-11467","d4":"00126","d5":"0.24","d6":"7.11","d7":-767.38}}]
脚本:
// 解析接收数据
ParseHandle = function (topic, package) {
let msg = package.toString()
let newPkg = msg.split(' ')
let id = topic
console.log(id)
let values = {
"d1": newPkg[1],
"d2": newPkg[2],
"d3": newPkg[3],
"d4": newPkg[4],
"d5": newPkg[5],
"d6": newPkg[6],
"d7": newPkg[7] * 0.01,
}
return [
{ 'id': id, 'values': values }
]
}
// 构建发送数据
CommandHandle = function (topic, id, op) {
let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}
内置对象
日志logger
ParseHandle = function (topic, package) {
logger.Debug(topic)
logger.Info(topic)
logger.Warn(topic)
logger.Error(topic)
let id = msg.id
let values = msg.values
return [
{id, values}
]
}
脚本内mqtt客户端mqttClient
ParseHandle = function (topic, package) {
// 发送到该设备表配置的mqtt服务器
mqttClient.publish("要发送的主题","要发送的消息")
let id = msg.id
let values = msg.values
return [
{id, values}
]
}
新建mqtt服务器方法
1. Windows
1.1 复制一份mqtt服务器程序的文件夹
注意: 自带的mosquitto文件夹的名称及里边的文件都不能修改,否则会导致系统无法启动。只能修改复制后的文件夹下的文件。
在软件安装的目录下找到mosquitto文件夹, 默认在/lib/base下。
复制mosquitto文件夹并重命名为mosquitto2。
在mosquitto2文件夹下找到mosquitto.conf文件以记事本方式打开,在211行找到port 1883。将端口进行修改,例如1884。
1.2 在软件平台中添加mqtt的服务器
进入到运维平台127.0.0.1:13030中,点击服务管理—— 添加服务——高级添加,填写以下内容后保存。注意,输入时前后不能有空格。
项目 内容 应用名称 mqtt2 命令 mosquitto.exe -c mosquitto.conf 目录 ./lib/base/mosquitto2 添加后可以在服务管理中看到服务已经启动。
mqtt密码查找位置
2. Linux
2.1 Linux平台启用mqtt server
找到docker-compose.yml文件位置 鼠标右键,选择用记事本编辑
将下面这段复制到docker-compose.yml里。注意: 对外暴露端口根据实际情况修改,本例中为1884。
mqttserver:
container_name: mqttserver
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=public
image: airiot/rabbitmq:3.8.3-management-alpine
logging:
driver: json-file
options:
max-size: 100m
max-file: "1"
networks:
- backend
ports:
- 1884:1883
restart: always
ulimits:
nproc: 40960
nofile:
soft: 10240
hard: 30720
volumes:
- /etc/localtime:/etc/localtime:ro
配置完成后需要进入平台安装目录下 创建并启动服务:docker-compose up -d 查看mqtt服务是否启动:docker ps | grep mqtt