Skip to main content

数据接入驱动开发

本文将会详细介绍如何使用 Node SDK 开发自定义数据接入驱动.

介绍

数据接入驱动 是为实现从不同的协议、设备或其它平台系统采集数据而开发的特定程序. 每个 数据接入驱动 程序需要根据协议的特点实现数据采集功能,然后将采集到的数据通过 Node SDK 提供的接口发送到平台.

Node SDK 提供了 数据接入驱动 开发的相关内容. 包括驱动的接口定义, 以及与平台交互功能. 开发者只需要实现接口中的方法.

开发步骤

1. 创建项目

  1. 创建目录
  2. 进入项目执行初始化
npm init
  1. 创建index.js文件

2. 引入SDK

const {App, Driver} = require('@airiot/sdk-nodejs/driver')

3. 定义schema

数据接入驱动 需要定义一个 schema 用于描述驱动的配置信息. schema 是一个类似于 json 格式的对象, 详细格式说明见 数据接入驱动schema说明. 以下是一个简单的示例:

({
"driver": {
"properties": {
"settings": {
"title": "实例配置",
"type": "object",
"properties": {
"server": {
"type": "string",
"title": "服务器",
"descripption": "MQTT 服务器地址. 例如: tcp://127.0.0.1:1883"
},
"username": {
"type": "string",
"title": "用户名",
},
"password": {
"type": "string",
"title": "密码",
"fieldType": "password"
},
"clientId": {
"type": "string",
"title": "客户端ID"
},
"topic": {
"type": "string",
"title": "主题",
"descripption": "接收数据的主题. 例如: /data/#"
},
"parseScript": {
"type": "string",
"title": "数据处理脚本",
"fieldType": "deviceScriptEdit",
"description": "消息处理脚本. 函数名必须为 'handler'",
"defaultScript": "/**\n" +
" * 数据处理脚本, 处理从 mqtt 接收到的数据.\n" +
" *\n" +
" * @param {string} topic 消息主题\n" +
" * @param {string} message 消息内容\n" +
" * @return 消息解析结果\n" +
" */\n" +
"function handler(topic, message) {\n" +
"\t\n" +
"\t// 脚本返回值必须为对象数组\n" +
"\t// \tid: 资产编号\n" +
"\t//\ttime: 时间戳(毫秒)\n" +
"\t// fields: 数据点数据. 该字段为 JSON 对象, key 为数据点标识, value 为数据点的值\n" +
"\treturn [\n" +
"\t\t{\"table\": \"T10001\", \"id\": \"SN10001\", \"time\": new Date().getTime(), \"fields\": {\"key1\": \"this is a string value\", \"key2\": true, \"key3\": 123.456}}\n" +
"\t];\n" +
"}"
},
"commandScript": {
"type": "string",
"title": "指令处理脚本",
"fieldType": "deviceScriptEdit",
"description": "指令处理脚本. 函数名必须为 'handler'",
"defaultScript": "/**\n" +
" * 指令处理脚本. 发送指令时会将指令内容传递给脚本, 然后由指定返回最终要发送的信息.\n" +
" *\n" +
" * @param {string} 工作表标识\n" +
" * @param {string} 资产编号\n" +
" * @param {object} 命令内容\n" +
" * @return {object} 最终要发送的消息, 及目标 topic\n" +
" */\n" +
"function handler(tableId, deviceId, command) {\n" +
"\t\n" +
"\t// 脚本返回值必须为下面对象结构\n" +
"\t//\t\ttopic: 消息发送的目标 topic\n" +
"\t//\t\tpayload: 消息内容\n" +
"\treturn {\n" +
"\t\t\"topic\": \"cmd/\" + deviceId,\n" +
"\t\t\"payload\": \"发送内容\"\n" +
"\t};\n" +
"}"
},
"network": {
"type": "object",
"title": "通讯监控参数",
"properties": {
"timeout": {
"title": "通讯超时时间(s)",
"description": "经过多长时间仪表还没有任何数据上传,认定为通讯故障",
"type": "number"
}
}
}
},
"required": ["server", "username", "password", "topic", "parseScript", "commandScript"]
}
}
},
"model": {
"properties": {
"settings": {
"title": "模型配置",
"type": "object",
"properties": {
"network": {
"type": "object",
"title": "通讯监控参数",
"properties": {
"timeout": {
"title": "通讯超时时间(s)",
"description": "经过多长时间仪表还没有任何数据上传,认定为通讯故障",
"type": "number"
}
}
}
}
},
"tags": {
"title": "数据点",
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string",
"title": "标识",
"description": "数据点的标识, 用于在数据点列表中唯一标识数据点"
},
"name": {
"type": "string",
"title": "名称",
"description": "数据点的名称"
}
},
"required": ["id", "name"]
}
},
"commands": {
"title": "命令",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"title": "名称"
},
"ops": {
"type": "array",
"title": "指令",
"items": {
"type": "object",
"properties": {
"topic": {
"type": "string",
"title": "主题",
"description": "发送消息的主题. 例如: /cmd/control",
},
"message": {
"type": "string",
"title": "消息",
"description": "发送的消息. 例如: {\"cmd\":\"start\"}",
},
"qos": {
"type": "number",
"title": "QoS",
"description": "消息质量. 0,1,2",
"enum": [0, 1, 2],
"enum_title": ["QoS0", "QoS1", "QoS2"]
},
},
"required": ["name", "message"]
}
}
}
}
}
}
},
"device": {
"properties": {
"settings": {
"title": "设备配置",
"type": "object",
"properties": {
"customDeviceId": {
"type": "string",
"title": "设备编号",
"descripption": "自定义设备编号. 如果未定义则使用平台中的资产编号"
},
"network": {
"type": "object",
"title": "通讯监控参数",
"properties": {
"timeout": {
"title": "通讯超时时间(s)",
"description": "经过多长时间仪表还没有任何数据上传,认定为通讯故障",
"type": "number"
}
}
}
},
"required": []
}
}
}
})

4. 根据schema返回对应配置

在上一步骤中, 通过 schema 定义了驱动的相关配置, 包括 驱动配置数据点配置指令配置.

整体格式说明

驱动从平台接收到的配置信息整体格式如下:

{
"id": "nodedemo",
"name": "NodeSDKDemo",
"driverType": "node-demo",
"device": {
"commands": [],
"settings": {
"clientId": "node-demo",
"commandScript": "function handler(tableId, deviceId, command) {\n return {\"topic\": \"cmd/\" + tableId + \"/\" + deviceId, \"payload\": JSON.stringify(command.params)}\n }",
"parseScript": "function handler(topic, message) {\n let arr = JSON.parse(message.toString())\n let topics = topic.split(\"/\");\n let field = {}\n arr.forEach(ele => {\n field[ele.key] = ele.value\n })\n // 脚本返回值必须为对象数组\n // \tid: 资产编号\n //\ttime: 时间戳(毫秒)\n // fields: 数据点数据. 该字段为 JSON 对象, key 为数据点标识, value 为数据点的值\n return [\n {\"table\": topics[1], \"id\": topics[2], \"time\": new Date().getTime(), \"fields\": field}\n ]\n}",
"password": "public",
"server": "mqtt://localhost:1883",
"topic": "test/#",
"username": "admin"
}
},
"tables": [{
"id": "nodedriverdemo",
"device": {
"driver": "node-demo",
"groupId": "nodedemo",
"settings": {
"clientId": "node-deemo",
"commandScript": "/**\n * 指令处理脚本. 发送指令时会将指令内容传递给脚本, 然后由指定返回最终要发送的信息.\n *\n * @param {string} 工作表标识\n * @param {string} 资产编号\n * @param {object} 命令内容\n * @return {object} 最终要发送的消息, 及目标 topic\n */\nfunction handler(tableId, deviceId, command) {\n\t// 脚本返回值必须为下面对象结构\n\t//\t\ttopic: 消息发送的目标 topic\n\t//\t\tpayload: 消息内容\n\treturn {\n\t\t\"topic\": \"cmd/\" + deviceId,\n\t\t\"payload\": \"发送内容\"\n\t};\n}",
"parseScript": "/**\n * 数据处理脚本, 处理从 mqtt 接收到的数据.\n *\n * @param {string} topic 消息主题\n * @param {string} message 消息内容\n * @return 消息解析结果\n */\nfunction handler(topic, message) {\n\t// 脚本返回值必须为对象数组\n\t// \tid: 资产编号\n\t//\ttime: 时间戳(毫秒)\n\t// fields: 数据点数据. 该字段为 JSON 对象, key 为数据点标识, value 为数据点的值\n\treturn [\n\t\t{\"id\": \"SN10001\", \"time\": new Date().getTime(), \"fields\": {\"key1\": \"this is a string value\", \"key2\": true, \"key3\": 123.456}}\n\t];\n}",
"password": "public",
"server": "mqtt://localhost:1883",
"topic": "test/#",
"username": "admin"
},
"tags": [{
"id": "p1",
"name": "数据点1"
}],
"commands": [{
"defaultValue": {
"c1": 2
},
"form": [{
"arrayValue": null,
"defaultValue": {
"default": 2
},
"ifRepeat": null,
"ioway": "默认写入",
"mod": null,
"name": "c1",
"objectValue": null,
"objectValue2": null,
"select": null,
"select2": null,
"tableValue": null,
"tableValue2": null,
"tag": null,
"tagValue": null,
"type": "number"
}],
"name": "c1",
"ops": [{
"param": "c1"
}],
"showName": "指令1",
"tag": null,
"writeOut": {
"arrayValue": null,
"defaultValue": null,
"ifRepeat": null,
"mod": null,
"objectValue": null,
"objectValue2": null,
"select": null,
"select2": null,
"tableValue": null,
"tableValue2": null,
"tag": null,
"tagValue": null
}
}],
"events": null
},
"devices": [{
"id": "nodesdk1",
"name": "nodesdk1",
"device": {
"driver": "",
"groupId": "",
"settings": null,
"tags": null,
"commands": null,
"events": null
},
"disable": false,
"off": false
}]
}]
}

上述格式中的 devicetables.devicetables.devices.device 分别为 驱动实例配置模型配置(工作表的设备配置)资产配置(设备的设备配置).

其中 settings驱动配置信息, 与 schema 中的 settings 对应. tags数据点配置信息, 与 schema 中的 tags 对应. commands指令配置信息, 与 schema 中的 commands 对应.

info

驱动实例模型资产 中的 settings 可以不相同, 但 tagscommands 必须相同. 例如, 可以把统一的配置信息放在 驱动实例 中, 把不同的配置信息放在 模型资产 中.

5. 实现驱动接口

SDK 中定义了 数据接入驱动接口, 该接口是平台控制驱动的桥梁. 开发者需要实现这个接口.

class Driver {
/**
* @name: schema
* @msg: 查询返回驱动配置schema js内容
* @param app
* @param callback (err,"string") 驱动配置schema,返回字符串
*/
schema(app, callback) {}

/**
* @name: start
* @msg: 驱动启动
* @param app
* @param driverConfig 包含实例、模型及设备数据
* @param callback (err)
*/
start(app, driverConfig, callback) {}

/**
* @name: run
* @msg: 运行指令,向设备写入数据
* @param app
* @param command 指令参数 {"table":"表标识","id":"设备编号","serialNo":"流水号","command":{}} command 指令内容
* @param callback (err,result) result自定义返回的格式或者空
*/
run(app, command, callback) {}

/**
* @name: batchRun
* @msg: 批量运行指令,向多设备写入数据
* @param app
* @param command 指令参数 {"table":"表标识", "ids": ["设备编号"], "serialNo": "流水号", 'command': {}} command 指令内容
* @param callback (err,result) result自定义返回的格式或者空
*/
batchRun(app, command, callback) {}

/**
* @name: writeTag
* @msg: 数据点写入
* @param app
* @param command 指令参数 {"table":"表标识","id":"设备编号","serialNo":"流水号","command":{}} command 指令内容
* @param callback (err,result) result自定义返回的格式或者空
*/
writeTag(app, command, callback) {}

/**
* @name: debug
* @msg: 调试驱动
* @param app
* @param debugConfig object 调试参数
* @param callback (err,result) result调试结果,自定义返回的格式
*/
debug(app, debugConfig, callback) {}

/**
* @name: debug
* @msg: 代理接口
* @param app
* @param type 请求接口标识
* @param header 请求头
* @param data 请求数据
* @param callback (err,result) result) result响应结果,自定义返回的格式
*/
httpProxy(app, type, header, data, callback) {}

/**
* @name: stop
* @msg: 驱动停止处理
* @param app
* @param callback (err)
*/
stop(app, callback) {}

/**
* @name: getVersion
* @msg: 驱动版本
* @return: string 驱动版本号
*/
getVersion() {}
}
info

注: 向平台上报采集到的数据时, 必须通过 驱动与平台交互接口 中的 writePoint 方法发送, 不能直接调用 MQTT 客户端发送. 因为 SDK 会对发送的数据进行一些处理, 包括有效范围处理、数值映射、缩放比例、小数位等处理.

6. 配置驱动

这里所说 驱动配置 主要是一些静态配置信息(与 schema 中定义的配置无关), 其中包括 平台配置信息, 驱动配置信息自定义配置信息. 这些信息一般通过配置文件(rc)、环境变量、命令行参数等方式传入. 其中一些配置信息由平台启动驱动时通过命令行参数传入.

这些配置信息, 在开发过程中可以根据实际情况进行调整. 但是在打包时必须按照平台的要求进行配置. 打包时的配置信息见 驱动配置说明.

平台配置信息

平台配置信息 主要为平台的连接信息. 包括: MQTT 连接信息, 驱动管理服务 连接信息. 内容如下:

{
"project": "所属项目ID",
"serviceId": "驱动实例ID",
"mq": {
"type": "mqtt",
"mqtt": {
"host": "平台mqtt服务器ip地址",
"port": "平台mqtt服务器端口"
}
},
"driver-grpc": {
"host": "驱动管理服务ip地址",
"port": "驱动管理服务端口"
},
"logger": {
"level": "debug"
},
"driver": {
"id": "驱动ID",
"name": "驱动名称"
}
}
info

相关服务的端口号可在运维管理系统中查看.

驱动配置信息

驱动配置信息 主要包括 驱动ID, 驱动名称, 驱动实例ID, 所属项目ID.

  • 驱动ID 为驱动的唯一标识, 必须在平台中唯一.
  • 驱动名称 为该驱动在平台中的显示名称.
  • 驱动实例ID 为该驱动实例的唯一标识. 同一个驱动可以创建多个实例, 每个实例的 驱动ID 相同但 驱动实例ID 唯一. 该信息由平台在 驱动管理 中创建驱动实例时生成.
  • 所属项目ID 每个驱动实例都属于一个项目, 该驱动实例只会拿到该项目中的模型和设备信息.
{
"project": "所属项目ID",
"serviceId": "驱动实例ID",
"driver": {
"id": "驱动ID",
"name": "驱动名称"
}
}
info
  1. 驱动ID驱动名称 需要在配置中手动定义.

  2. 驱动实例ID所属项目ID 在开发过程中, 需要将这些信息手动配置. 在打包时无须定义, 在平台中安装驱动时这些信息会由平台通过命令行参数传入.

驱动配置说明

以下是完整的驱动配置文件, 请参考该配置文件进行配置.

{
"project": "所属项目ID",
"serviceId": "驱动实例ID",
"mq": {
"type": "mqtt",
"mqtt": {
"host": "127.0.0.1",
"port": 1883
}
},
"driver-grpc": {
"host": "127.0.0.1",
"port": 9224
},
"logger": {
"level": "debug"
},
"driver": {
"id": "驱动ID",
"name": "驱动名称"
}
}

windows系统打包发布时的驱动配置

{
"mq": {
"type": "mqtt",
"mqtt": {
"host": "localhost",
"port": 1883
}
},
"driver-grpc": {
"host": "localhost",
"port": 9224
},
"driver": {
"id": "node-driver-mqtt-demo",
"name": "Node驱动例子"
}
}

linux系统打包发布时的驱动配置

{
"driver": {
"id": "node-driver-mqtt-demo",
"name": "Node驱动例子"
}
}