Skip to main content

MQTT客户端

简介

  • MQTT全称为消息队列遥测传输(英语:Message Queuing Telemetry Transport),是ISO 标准(ISO/IEC PRF 20922)下基于发布 (Publish) /订阅 (Subscribe)范式的消息协议,工作在 TCP/IP协议族上。
  • MQTT最大优点在于,可以用极少的数据和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
  • MQTT 协议定义了两种网络实体:消息代理(message broker)与客户端(client)。其中,消息代理用于接收来自客户端的消息并转发至目标客户端,也被称为MQTT服务器。MQTT 客户端可以是任何运行有 MQTT 库并通过网络连接至消息代理的设备,例如微型控制器或大型服务器。
  • 在MQTT 协议中,消息的传输是通过主题(topic)管理的,消息的发布者和订阅者均为客户端(client)。消息由主题(Topic)和负载(payload)两部分组成。当某个客户端有需要分发的数据时,会向连接的消息代理(MQTT服务器)发送指定主题的携带有负载的消息。服务器收到消息后,会向订阅此主题的客户端分发此消息。发布者不需要配置订阅者的相关信息和具体位置;同样,订阅者也不需要配置发布者的相关信息和具体位置。

驱动配置

一般情况下,如果实例配置、设备表配置、表记录配置中存在相同的字段,优先级依次升高,不填时默认继承上一级的配置。

  1. 打开设备监控-驱动管理, 点击添加驱动, 选择driver-mqtt-client image.png

  2. 配置实例和设备表参数 image.png

    配置项说明:
    1. MQTT服务器:填写要连接的mqtt服务器的地址。
    格式为 tcp://服务器IP或域名:服务器端口,开头的协议部分根据服务器不同,可填tcp,mqtt,mqtts,ssl,ws,wss
    例如:tcp://127.0.0.1:1883
    2. 用户名、密码:填写MQTT服务器的用户名和密码。
    3. 只使用资产配置中的topic:勾选后,实际只订阅表记录中填写的topic
    4. 客户端:mqtt客户端id,一般不填写,驱动使用随机字符串作为客户端id
    5. 服务质量: 即mqtt协议中的QOS,可填0/1/2
    6. SSL/TLS: 当服务器地址为ssl或mqtts时,需要选true,自认证服务器需要添加证书信息
    7. 订阅消息主题:填写该模型要订阅的消息主题,可使用通配符#和+。
    Topic是个utf-8编码的字符串,可由多个'/'分割开。
    +:为单级通配符,如:check/+/baseline
    #:为多级通配符,必须放在topic的最后,前一个字符必须是'/'
    Topic中不要包含空格符。
    8. 通讯监控参数:平台通用配置,当某个资产数据点中的最新上数时间距离当前时间超过此参数时,平台判定资产掉线。
    9. 自定义脚本:由于mqtt协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于解析消息和组装要发送的消息内容。

    注:实例配置和驱动配置基本相同,实际使用中,相同的字段按优先级覆盖。
  3. 配置实例和设备表参数

    设备匹配规则: 驱动优先匹配表记录配置里的设备标识,然后匹配资产编号 image.png

脚本

由于MQTT协议通过主题区分不同种类的消息,因此在使用时,不同的主题建议建立不同的设备表,下边介绍脚本的编写方法。

脚本语言采用的是JavaScript,一部分为固定格式:

// 解析接收数据
// 该服务器和主题收到的消息会调用此方法进行解析,消息主题是方法参数topic,消息内容是方法参数package.
// topic类型为字符串,考虑到部分设备的消息内容为二进制数据,package类型为字节数组.
ParseHandle = function (topic, package) {

let msg = JSON.parse(package.toString()) // 把收到的消息反序化为json对象

// 解析数据
let id = msg.id //id是资产编号,平台通过id判断数据和资产的对应关系,必需
let values = msg.values // values是一个对象,例如{"d1": 100},key为数据点标识,值就是采集的具体数值,必需
// let time = 1644290863000 // 数据点采集时间,UNIX毫秒时间戳,没有time时,默认采用服务器时间
return [ //可返回多个对象,对应多个资产
{id, values} //返回的对象中的key的名字必须为id,values,time, 有时间戳时, 返回{id, values, time}
]
}

// 构建发送数据
// topic为指令上配置的发送指令, id为发送指令时选择的资产的资产编号, op
CommandHandle = function (topic, id, op) {
// 构建发送数据
let sendTopic = topic
let sendData = {"d1": 200}
return {sendTopic, sendData} //返回的对象中的key的名字必须为sendTopic,sendData
}

脚本调试功能(测试功能)

最新版本的mqtt驱动增加了脚本调试功能, 可以分别调试在脚本输入框中编写的解析脚本和指令脚本

image.png image.png

脚本示例

示例1-默认脚本

当新建驱动实例或新建设备表时,脚本中默认会有脚本,该脚本对应的报文格式为:

{"id":"表记录编号","values":{"数据点标识":数据点值}}

例如: {"id":"node01","values":{"d1":100,"d2":200}}

脚本:

// 解析接收数据
// 该服务器和主题收到的消息会调用此方法进行解析,消息主题是方法参数topic,消息内容是方法参数package.
// topic类型为字符串;考虑到部分设备的消息内容为二进制数据,package类型为字节数组,
// 当上传数据为json字符串时,首先使用JSON.parse(package)将收到的数据转为json对象.

ParseHandle = function (topic, package) {

let msg = JSON.parse(package.toString()) // 把收到的消息反序化为json对象

let id = msg.id
let values = msg.values

return [
{ id, values }
]
}

// 构建发送数据
CommandHandle = function (topic, id, op) {

// 构建发送数据
let sendTopic = topic
let c = JSON.stringify([{ "abc": op.value }])
let sendData = Buffer.from(c)
return { sendTopic, sendData } //返回的对象中的key的名字必须为sendTopic,sendData
}

设备表中的配置如下图所示: image.png

数据点配置:

image.png

添加表记录:

image.png

未配置表记录中设备配置,驱动将按照表记录编号匹配资产.

平台配置完成后,点击重启驱动.

使用mqttx模拟设备,连接mqtt服务器,使用/test/01主题,发布消息{"id":"node01","values":{"d1":100,"d2":200}}

image.png

可在数据点上查看收到的数据

image.png

示例2-JSON类型1

消息格式

某数据采集网关发布的主题的为 /sys/网关id/up, 收到的消息内容如下:
{
"timeStamp": 1514764896,
"version": "5.0",
"messageId": 24,
"devices": [{
"deviceId": "Slave01",
"deviceState": 0,
"deviceData": {
"40001": 0
}
}]
}
其中,devices为数组类型,每个对象对应一个设备及其数据点值. 本示例中,设备id为Slave01,deviceData中为采集的数据
因为平台数据点表上不支持纯数字,因此在脚本中,将报文的中的数据点标识增加一个x字母作为前缀.如将40001改为x40001.
脚本如下:

脚本:

// 解析接收数据
let ParseHandle = function (topic, package) {
const msg = JSON.parse(package.toString());

const res = []

msg.devices.map(el => {
const id = el.deviceId
const values = {}
const entries = Object.entries(el.deviceData)
entries.map(([key, value]) => {
values["x" + key] = Number(value)
})
const time = msg.timeStamp * 1000
res.push({ id, values, time })
})

return res
}
// 构建发送数据
CommandHandle = function (topic, id, op) {

let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}

示例2-JSON类型2

消息格式

收到的topic
收到的消息内容如下:
{
"d":
[
{ "tag": "PLC1:风机未开", "value": 0 },
{ "tag": "PLC1:泵未开", "value": 0 },
{ "tag": "PLC1:阀未开", "value": 0 },
{ "tag": "PLC1:实时功率", "value": 300 },
{ "tag": "PLC2:风机未开", "value": 0 },
{ "tag": "PLC2:泵未开", "value": 1 },
{ "tag": "PLC2:阀未开", "value": 0 },
{ "tag": "PLC2:实时功率", "value": 500 }
],
"ts": "1691573552353"
}

其中,d为数组类型,每个对象对应一个设备的其数据点标识和值. 以{ "tag": "PLC1:实时功率", "value": 300 }为例,
表示设备PLC1的实时功率数据点的值为300.因此,经过脚本处理后,整理为以下平台接收的格式,分别匹配2个设备:
[
{
id: 'PLC1',
values: { '风机未开': 0, '泵未开': 0, '阀未开': 0, '实时功率': 300 },
time: 1691573552353
},
{
id: 'PLC2',
values: { '风机未开': 0, '泵未开': 1, '阀未开': 0, '实时功率': 500 },
time: 1691573552353
}
]

脚本:

// 解析接收数据

ParseHandle = function (topic, package) {
try {
let msg = JSON.parse(package.toString());

let res = {}

// 解析数据为一个对象,key为资产id,值为属于该资产的数据点
// res示例: {"WH_PLC300":{"LT3003":130,"LT5002":181,"LT5001":365}}
if (msg.d) {
for (let i = 0; i < msg.d.length; i++) {
const el = msg.d[i];
let splits = el.tag.split(":")
if (splits.length === 2) {
let did = splits[0]
let tagId = splits[1]
if (!res[did]) {
res[did] = {}
}
res[did][tagId] = Number(el.value)
}
}
}

console.log(JSON.stringify(res))

// 时间解析为毫秒时间戳
// let date = moment(msg.ts)
let time = Number(msg.ts)

// 将res转为平台要求的返回格式
let arr = []
for (const k in res) {
if (Object.hasOwnProperty.call(res, k)) {
const v = res[k];
arr.push({ id: k, values: v, time })
}
}

return arr;

} catch (e) {
console.error("解析脚本错误:", e)
}
return [];
}
// 构建发送数据
CommandHandle = function (topic, id, op) {

let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}

示例3-非json格式的消息解析

消息格式

收到的topic
收到的消息内容如下:
dt01:00000 04201 00077 -11467 00126 0.24 7.11 -76738
消息是一个字符串, 收到的topic是设备id, 冒号后为7个数据点的值,其中最后有一个值缩小100倍,
处理后的格式为
[{"id":"","values":{"d1":"04201","d2":"00077","d3":"-11467","d4":"00126","d5":"0.24","d6":"7.11","d7":-767.38}}]

脚本:

// 解析接收数据
ParseHandle = function (topic, package) {
let msg = package.toString()
let newPkg = msg.split(' ')
let id = topic
console.log(id)
let values = {
"d1": newPkg[1],
"d2": newPkg[2],
"d3": newPkg[3],
"d4": newPkg[4],
"d5": newPkg[5],
"d6": newPkg[6],
"d7": newPkg[7] * 0.01,
}

return [
{ 'id': id, 'values': values }
]
}
// 构建发送数据
CommandHandle = function (topic, id, op) {

let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}

内置对象

1. 日志logger

ParseHandle = function (topic, package) {

logger.Debug(topic)
logger.Info(topic)
logger.Warn(topic)
logger.Error(topic)

let id = msg.id
let values = msg.values
return [
{id, values}
]
}

2. mqtt客户端mqttClient

ParseHandle = function (topic, package) {

// 发送到该设备表配置的mqtt服务器
mqttClient.publish("要发送的主题","要发送的消息")

let id = msg.id
let values = msg.values
return [
{id, values}
]
}

3. 本内平台api客户端api(驱动4.1.2以上)

方法:

1. queryTableData(tableId,query)

查询指定工作表

参数说明
参数名参数类型参数说明示例值
tableIdString工作表标识例如: student
queryobject过滤条件该字段为 JSON 对象
返回值
参数名参数类型参数说明示例值
successbool是否成功请求是否成功. true: 成功, false: 失败
messagestring信息请求失败时为失败原因
dataobject查询结果包含结果条数和查询到的工作表记录
--countnumber结果条数数字
--dataobject工作表记录数组该字段为 JSON 对象数组
示例
    const response = api.queryTableData("student", {"skip":0,"limit":9999,"project":{"name":1,"age":1,"sex":1},"filter":{"age":{"$gte":1,"$lte":22}},"withCount":true});
if (!response.success) {
console.log("写入数据失败:", response.message);
return;
}
2. saveTableData(tableId, row)

向工作表写入一条数据

参数说明:

参数名参数类型参数说明示例值
tableIdString工作表标识例如: student
rowobject写入数据该字段为 JSON 对象, 内容根据工作表定义填写. 例如: {"name": "小明", "age": 18}

返回值:

参数名参数类型参数说明示例值
successbool是否成功请求是否成功. true: 成功, false: 失败
messagestring信息请求失败时为失败原因
datastring记录ID请求成功时, 为新增记录的的ID

示例:

    const response = api.saveTableData("student", {"name": "小明", "age": 18});
if (!response.success) {
console.log("写入数据失败:", response.message);
return;
}

// 新增记录ID
const rowId = response.data;
3. updateTableData(tableId, nodeId, data)

根据资产标识更新资产数据

参数说明:

参数名参数类型参数说明示例值
tableIdString工作表标识例如: student
nodeIdString记录标识例如: ST10001
dataobject更新内容该字段为 JSON 对象, 要更新的内容. 例如: {"name": "小明", "age": 18}

返回值:

参数名参数类型参数说明示例值
successbool是否成功请求是否成功. true: 成功, false: 失败
messagestring信息请求失败时为失败原因

示例:

    // 更新 name 字段值为 "小明" 所有记录的 age 字段的值为 19
const response = api.updateTableData("student","ST10001", {"name": "小明", "age": 18});
if (!response.success) {
console.log("更新资产失败:", response.message);
return;
}

新建mqtt服务器方法

1. Windows

1.1 复制一份mqtt服务器程序的文件夹

注意: 自带的mosquitto文件夹的名称及里边的文件都不能修改,否则会导致系统无法启动。只能修改复制后的文件夹下的文件。

  1. 在软件安装的目录下找到mosquitto文件夹, 默认在/lib/base下。

  2. 复制mosquitto文件夹并重命名为mosquitto2。

    图片.png

  3. 在mosquitto2文件夹下找到mosquitto.conf文件以记事本方式打开,在211行找到port 1883。将端口进行修改,例如1884。

    图片.png

1.2 在软件平台中添加mqtt的服务器

  1. 进入到运维平台127.0.0.1:13030中,点击服务管理—— 添加服务——高级添加,填写以下内容后保存。注意,输入时前后不能有空格。

    项目内容
    应用名称mqtt2
    命令mosquitto.exe -c mosquitto.conf
    目录./lib/base/mosquitto2

    图片.png

  2. 添加后可以在服务管理中看到服务已经启动。

    图片.png

  3. mqtt密码查找位置

    图片.png 图片.png 图片.png 图片.png

2. Linux

2.1 Linux平台启用mqtt server

找到docker-compose.yml文件位置 图片.png 鼠标右键,选择用记事本编辑 图片.png 图片.png

将下面这段复制到docker-compose.yml里。注意: 对外暴露端口根据实际情况修改,本例中为1884。

  mqttserver:
container_name: mqttserver
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=public
image: airiot/rabbitmq:3.8.3-management-alpine
logging:
driver: json-file
options:
max-size: 100m
max-file: "1"
networks:
- backend
ports:
- 1884:1883
restart: always
ulimits:
nproc: 40960
nofile:
soft: 10240
hard: 30720
volumes:
- /etc/localtime:/etc/localtime:ro

图片.png 配置完成后需要进入平台安装目录下 创建并启动服务:docker-compose up -d 查看mqtt服务是否启动:docker ps | grep mqtt