算法服务开发
本文将会详细介绍如何使用 Python SDK
开发算法服务, 实现算法集成. 示例项目目上传至 https://github.com/air-iot/sdk-python-examples/tree/master/algorithm.
介绍
算法服务
是扩展或将已有 算法
集成到平台的一种方式. 在平台现有的功能不满足需求时, 可以通过开发算法服务来实现自定义的功能.
开发步骤
1. 创建项目
根据自己的习惯创建项目.
2. 引入SDK
info
算法服务二次开发相关内容都在 algorithm
子包内.
3. 定义 schema
schema
定义描述了该算法程序支持哪些算法函数, 以及每个算法函数的输入和输出参数定义. 每个算法程序可以包含多个算法函数, 每个算法函数的名称必须唯一, 详细说明参考算法schema说明. 示例如下:
[
{
"title": "函数1-加法",
"function": "add",
"input": {
"type": "object",
"properties": {
"num1": {
"title": "参数1",
"type": "number"
},
"num2": {
"title": "参数2",
"type": "number"
}
},
"required": [
"num1",
"num2"
]
},
"output": {
"type": "object",
"properties": {
"num1": {
"title": "结果",
"type": "number"
}
}
}
},
{
"title": "函数2-绝对值",
"function": "abs",
"input": {
"type": "object",
"properties": {
"num1": {
"title": "参数1",
"type": "number"
}
},
"required": [
"num1"
]
},
"output": {
"type": "object",
"properties": {
"res": {
"title": "结果",
"type": "number"
}
}
}
},
{
"title": "函数3-获取当前系统时间",
"function": "now",
"input": {
"type": "object",
"properties": {},
"required": []
},
"output": {
"type": "object",
"properties": {
"sysdate": {
"title": "当前系统时间",
"type": "string"
}
}
}
},
{
"title": "函数4-接收Map",
"function": "recvMap",
"input": {
"type": "object",
"properties": {
"name": {
"title": "姓名",
"type": "string"
}
},
"required": ["name"]
},
"output": {
"type": "object",
"properties": {
"result": {
"title": "输出结果",
"type": "string"
}
}
}
},
{
"title": "函数4-接收字符串",
"function": "recvString",
"input": {
"type": "object",
"properties": {
"name": {
"title": "姓名",
"type": "string"
}
},
"required": ["name"]
},
"output": {
"type": "object",
"properties": {
"result": {
"title": "输出结果",
"type": "string"
}
}
}
}
]
4. 实现算法接口
SDK
中定义了 算法服务接口
, 开发者需要实现这个接口.
class AlgorithmApp:
"""
算法应用程序基类, 通过继承该类可以实现一个算法应用程序.
"""
@abstractmethod
def id(self) -> str:
"""
算法应用程序ID
:return:
"""
pass
@abstractmethod
def name(self) -> str:
"""
算法应用程序名称
:return:
"""
@abstractmethod
def start(self):
"""
算法应用程序启动时执行的方法. 可以在此方法中进行一些初始化操作.
:return:
"""
pass
@abstractmethod
def stop(self):
"""
算法应用程序停止时执行的方法. 可以在此方法中进行一些清理操作.
:return:
"""
pass
@abstractmethod
async def schema(self) -> str:
"""
算法应用程序的 schema 定义.
:return:
"""
pass
@abstractmethod
async def run(self, project_id: str, function: str, params: dict[str, any]) -> [dict[str, any] | object]:
"""
算法执行方法. 该方法会在算法执行请求到达时被调用.
:param project_id: 请求执行算法的项目ID
:param function: 请求执行的算法函数名
:param params: 请求执行的算法参数
:return: 算法执行结果
"""
pass
@algorithm_function 注解
SDK
提供了 @algorithm_function
注解, 用于标识实现类中的方法对应 schema 的算法函数. 该注解只能用在方法上.
使用该注解修饰的方法, 必须满足以下条件:
- 必须为
async
异步方法. - 必须带有 2 个参数
- 第 1 个参数必须为
str
类型, 用于接收发起请求的项目ID - 第 2 个参数必须为
dict
类型, 用于接收请求参数 - 该函数的返回值的类型必须是
dict
算法调用请求执行流程
- 当平台调用该算法服务时,
SDK
会解析请求中的function
信息. - 根据
function
的值, 在实现类中查找是否有使用@algorithm_function("算法函数名")
注解修饰的方法. 如果找到了, 则会调用该方法, 并将方法的返回值作为算法执行结果返回给平台. - 如果没有找到使用
@algorithm_function("算法函数名")
修饰的方法时, 则会调用run
方法, 并将function
参数的值作为run
方法的第二个参数传入, 由开发者根据function
参数值执行对应的算法逻辑. 最后将run
方法的返回值作为算法执行结果返回给平台.
示例
import datetime
from airiot_python_sdk.algorithm import AlgorithmApp, algorithm_function
class MyAlgorithmApp(AlgorithmApp):
"""
自定义算法应用程序
"""
def id(self) -> str:
return "PythonAlgorithmSDKExample"
def name(self) -> str:
return "Python算法SDK示例程序"
def start(self):
logger.info("Python算法示例程序已启动")
def stop(self):
logger.info("Python算法示例程序已停止")
async def schema(self) -> str:
with open('schema.js', 'r', encoding="utf-8") as f:
return f.read()
async def run(self, project_id: str, function: str, params: dict[str, any]) -> [dict[str, any] | object]:
if function == "pow":
return {"result": params["num1"] ** params["num2"]}
rase Exception(f"不支持的算法函数 '{function}'")
@algorithm_function(name="add")
async def add(self, project_id: str, params: dict[str, any]) -> [dict[str, any] | object]:
num1 = params["num1"]
num2 = params["num2"]
return {"num1": num1 + num2}
@algorithm_function(name="abs")
async def abs(self, project_id: str, params: dict[str, any]) -> [dict[str, any] | object]:
num = params["num1"]
return {"num": abs(num)}
@algorithm_function(name="now")
async def now(self, project_id: str, params: dict[str, any]) -> [dict[str, any] | object]:
return {"sysdate": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
5. 算法配置信息
算法配置信息定义了算法的基本信息以及平台算法服务的连接信息. 整体配置如下:
algorithm:
id: 自定义算法标识 # 必填
name: 自定义算法名称 # 必填
max-threads: 10 # 可选. 最大线程数, 默认: 0, 表示取 CPU 核心数
algorithm-grpc: # 平台算法服务连接配置
host: localhost # 必填. 算法服务地址
port: 9236 # 必填. 算法服务端口, 默认: 9236
6. 打包
具体打包步骤请参考 流程插件开发-打包 中的步骤.
windows系统打包发布时的算法配置
algorithm:
id: 自定义算法标识
name: 自定义算法名称
max-threads: 10
algorithm-grpc:
host: 127.0.0.1
port: 9236
linux系统打包发布时的算法配置
algorithm:
id: 自定义算法标识
name: 自定义算法名称
max-threads: 10
algorithm-grpc:
host: algorithm
port: 9236
7. 部署
具体部署步骤请参考 流程插件开发-部署 中的步骤.