Kafka客户端
驱动配置说明
- 在系统数据管理界面中选择要配置的设备表,进入表编辑界面选择设备配置TAB页,在设备驱动一栏选择Kafka客户端。无无该选项时,点击右侧的驱动市场按钮在线安装驱动。
- 配置驱动参数
(1)brokers:填写要连接的kafka服务器的node地址,可配置多个。格式为
服务器IP或域名:服务器端口 例如:127.0.0.1:9022
(2)groupid:填写该设备表订阅的消费组,默认自动生成 (3)订阅消息主题:填写该设备表要订阅的消息主题 (4)startOffset:偏移量,订阅消息偏移量,默认末尾读 (5)通讯监控参数:平台通用配置,当某个设备数据点中的最新上数时间距离当前时间超过此参数时,平台判定资产掉线。 (6)接收数据处理脚本:由于协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于解析消息收到的消息内容。 (7)发送命令处理脚本:由于协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于发送消息内容。
使用示例
由于Kafka协议通过主题区分不同种类的消息,因此在使用时,不同的主题需要建立不同的设备表,下边以网关常用的Kafka格式为例,介绍脚本的编写方法。 脚本 脚本语言采用的是JavaScript,一部分为固定格式:
接收数据处理脚本: 参数:data为收到的数据 (1) 格式:
参数 | 类型 | 说明 | 示例值 |
---|---|---|---|
topic | 字符串 | 消息主题 | |
partition | 数字 | 分区信息 | |
offset | 数字 | 消息偏移位置 | |
key | 字节数组 | 消息key | |
value | 字节数组 | 数据 | |
devices | 对象数组 | 设备数组 |
(2) 示例数据: {"topic":"test","partition":0,"offset":0,"key":[],"value":[]} 返回格式:[{"id":"资产编号或设备标识","fields":{"数据点标识":"值"},"time":"毫秒"}}] (1)格式:数组
参数 | 类型 | 说明 | 示例值 |
---|---|---|---|
array[object] | 数组 | 返回值 | [{"id":"001","fields":{"temperature":1.2},"time":1664256913000}}] |
id | 字符串 | 资产编号或设备标识 | 001 |
fields | 对象 | 数据 | {"temperature":1.2} |
key | 字符串 | 数据点标识 | temperature |
value | any | 数据值 | 1.2 |
time | int64 | 时间戳 | 1664256913000 |
function handler(data) {
// data: {"topic":"test","partition":0,"offset":0,"key":[],"value":[]}
console.log("data", JSON.stringify(data));
let arr = [];
let str = Buffer.from(data.value).toString();
console.log("value", str);
let value = JSON.parse(str);
if (value.properties) {
for (let i = 0; i < value.properties.length; i++) {
let dev = value.properties[i];
let fields = {};
if (dev.data) {
for (let j = 0; j < dev.data.length; j++) {
let devData = dev.data[j];
for (let k in devData) {
if (k != "ts" && k != "dq") {
fields[k.replace('.', '_')] = devData[k]
}
}
}
}
let obj = { "id": dev.tid, fields, "time": value.ts * 1000 };
arr.push(obj);
}
}
// arr [{"id":"001","fields":{"temperature":1.2},"time":1664256913000}}]
return arr
}
发送数据处理脚本: 参数:nodeid为资产编号,cmd为指令数据 示例:
{
"name": "test",
"ops": [{
"key": "count1",
"param": "test",
"topic": "test121",
"value": "123"
}],
"params": {
"test": "123"
},
"showName": "测试",
"writeOut": {},
"defaultValue": {
"test": "123"
},
"form": [{
"name": "test",
"type": "string",
"defaultValue ": {
"default ": "123 "
},
"ioway ": "默认写入"
}]
}
返回: (1)格式:
参数 | 类型 | 说明 | 示例值 |
---|---|---|---|
topic | 字符串 | 消息主题 | |
partition | 数字 | 分区信息 | |
balancer | 字符串 | 可选参数: | |
LeastBytes、Hash、ReferenceHash、CRC32Balancer、Murmur2Balancer、RoundRobin | |||
key | 字节数组 | 消息key | |
value | 字节数组 | 数据 |
(2)示例:{"topic":"消息主题","partition":"分区","balancer":"","key":"key值","value":"数据"}
function handler(tableId, deviceId, cmd) {
console.log("run",tableId, deviceId, JSON.stringify(cmd));
let key = new Uint8Array(Buffer.from(cmd.ops[0].key)).buffer;
let value = new Uint8Array(Buffer.from(cmd.params[cmd.name])).buffer;
console.log("key", key);
console.log("value", value);
return {
"topic": cmd.ops[0].topic,
"partition": null,
"balancer": "",
"key": key,
"value": value
};
}