流程扩展节点开发
本文将会详细介绍如何使用 Java SDK
扩展流程节点. 示例项目目上传至 https://github.com/air-iot/sdk-java-examples/tree/master/flow-extension-example.
介绍
流程扩展节点
是扩展 流程引擎
中的节点的一种方式. 在流程引擎现有的功能不满足需求时, 可以通过开发流程扩展节点来实现自定义的功能.
开发步骤
1. 创建项目
该过程同 流程插件开发-创建项目 中的创建项目过程一致.
2. 引入SDK
以 maven
为例, 在 pom.xml
中引入 sdk-flow-extension-starter
依赖.
<dependencies>
<dependency>
<groupId>io.github.air-iot</groupId>
<artifactId>sdk-flow-extension-starter</artifactId>
<version>4.x.x</version>
</dependency>
</dependencies>
3. Schema 定义
每个流程扩展节点需要提供一个 schema
定义, 该 schema
描述了当前扩展节点的输入参数定义. 详细说明点击查看. 示例如下:
{
"type": "object",
"properties": {
"num1": {
"title": "参数1",
"type": "number"
},
"num2": {
"title": "参数2",
"type": "number"
}
},
"required": [
"num1",
"num2"
]
}
4. 实现流程扩展节点接口
SDK
中定义了 流程扩展节点接口
, 该接口是平台与扩展节点交互的桥梁. 开发者需要实现这个接口, 并且将实现类注入到 spring
容器中.
接口定义及详细说明见 流程扩展节点接口说明.
流程扩展节点启动时, SDK
会连接平台的 流程引擎
服务, 并接收流程引擎发送的请求.
当流程调用该扩展节点时, 会发送请求给该扩展节点对应的程序. SDK
接收到请求后会调用对应的流程扩展节点实现, 并将处理结果返回给流程引擎.
一个程序中可以包含多个流程扩展节点. 但是每个流程扩展节点的ID和名称必须唯一.
5. 配置流程扩展节点
流程扩展节点配置主要是扩展节点与平台的连接配置.
flow-engine:
host: 192.168.11.101 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
connect-timeout: 15s # 连接超时, 可选. 默认: 15s
retry-interval: 30s # 重连间隔, 可选. 默认: 30s
heartbeat-interval: 30s # 心跳间隔, 可选. 默认: 30s
max-threads: 10 # 最大线程数, 可选. 默认: 0, 即取当前主机的CPU核数
windows系统打包发布时的流程扩展节点配置
flow-engine:
host: 127.0.0.1 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
linux系统打包发布时的流程扩展节点配置
flow-grpc:
host: flow-engine # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
connect-timeout
、retry-interval
和heartbeat-interval
通常情况下不需要修改.- 每次请求都是异步执行的,
max-threads
用来设置异步执行的最大线程数. 如果不设置, 则取当前主机的CPU核数.
6. 打包
流程扩展节点打包就是将开发完成的程序打包为可以在平台部署的服务. 打包方式与 数据接入驱动开发-打包 中的打包方式基本一致.
windows系统流程扩展节点配置文件
# 必填项. 服务名称
Name: myFlowExtNode
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 扩展节点描述信息
# 流程扩展节点的配置文件名称, 平台在安装流程扩展节点服务时会查找打包文件中查找该文件. 一般固定填写 application.yml
ConfigType: application.yml
# 必填项. 固定为 server
GroupName: server
# 必填项. 启动命令. 还可以添加一些启动参数, 例如: -Xms512m -Xmx1024m
Command: java -jar myFlowExtNode.jar
linux系统流程扩展节点配置文件
# 必填项. 服务名称
Name: myFlowExtNode
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 扩展节点描述信息
# 必填项. 固定为 server
GroupName: server
# 固定为 None
Service: None
7. 部署
每个 流程扩展节点
都是一个独立的服务, 在整个平台中只有一个实例, 其部署方式与 流程插件
一致, 部署过程如下:
- 登录
运维管理系统
, 运维管理系统的默认登录地址为http://IP:13030/
, 将IP
换成平台地址即可. - 点击左侧菜单栏中的
服务管理
选项, 进入服务管理页面. - 点击页面右上角的
添加服务
按钮, 然后选择离线添加
. - 点击
上传
按扭, 选择刚刚打包好的流程扩展节点压缩包, 然后点击提交
按钮.
如果 流程扩展节点
部署失败, 可以在 运维管理系统
的首页中查看日志.
流程扩展节点接口说明
/**
* 流程扩展节点接口
*/
public interface FlowExtension<Request> {
/**
* 获取扩展节点标识
* <br>
* 节点标识在整个平台中必须唯一, 流程引擎根据流程扩展节点标识来区分不同的节点.
*
* @return 节点标识
*/
String getId();
/**
* 扩展节点名称
*
* @return 节点名称
*/
String getName();
/**
* 与流程引擎连接状态发生变化时触发
* <br>
* 注: 该方法在每次连接状态发生变化时都会被调用
*
* @param connected 当前与流程引擎的连接状态. {@code true} 连接已建立, {@code false} 连接已断开
*/
default void onConnectionStateChange(boolean connected) {
}
/**
* 当流程流程扩展节点服务启动时执行的操作
* <br>
* 可以在此方法中执行一些初始化操作, 如建立连接, 初始化对象等.
* <br>
* 注: 该方法只会调用一次, 并且在 {@link #onConnectionStateChange(boolean)} 之前执行
*/
default void onStart() {
}
/**
* 当流程流程扩展节点服务停止时执行的操作
* <br>
* 可以在此方法中执行一些清理操作, 如关闭连接等.
* <br>
* 注: 该方法只会调用一次.
*/
default void onStop() {
}
/**
* 获取扩展节点的 schema 定义
* <br>
* 如果抛出异常则视为请求执行失败. 否则视为请求执行成功.
*
* @return schema 定义信息
* @throws FlowExtensionException 如果请求执行失败
*/
String schema() throws FlowExtensionException;
/**
* 执行请求
*
* @param request 请求参数
* @return 请求执行结果, 必须为可以序列化为 JSON 的对象, 例如: Map, 自定义 Class 等.
* @throws FlowExtensionException 如果请求执行异常
*/
Object run(Request request) throws FlowExtensionException;
}
Request 泛型说明
FlowExtension
接口中的泛型 Request
为请求参数类型, 用于接收流程引擎发送的请求参数. 该泛型的类型由 schema
定义决定.
该泛型可以为 Map
或自定义类型, 示例如下:
/**
* 使用 Map 作为请求参数类型, 注意: Map 的 key 必须为 String.
*/
public class MyExtension1 implements FlowExtension<Map<String, Object>> {
// ...
}
public class MyRequest {
private int base;
private int pow;
// ...
}
/**
* 使用自定义类型作为请求参数类型
*/
public class MyExtension1 implements FlowExtension<MyRequest> {
// ...
}