流程插件开发
本文将会详细介绍如何使用 Python SDK
开发流程插件. 示例项目目上传至 https://github.com/air-iot/sdk-python-examples/tree/master/flow_plugin.
介绍
流程插件
是扩展 流程引擎
中的节点的一种方式. 在流程引擎现有的功能不满足需求时, 可以通过开发流程插件来实现自定义的功能.
info
开发步骤
1. 创建项目
根据自己的习惯创建项目.
2. 引入SDK
info
流程插件二次开发相关内容都在 flow_plugin
子包内.
3. 实现流程插件接口
SDK
中定义了 流程插件接口
, 该接口是平台与插件交互的桥梁. 开发者需要实现这个接口.
接口定义及详细说明见 流程插件接口说明.
流程插件启动时, SDK
会连接平台的 流程引擎
服务, 并接收流程引擎发送的请求.
当流程执行到该插件对应的节点时, 会发送请求给该插件对应的程序. SDK
接收到请求后会调用对应的插件实现, 并将插件处理结果返回给流程引擎.
4. 配置插件
插件配置主要是插件与平台的连接配置.
flow-engine:
host: 192.168.11.101 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
connect-timeout: 15s # 连接超时
retry-interval: 30s # 重连间隔
heartbeat-interval: 30s # 心跳间隔
windows系统打包发布时的插件配置
flow-engine:
host: 127.0.0.1 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
linux系统打包发布时的插件配置
flow-engine:
host: flow-engine # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
info
connect-timeout
、retry-interval
和 heartbeat-interval
通常情况下不需要修改.
5. 打包
流程插件打包就是将开发完成的程序打包为可以在平台部署的服务. 打包方式与 数据接入驱动开发-打包 中的打包方式基本一致, 但 service.yml
文件的内容稍有不同.
windows系统插件配置文件
# 必填项. 服务名称
Name: myPlugin
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 插件描述信息
# 插件的配置文件名称, 平台在安装插件服务时会查找打包文件中查找该文件. 一般固定填写 config.yml
ConfigType: config.yml
# 必填项. 固定为 server
GroupName: server
# 必填项. 启动命令.
Command: python main.py
linux系统插件配置文件
# 必填项. 服务名称
Name: myPlugin
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 插件描述信息
# 必填项. 固定为 server
GroupName: server
# 固定为 None
Service: None
6. 部署
流程插件
的部署方式与 数据接入驱动
不同, 每个流程插件都是一个独立的服务, 在整个平台中只有一个实例. 部署过程如下:
- 登录
运维管理系统
, 运维管理系统的默认登录地址为http://IP:13030/
, 将IP
换成平台地址即可. - 点击左侧菜单栏中的
服务管理
选项, 进入服务管理页面. - 点击页面右上角的
添加服务
按钮, 然后选择离线添加
. - 点击
上传
按扭, 选择刚刚打包好的流程插件压缩包, 然后点击提交
按钮.
info
如果 流程插件
部署失败, 可以在 运维管理系统
的首页中查看日志.
流程插件接口说明
class FlowPlugin:
"""
流程插件接口
"""
@abstractmethod
def get_name(self) -> str:
"""
插件名称
:return:
"""
pass
@abstractmethod
def get_type(self) -> FlowPluginType:
"""
插件类型
:return:
"""
pass
@abstractmethod
def on_connection_state_changed(self, state: bool):
"""
当与流程引擎的连接状态发生变化时调用
:param state: 变化后的连接状态
:return:
"""
pass
@abstractmethod
def start(self):
"""
当插件启动时调用.
该方法只会调用一次, 并且在 'on_connection_state_changed' 方法之前执行
:return:
"""
pass
@abstractmethod
def stop(self):
"""
当流程插件服务停止时执行的操作. 该方法只会调用一次
:return:
"""
pass
@abstractmethod
async def execute(self, request: FlowTask) -> FlowResult:
"""
执行流程插件
:param request: 请求信息
:return: 执行结果
:raise Exception: 执行失败时抛出异常
"""
pass
FlowTask 定义
class FlowTask:
"""
流程插件任务请求信息
"""
# 当前流程所属项目ID
projectId: str
# 当前流程ID
flowId: str
# 流程实例ID
job: str
# 当前节点ID
elementId: str
# 节点实例ID
elementJob: str
# 节点配置信息
config: bytes
FlowResult 定义
class FlowResult:
"""
流程插件执行结果
"""
# 说明信息
message: str
# 详细信息
details: str
# 执行结果数据
data: dict