Skip to main content

算法服务开发

本文将会详细介绍如何使用 Python SDK 开发算法服务, 实现算法集成. 示例项目目上传至 https://github.com/air-iot/sdk-python-examples/tree/master/algorithm.

介绍

算法服务 是扩展或将已有 算法 集成到平台的一种方式. 在平台现有的功能不满足需求时, 可以通过开发算法服务来实现自定义的功能.

开发步骤

1. 创建项目

根据自己的习惯创建项目.

2. 引入SDK

安装使用

info

算法服务二次开发相关内容都在 algorithm 子包内.

3. 定义 schema

schema 定义描述了该算法程序支持哪些算法函数, 以及每个算法函数的输入和输出参数定义. 每个算法程序可以包含多个算法函数, 每个算法函数的名称必须唯一, 详细说明参考算法schema说明. 示例如下:

[
{
"title": "函数1-加法",
"function": "add",
"input": {
"type": "object",
"properties": {
"num1": {
"title": "参数1",
"type": "number"
},
"num2": {
"title": "参数2",
"type": "number"
}
},
"required": [
"num1",
"num2"
]
},
"output": {
"type": "object",
"properties": {
"num1": {
"title": "结果",
"type": "number"
}
}
}
},
{
"title": "函数2-绝对值",
"function": "abs",
"input": {
"type": "object",
"properties": {
"num1": {
"title": "参数1",
"type": "number"
}
},
"required": [
"num1"
]
},
"output": {
"type": "object",
"properties": {
"res": {
"title": "结果",
"type": "number"
}
}
}
},
{
"title": "函数3-获取当前系统时间",
"function": "now",
"input": {
"type": "object",
"properties": {},
"required": []
},
"output": {
"type": "object",
"properties": {
"sysdate": {
"title": "当前系统时间",
"type": "string"
}
}
}
},
{
"title": "函数4-接收Map",
"function": "recvMap",
"input": {
"type": "object",
"properties": {
"name": {
"title": "姓名",
"type": "string"
}
},
"required": ["name"]
},
"output": {
"type": "object",
"properties": {
"result": {
"title": "输出结果",
"type": "string"
}
}
}
},
{
"title": "函数4-接收字符串",
"function": "recvString",
"input": {
"type": "object",
"properties": {
"name": {
"title": "姓名",
"type": "string"
}
},
"required": ["name"]
},
"output": {
"type": "object",
"properties": {
"result": {
"title": "输出结果",
"type": "string"
}
}
}
}
]

4. 实现算法接口

SDK 中定义了 算法服务接口, 开发者需要实现这个接口.

class AlgorithmApp:
"""
算法应用程序基类, 通过继承该类可以实现一个算法应用程序.
"""

@abstractmethod
def id(self) -> str:
"""
算法应用程序ID
:return:
"""
pass

@abstractmethod
def name(self) -> str:
"""
算法应用程序名称
:return:
"""

@abstractmethod
def start(self):
"""
算法应用程序启动时执行的方法. 可以在此方法中进行一些初始化操作.
:return:
"""
pass

@abstractmethod
def stop(self):
"""
算法应用程序停止时执行的方法. 可以在此方法中进行一些清理操作.
:return:
"""
pass

@abstractmethod
async def schema(self) -> str:
"""
算法应用程序的 schema 定义.
:return:
"""
pass

@abstractmethod
async def run(self, project_id: str, function: str, params: dict[str, any]) -> [dict[str, any] | object]:
"""
算法执行方法. 该方法会在算法执行请求到达时被调用.
:param project_id: 请求执行算法的项目ID
:param function: 请求执行的算法函数名
:param params: 请求执行的算法参数
:return: 算法执行结果
"""
pass

@algorithm_function 注解

SDK 提供了 @algorithm_function 注解, 用于标识实现类中的方法对应 schema 的算法函数. 该注解只能用在方法上. 使用该注解修饰的方法, 必须满足以下条件:

  • 必须为 async 异步方法.
  • 必须带有 2 个参数
  • 第 1 个参数必须为 str 类型, 用于接收发起请求的项目ID
  • 第 2 个参数必须为 dict 类型, 用于接收请求参数
  • 该函数的返回值的类型必须是 dict

算法调用请求执行流程

  1. 当平台调用该算法服务时, SDK 会解析请求中的 function 信息.
  2. 根据 function 的值, 在实现类中查找是否有使用 @algorithm_function("算法函数名") 注解修饰的方法. 如果找到了, 则会调用该方法, 并将方法的返回值作为算法执行结果返回给平台.
  3. 如果没有找到使用 @algorithm_function("算法函数名") 修饰的方法时, 则会调用 run 方法, 并将 function 参数的值作为 run 方法的第二个参数传入, 由开发者根据 function 参数值执行对应的算法逻辑. 最后将 run 方法的返回值作为算法执行结果返回给平台.

示例

import datetime

from airiot_python_sdk.algorithm import AlgorithmApp, algorithm_function

class MyAlgorithmApp(AlgorithmApp):
"""
自定义算法应用程序
"""

def id(self) -> str:
return "PythonAlgorithmSDKExample"

def name(self) -> str:
return "Python算法SDK示例程序"

def start(self):
logger.info("Python算法示例程序已启动")

def stop(self):
logger.info("Python算法示例程序已停止")

async def schema(self) -> str:
with open('schema.js', 'r', encoding="utf-8") as f:
return f.read()

async def run(self, project_id: str, function: str, params: dict[str, any]) -> [dict[str, any] | object]:
if function == "pow":
return {"result": params["num1"] ** params["num2"]}
rase Exception(f"不支持的算法函数 '{function}'")

@algorithm_function(name="add")
async def add(self, project_id: str, params: dict[str, any]) -> [dict[str, any] | object]:
num1 = params["num1"]
num2 = params["num2"]
return {"num1": num1 + num2}

@algorithm_function(name="abs")
async def abs(self, project_id: str, params: dict[str, any]) -> [dict[str, any] | object]:
num = params["num1"]
return {"num": abs(num)}

@algorithm_function(name="now")
async def now(self, project_id: str, params: dict[str, any]) -> [dict[str, any] | object]:
return {"sysdate": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}

5. 算法配置信息

算法配置信息定义了算法的基本信息以及平台算法服务的连接信息. 整体配置如下:

algorithm:
id: 自定义算法标识 # 必填
name: 自定义算法名称 # 必填
max-threads: 10 # 可选. 最大线程数, 默认: 0, 表示取 CPU 核心数
algorithm-grpc: # 平台算法服务连接配置
host: localhost # 必填. 算法服务地址
port: 9236 # 必填. 算法服务端口, 默认: 9236

6. 打包

具体打包步骤请参考 流程插件开发-打包 中的步骤.

windows系统打包发布时的算法配置

algorithm:
id: 自定义算法标识
name: 自定义算法名称
max-threads: 10
algorithm-grpc:
host: 127.0.0.1
port: 9236

linux系统打包发布时的算法配置

algorithm:
id: 自定义算法标识
name: 自定义算法名称
max-threads: 10
algorithm-grpc:
host: algorithm
port: 9236

7. 部署

具体部署步骤请参考 流程插件开发-部署 中的步骤.