Skip to main content

流程扩展节点开发

本文将会详细介绍如何使用 Python SDK 扩展流程节点. 示例项目目上传至 https://github.com/air-iot/sdk-python-examples/tree/master/flow_extension.

介绍

流程扩展节点 是扩展 流程引擎 中的节点的一种方式. 在流程引擎现有的功能不满足需求时, 可以通过开发流程扩展节点来实现自定义的功能.

info

流程扩展节点 只是扩展流程功能的方式之一. 除了 流程扩展节点 扩展方式之外, 还可以开发一个独立的服务或与现有的服务集成. 例如: 在 数据接口 中添加被调用的目标服务, 然后在流程中使用 数据接口 节点调用该接口, 也可以实现对流程的扩展. 详细信息参考 HTTP数据接口数据库接口

开发步骤

1. 创建项目

根据自己的习惯创建项目.

2. 引入SDK

1. 在项目中引入依赖

安装使用

info

流程扩展节点二次开发相关内容都在 flow_extension 子包内.

3. Schema 定义

每个流程扩展节点需要提供一个 schema 定义, 该 schema 描述了当前扩展节点的输入参数定义. 详细说明点击查看. 示例如下:

{
"type": "object",
"properties": {
"num1": {
"title": "参数1",
"type": "number"
},
"num2": {
"title": "参数2",
"type": "number"
}
},
"required": [
"num1",
"num2"
]
}

4. 实现流程扩展节点接口

SDK 中定义了 流程扩展节点接口, 该接口是平台与扩展节点交互的桥梁. 开发者需要实现这个接口.

流程扩展节点启动时, SDK 会连接平台的 流程引擎 服务, 并接收流程引擎发送的请求. 当流程调用该扩展节点时, 会发送请求给该扩展节点对应的程序. SDK 接收到请求后会调用对应的流程扩展节点实现, 并将处理结果返回给流程引擎.

class FlowExtension:
"""
流程扩展节点接口, 该接口定义了流程扩展节点的基本行为
"""

def id(self) -> str:
"""
流程扩展节点标识. 要求该标识在平台中唯一
:return: 节点标识
"""
pass

def name(self) -> str:
"""
流程扩展节点名称
:return: 节点名称
"""
pass

def start(self):
"""
流程扩展节点启动时执行的动作, 该方法会在节点启动时被调用, 可以在该方法中执行一些初始化操作
:return:
"""
pass

def stop(self):
""""
流程扩展节点停止时执行的动作, 该方法会在节点停止时被调用, 可以在该方法中执行一些清理操作
"""
pass

async def schema(self) -> str:
"""
获取流程扩展节点的 schema, 该方法会在节点被调用时被调用, 该方法必须是异步方法
:return: schema 内容
"""
pass

async def run(self, params: dict) -> dict:
"""
处理流程扩展节点的请求, 该方法会在节点被调用时被调用, 该方法必须是异步方法
:param params: 请求参数. 参数内容由 schema 定义
:return: 执行结果
"""
pass

5. 配置流程扩展节点

流程扩展节点配置主要是扩展节点与平台的连接配置.

flow-engine:
host: 192.168.11.101 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
connect-timeout: 15s # 连接超时, 可选. 默认: 15s
retry-interval: 30s # 重连间隔, 可选. 默认: 30s
heartbeat-interval: 30s # 心跳间隔, 可选. 默认: 30s
max-threads: 10 # 最大线程数, 可选. 默认: 0, 即取当前主机的CPU核数

windows系统打包发布时的流程扩展节点配置

flow-engine:
host: 127.0.0.1 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口

linux系统打包发布时的流程扩展节点配置

flow-grpc:
host: flow-engine # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
info
  • connect-timeoutretry-intervalheartbeat-interval 通常情况下不需要修改.
  • 每次请求都是异步执行的, max-threads 用来设置异步执行的最大线程数. 如果不设置, 则取当前主机的CPU核数.

6. 打包

流程扩展节点打包就是将开发完成的程序打包为可以在平台部署的服务. 打包方式与 数据接入驱动开发-打包 中的打包方式基本一致.

windows系统流程扩展节点配置文件

# 必填项. 服务名称
Name: myFlowExtNode
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 扩展节点描述信息
# 流程扩展节点的配置文件名称, 平台在安装流程扩展节点服务时会查找打包文件中查找该文件. 一般固定填写 application.yml
ConfigType: application.yml
# 必填项. 固定为 server
GroupName: server
# 必填项. 启动命令. 还可以添加一些启动参数, 例如: -Xms512m -Xmx1024m
Command: python main.py

linux系统流程扩展节点配置文件

# 必填项. 服务名称
Name: myFlowExtNode
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 扩展节点描述信息
# 必填项. 固定为 server
GroupName: server
# 固定为 None
Service: None

7. 部署

每个 流程扩展节点 都是一个独立的服务, 在整个平台中只有一个实例, 其部署方式与 流程插件 一致, 部署过程如下:

  1. 登录 运维管理系统, 运维管理系统的默认登录地址为 http://IP:13030/, 将 IP 换成平台地址即可.
  2. 点击左侧菜单栏中的 服务管理 选项, 进入服务管理页面.
  3. 点击页面右上角的 添加服务 按钮, 然后选择 离线添加.
  4. 点击 上传 按扭, 选择刚刚打包好的流程扩展节点压缩包, 然后点击 提交 按钮.

流程扩展节点部署

info

如果 流程扩展节点 部署失败, 可以在 运维管理系统 的首页中查看日志.