Skip to main content

Kafka客户端

驱动配置说明

  1. 在系统数据管理界面中选择要配置的设备表,进入表编辑界面选择设备配置TAB页,在设备驱动一栏选择Kafka客户端。无无该选项时,点击右侧的驱动市场按钮在线安装驱动。

image.png image.png

  1. 配置驱动参数

(1)brokers:填写要连接的kafka服务器的node地址,可配置多个。格式为

服务器IP或域名:服务器端口 例如:127.0.0.1:9022

(2)groupid:填写该设备表订阅的消费组,默认自动生成 (3)订阅消息主题:填写该设备表要订阅的消息主题 (4)startOffset:偏移量,订阅消息偏移量,默认末尾读 (5)通讯监控参数:平台通用配置,当某个设备数据点中的最新上数时间距离当前时间超过此参数时,平台判定资产掉线。 (6)接收数据处理脚本:由于协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于解析消息收到的消息内容。 (7)发送命令处理脚本:由于协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于发送消息内容。

使用示例

由于Kafka协议通过主题区分不同种类的消息,因此在使用时,不同的主题需要建立不同的设备表,下边以网关常用的Kafka格式为例,介绍脚本的编写方法。 脚本 脚本语言采用的是JavaScript,一部分为固定格式:

接收数据处理脚本: 参数:data为收到的数据 (1) 格式:

参数类型说明示例值
topic字符串消息主题
partition数字分区信息
offset数字消息偏移位置
key字节数组消息key
value字节数组数据
devices对象数组设备数组

(2) 示例数据: {"topic":"test","partition":0,"offset":0,"key":[],"value":[]} 返回格式:[{"id":"资产编号或设备标识","fields":{"数据点标识":"值"},"time":"毫秒"}}] (1)格式:数组

参数类型说明示例值
array[object]数组返回值[{"id":"001","fields":{"temperature":1.2},"time":1664256913000}}]
id字符串资产编号或设备标识001
fields对象数据{"temperature":1.2}
key字符串数据点标识temperature
valueany数据值1.2
timeint64时间戳1664256913000
function handler(data) {
// data: {"topic":"test","partition":0,"offset":0,"key":[],"value":[]}
console.log("data", JSON.stringify(data));
let arr = [];
let str = Buffer.from(data.value).toString();
console.log("value", str);
let value = JSON.parse(str);
if (value.properties) {
for (let i = 0; i < value.properties.length; i++) {
let dev = value.properties[i];

let fields = {};
if (dev.data) {
for (let j = 0; j < dev.data.length; j++) {
let devData = dev.data[j];
for (let k in devData) {
if (k != "ts" && k != "dq") {
fields[k.replace('.', '_')] = devData[k]
}
}
}
}

let obj = { "id": dev.tid, fields, "time": value.ts * 1000 };
arr.push(obj);
}
}
// arr [{"id":"001","fields":{"temperature":1.2},"time":1664256913000}}]
return arr
}

发送数据处理脚本: 参数:nodeid为资产编号,cmd为指令数据 示例:

{
"name": "test",
"ops": [{
"key": "count1",
"param": "test",
"topic": "test121",
"value": "123"
}],
"params": {
"test": "123"
},
"showName": "测试",
"writeOut": {},
"defaultValue": {
"test": "123"
},
"form": [{
"name": "test",
"type": "string",
"defaultValue ": {
"default ": "123 "
},
"ioway ": "默认写入"
}]
}

返回: (1)格式:

参数类型说明示例值
topic字符串消息主题
partition数字分区信息
balancer字符串可选参数:
LeastBytes、Hash、ReferenceHash、CRC32Balancer、Murmur2Balancer、RoundRobin
key字节数组消息key
value字节数组数据

(2)示例:{"topic":"消息主题","partition":"分区","balancer":"","key":"key值","value":"数据"}

function handler(tableId, deviceId, cmd) {
console.log("run",tableId, deviceId, JSON.stringify(cmd));
let key = new Uint8Array(Buffer.from(cmd.ops[0].key)).buffer;
let value = new Uint8Array(Buffer.from(cmd.params[cmd.name])).buffer;
console.log("key", key);
console.log("value", value);
return {
"topic": cmd.ops[0].topic,
"partition": null,
"balancer": "",
"key": key,
"value": value
};
}