python工作流引擎_工作流,活动图和Python协程(一)
UML里面大家用得最多的是类图和序列图,比较少用到活动图(activity diagram)。其实活动图在某些业务场景下也是简单实用的,它相比常规的流程图主要就多一个fork/merge原语,可以说是描述工作流(任务串行/并行,任务依赖)的最简单直接的方式。
Rather, the activity diagram combines ideas from several techniques: the event diagrams of Jim Odell, SDL state modeling techniques, workflow modeling, and Petri nets. These diagrams are particularly useful in connection with workflow and in describing behavior that has a lot of parallel processing.
-- Martin Fowler, UML Distilled
工作流描述语言除了活动图之外,还有BPML等,此不赘述。
不少通用语言都有专门的工作流框架和引擎,通常以功能全面、可靠性强还有吞吐量大为特色。但这类工具往往技术细节多,入门复杂,对于小型任务更多是overkill。另外,Makefile其实也是一个工作流引擎,不过已经脱离了语言运行环境,使用不方便。
自Python 3引入async/await关键词和IO异步协程支持以来,在Python里面定义工作流可以变得非常简单,几乎接近built-in feature。更有启发的是,在扩展工作流特性的时候,我们发现async/await不仅是单纯的语法糖,更本质地是提供了一种不同的语言解释方式。
简易的工作流建模
这里借Martin Fowler的UML Distilled第九章中的几幅插图来演示一下。以该章节的代表性的工作流为例:原书图9.1简化版
该图中主要有以下几种通用元素:开始/结束
单个任务
箭头,表示任务执行的先后顺序(或表示任务的依赖关系)
分叉/合并(fork/merge)
不妨假设这些单个任务在语义上都是IO-bound并支持asyncio接口,我们可以考虑对这些元素按如下建模:
import asyncio as aio
info = print # or better, info = logger.info
async def seq(*coros):
results = []
for coro in coros:
results.append(await coro)
return tuple(results)
async def fork_and_merge(*coros):
return await aio.gather(*coros)
async def task(todo):
info(f"Doing task:{todo}")
await aio.sleep(1)
return todo
其中:seq代表顺序执行;
fork_and_merge代表并行,以及同步并行任务的结果集合;
task代表一般的单个任务;
于是我们可以这样描述该图的工作流定义:
async def flow():
await task('Receive Order')
await fork_and_merge(
seq(task('Send Invoice'),
task('Receive Payment')),
seq(task('Fill Order'),
task('Regular Delivery'))
)
await task('Close Order')
(可以看到,代码结构准确地反映了图中的结构。)
有了工作流定义,我们随时可以构建一个工作流的实例并执行之:
loop = aio.get_event_loop()
loop.run_until_complete(flow())
# output
Doing task: Receive Order
Doing task: Send Invoice
Doing task: Fill Order
Doing task: Receive Payment
Doing task: Regular Delivery
Doing task: Close Order
于是我们有了一套简易的工作流语言,并基于event loop的工作模式天然地得到了并发支持。
任务结果和上下文
虽然我们得到了无比简明的工作流模型,但是该工作流并不支持动态控制流。事实上,Martin Fowler原书里的图9.1是如下这样:原书图9.1
也即,Fill Order可以返回两种结果,而下游工作流则依赖于该结果。于是,我们需要想办法把这个控制逻辑也建模到工作流里面去。
利用协程的可组合性(看向函数式编程)上述工作流可以等效地描述为:原书图9.3
于是把局部的工作流隔离出来,利用Python自带的if-else结构,即可根据任务的结果选择下游任务:
async def process_order_subflow():
order = await get_filled_order()
if order.is_rush:
return await task('Overnight Delivery')
else:
return await task('Regular Delivery')
async def flow():
await task('Receive Order')
await fork_and_merge(
seq(task('Send Invoice'),
task('Receive Payment')),
process_order_subflow())
await task('Close Order')
我们不仅隔离了局部工作流的逻辑,使得它甚至可以单独测试和使用,更好的是我们还隔离了运行环境,比如order这个变量只在这个局部工作流范畴内创建/销毁,而不会泄露到外部运行环境中。这是个很舒服的性质。附:假设项目代码业务模型Order相关功能定义如下:
from dataclasses import dataclass
@dataclass
class Order:
is_rush : bool
# maybe define other properties
async def get_filled_order():
await aio.sleep(2) # simulate user interaction
return Order(bool(random.randint(0, 1)))
实例级别的隐式上下文
上述工作流的实例仅仅反映了动态图中描述的抽象结构,并未考虑到实例参数化问题。设想一个很简单的场景,我们对每一个申请交互的用户创建一个可辨识的工作流实例。于是我们需要给flow声明一个参数,传入一个用户名作为上下文。这时我们希望:该工作流实例的每个子工作流/子任务都可以访问这个上下文;
但我们不希望修改这些子工作流/子任务的签名,并总是自顶向下地显式传入这个上下文对象,这样带来大量重复代码;
借助Python 3.7引入的新魔法上下文变量ContextVar,我们可以从子任务里访问抽象的外部上下文,却能获取实例化的生下文。为此我们作以下微小的改动:
# add parameter `user`, which makes better sense in general
async def get_filled_order(user):
info(f'Retrieving order for{user}')
await aio.sleep(2) # simulate user interaction
return Order(bool(random.randint(0, 1)))
# ...
from contextvars import ContextVar
# declare context var external to the tasks
user_ctx_var = ContextVar('user')
async def task(todo):
# add logging of context var
info(f"Doing task:{todo}({user_ctx_var.get()})")
await aio.sleep(1)
return todo
async def process_order_subflow():
order = await get_filled_order(user_ctx_var.get())
if order.is_rush:
return await task('Overnight Delivery')
else:
return await task('Regular Delivery')
async def flow(user: str):
user_ctx_var.set(user)
await task('Receive Order')
await fork_and_merge(
seq(task('Send Invoice'), task('Receive Payment')),
process_order_subflow())
await task('Close Order')
基于此设定,我们最大化保留代码的原貌,却使得它们能自动适应实例上下文。
运行以下代码可以观察到结果——不同的flow实例,子任务获取到了不同的上下文:
async def main():
await aio.gather(
flow('Sam'),
flow('Joe'),
flow('Alice'),
)
loop.run_until_complete(main())
# output
Doing task: Receive Order (Sam)
Doing task: Receive Order (Joe)
Doing task: Receive Order (Alice)
Doing task: Send Invoice (Sam)
Retrieving order for Sam
Doing task: Send Invoice (Joe)
Retrieving order for Joe
Doing task: Send Invoice (Alice)
Retrieving order for Alice
Doing task: Receive Payment (Sam)
Doing task: Receive Payment (Joe)
Doing task: Receive Payment (Alice)
Doing task: Overnight Delivery (Sam)
Doing task: Overnight Delivery (Joe)
Doing task: Regular Delivery (Alice)
Doing task: Close Order (Sam)
Doing task: Close Order (Joe)
Doing task: Close Order (Alice)
分叉的可选性以及递归
UML Distilled里还列出了这幅图:原书图9.2等效图
该图告诉我们:分叉子进程的存在性是可以依赖于上下文
分叉结构可以递归存在
为支持第一条,我们可以调整fork_and_merge方法,过滤掉None值(代表被上下文决定丢弃的任务),并应用之:
# ...
async def fork_and_merge(*coros):
results = await aio.gather(
*filter(lambda x: x is not None, coros))
return results
# ...
def in_mood_for_wine(table):
good_mood = len(table) & 1 == 0 # simulate random mood
info(f'In good mood:{good_mood}')
return good_mood
async def service(todo):
info(f'Service doing:{todo}')
await aio.sleep(random() * 3)
return todo
async def serve_meal(table):
return await (
fork_and_merge(
seq(
fork_and_merge(
service('Cook Spaghetti'),
service('Mix Carbonara Sauce')),
service('Combine')
),
# optionality depending on context
service('Open Red Wine') if in_mood_for_wine(table) else None)
)
loop = aio.get_event_loop()
loop.run_until_complete(serve_meal('No. 42'))
至于分叉的递归,已由协程的可组合性所自然保证。但这里衍生出一个问题,即分叉的总数并不受控制,很可能几个工作流实例就创建了大量的分叉。
就本例而言,假如餐厅里的服务生数量有限,每个service实例必须由一个服务生来运行,那我们如何表述这种限制?
更进一步,如果有两类服务提供者(厨师+服务生),其中厨师工作方式是blocking的,而服务生工作方式是non-blocking的,又该如何以最简单的方式表述?
(未完待续
python工作流引擎_工作流,活动图和Python协程(一)相关推荐
- 一张图了解python基本语法_一张图认识Python(附基本语法总结)
一张图带你了解Python,更快入门, 一张图认识Python(附基本语法总结) Python基础语法总结: 1.Python标识符 在 Python 里,标识符有字母.数字.下划线组成. 在 Pyt ...
- 阿里工作流引擎_免费开源,一款快速开发模块化脚手架,含工作流引擎
简介 lenosp 一款快速开发模块化脚手架,采用 spring boot 2.0.1.spring.SpringMvc.mybatis.shiro.activiti 工作流.swagger.ehca ...
- spring 工作流引擎_带Spring的简单工作流引擎
spring 工作流引擎 几个月前,在处理一个公司项目时,我们需要开发REST服务,该服务用于根据客户端应用程序发送的数据发送电子邮件. 在开发此服务期间,我们决定创建简单的工作流引擎,该引擎将为发送 ...
- F2工作流引擎之 工作流运转模型(三)
1流程单起点单终止模型 单起点:一个流程定义必须有且唯一起点 单结束点:一个流程定义必须有且唯一结束点. 约定:提单与结束是每个流程必须有的活动,且唯一只有一个提单和结束. 2串行模型 描述:串行(S ...
- 用python画皇冠_手把手教你用 Python 绘制酷炫的桑基图!
原标题:手把手教你用 Python 绘制酷炫的桑基图! 作者 | 周志鹏 责编 | 郭 芮 最近,不止一次收到小伙伴的截图追问: "这个图叫什么???" "这个图真好看! ...
- python 推理引擎_【技术文档】OpenVINO推理引擎示例
推理引擎示例 推理引擎示例应用是简单的控制台应用,显示了如何在应用中利用特定的推理引擎功能,帮助开发人员执行特定的任务,例如加载模型.运行推理.查询特定的设备功能等. 安装英特尔®OpenVINO™工 ...
- python 两点曲线_十行代码,用Python做一个迷你版的美图秀秀
美图秀秀相信大家都不陌生,大家只要操作美图秀秀,就可以P掉图片中脸上的一些瑕疵,让人变得更加的美丽.今天小编就带领大家来借助Python和Flask来实现一个美图秀秀的网页设计,大家只需要通过网页上传 ...
- python画图宽度_手把手教你用 Python 绘制酷炫的桑基图!
作者 | 周志鹏 责编 | 郭 芮 最近,不止一次收到小伙伴的截图追问: "这个图叫什么???" "这个图真好看!!!怎么画啊?" ...... 笔者本没有 ...
- 天津python招聘信息网_【天津-滨海新区Python招聘_最新热搜天津-滨海新区Python人才招聘信息】-前程无忧...
学历要求:硕士|工作经验:3-4年|公司性质:上市公司|公司规模:150-500人 专业:计算机.自动化.软件工程.图形图像学或相关专业 语言:c c++ c# java 数据库 算法 工作经验:3年 ...
最新文章
- R语言构建ElasticNet回归模型实战:基于mtcars数据集
- 物理光学11 衍射的基本概念与惠更斯原理
- BZOJ 1012: [JSOI2008]最大数maxnumber 单调队列/线段树/树状数组/乱搞
- 七 内置锁 wait notify notifyall; 显示锁 ReentrantLock
- Watermaker水位线/水印
- Harmony OS — RadioButton RadioContainer单选按钮单选按钮组
- jQuery新的事件绑定机制on()
- scrapy爬虫—获取script中的data数据
- 【原】115网盘下载地址解析工具(暂停更新)
- 音视频是怎样实现传输的
- Android 集成支付宝支付
- docker用现有容器创建镜像
- 计算机网络与Netty - F2F
- 那个幻想奇特的诗意女孩
- 数字电路复位电路解析
- NUC970 SD卡驱动(SDIO)
- 查找linux内核漏洞查用的方法脚本
- vue引入鼠标点击效果
- 【Popper报错】Popper: modifier “undefined“ provided an invalid “fn“ property
- codeforce 555 div3 题解报告