Skip to main content

流程插件开发

本文将会详细介绍如何使用 Python SDK 开发流程插件. 示例项目目上传至 https://github.com/air-iot/sdk-python-examples/tree/master/flow_plugin.

介绍

流程插件 是扩展 流程引擎 中的节点的一种方式. 在流程引擎现有的功能不满足需求时, 可以通过开发流程插件来实现自定义的功能.

info

流程插件 只是扩展流程功能的方式之一. 除了 流程插件 扩展方式之外, 还可以开发一个独立的服务或与现有的服务集成. 例如: 在 数据接口 中添加被调用的目标服务, 然后在流程中使用 数据接口 节点调用该接口, 也可以实现对流程的扩展. 详细信息参考 HTTP数据接口数据库接口

开发步骤

1. 创建项目

根据自己的习惯创建项目.

2. 引入SDK

安装使用

info

流程插件二次开发相关内容都在 flow_plugin 子包内.

3. 实现流程插件接口

SDK 中定义了 流程插件接口, 该接口是平台与插件交互的桥梁. 开发者需要实现这个接口. 接口定义及详细说明见 流程插件接口说明.

流程插件启动时, SDK 会连接平台的 流程引擎 服务, 并接收流程引擎发送的请求. 当流程执行到该插件对应的节点时, 会发送请求给该插件对应的程序. SDK 接收到请求后会调用对应的插件实现, 并将插件处理结果返回给流程引擎.

4. 配置插件

插件配置主要是插件与平台的连接配置.

flow-engine:
host: 192.168.11.101 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
connect-timeout: 15s # 连接超时
retry-interval: 30s # 重连间隔
heartbeat-interval: 30s # 心跳间隔

windows系统打包发布时的插件配置

flow-engine:
host: 127.0.0.1 # 流程引擎服务地址
port: 2333 # 流程引擎服务端口

linux系统打包发布时的插件配置

flow-engine:
host: flow-engine # 流程引擎服务地址
port: 2333 # 流程引擎服务端口
info

connect-timeoutretry-intervalheartbeat-interval 通常情况下不需要修改.

5. 打包

流程插件打包就是将开发完成的程序打包为可以在平台部署的服务. 打包方式与 数据接入驱动开发-打包 中的打包方式基本一致, 但 service.yml 文件的内容稍有不同.

windows系统插件配置文件

# 必填项. 服务名称
Name: myPlugin
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 插件描述信息
# 插件的配置文件名称, 平台在安装插件服务时会查找打包文件中查找该文件. 一般固定填写 config.yml
ConfigType: config.yml
# 必填项. 固定为 server
GroupName: server
# 必填项. 启动命令.
Command: python main.py

linux系统插件配置文件

# 必填项. 服务名称
Name: myPlugin
# 必填项. 例如: 1.0.0
Version: 1.0.0
# 非必填项.
Description: 插件描述信息
# 必填项. 固定为 server
GroupName: server
# 固定为 None
Service: None

6. 部署

流程插件 的部署方式与 数据接入驱动 不同, 每个流程插件都是一个独立的服务, 在整个平台中只有一个实例. 部署过程如下:

  1. 登录 运维管理系统, 运维管理系统的默认登录地址为 http://IP:13030/, 将 IP 换成平台地址即可.
  2. 点击左侧菜单栏中的 服务管理 选项, 进入服务管理页面.
  3. 点击页面右上角的 添加服务 按钮, 然后选择 离线添加.
  4. 点击 上传 按扭, 选择刚刚打包好的流程插件压缩包, 然后点击 提交 按钮.

流程插件部署

info

如果 流程插件 部署失败, 可以在 运维管理系统 的首页中查看日志.

流程插件接口说明

class FlowPlugin:
"""
流程插件接口
"""

@abstractmethod
def get_name(self) -> str:
"""
插件名称
:return:
"""
pass

@abstractmethod
def get_type(self) -> FlowPluginType:
"""
插件类型
:return:
"""
pass

@abstractmethod
def on_connection_state_changed(self, state: bool):
"""
当与流程引擎的连接状态发生变化时调用
:param state: 变化后的连接状态
:return:
"""
pass

@abstractmethod
def start(self):
"""
当插件启动时调用.
该方法只会调用一次, 并且在 'on_connection_state_changed' 方法之前执行
:return:
"""
pass

@abstractmethod
def stop(self):
"""
当流程插件服务停止时执行的操作. 该方法只会调用一次
:return:
"""
pass

@abstractmethod
async def execute(self, request: FlowTask) -> FlowResult:
"""
执行流程插件
:param request: 请求信息
:return: 执行结果
:raise Exception: 执行失败时抛出异常
"""
pass

FlowTask 定义

class FlowTask:
"""
流程插件任务请求信息
"""

# 当前流程所属项目ID
projectId: str
# 当前流程ID
flowId: str
# 流程实例ID
job: str
# 当前节点ID
elementId: str
# 节点实例ID
elementJob: str
# 节点配置信息
config: bytes

FlowResult 定义

class FlowResult:
"""
流程插件执行结果
"""
# 说明信息
message: str
# 详细信息
details: str
# 执行结果数据
data: dict