UML里面大家用得最多的是类图和序列图,比较少用到活动图(activity diagram)。其实活动图在某些业务场景下也是简单实用的,它相比常规的流程图主要就多一个fork/merge原语,可以说是描述工作流(任务串行/并行,任务依赖)的最简单直接的方式。

Rather, the activity diagram combines ideas from several techniques: the event diagrams of Jim Odell, SDL state modeling techniques, workflow modeling, and Petri nets. These diagrams are particularly useful in connection with workflow and in describing behavior that has a lot of parallel processing.

-- Martin Fowler, UML Distilled

工作流描述语言除了活动图之外,还有BPML等,此不赘述。

不少通用语言都有专门的工作流框架和引擎,通常以功能全面、可靠性强还有吞吐量大为特色。但这类工具往往技术细节多,入门复杂,对于小型任务更多是overkill。另外,Makefile其实也是一个工作流引擎,不过已经脱离了语言运行环境,使用不方便。

自Python 3引入async/await关键词和IO异步协程支持以来,在Python里面定义工作流可以变得非常简单,几乎接近built-in feature。更有启发的是,在扩展工作流特性的时候,我们发现async/await不仅是单纯的语法糖,更本质地是提供了一种不同的语言解释方式。

简易的工作流建模

这里借Martin Fowler的UML Distilled第九章中的几幅插图来演示一下。以该章节的代表性的工作流为例:原书图9.1简化版

该图中主要有以下几种通用元素:开始/结束

单个任务

箭头,表示任务执行的先后顺序(或表示任务的依赖关系)

分叉/合并(fork/merge)

不妨假设这些单个任务在语义上都是IO-bound并支持asyncio接口,我们可以考虑对这些元素按如下建模:

import asyncio as aio

info = print # or better, info = logger.info

async def seq(*coros):

results = []

for coro in coros:

results.append(await coro)

return tuple(results)

async def fork_and_merge(*coros):

return await aio.gather(*coros)

async def task(todo):

info(f"Doing task:{todo}")

await aio.sleep(1)

return todo

其中:seq代表顺序执行;

fork_and_merge代表并行,以及同步并行任务的结果集合;

task代表一般的单个任务;

于是我们可以这样描述该图的工作流定义:

async def flow():

await task('Receive Order')

await fork_and_merge(

seq(task('Send Invoice'),

task('Receive Payment')),

seq(task('Fill Order'),

task('Regular Delivery'))

)

await task('Close Order')

(可以看到,代码结构准确地反映了图中的结构。)

有了工作流定义,我们随时可以构建一个工作流的实例并执行之:

loop = aio.get_event_loop()

loop.run_until_complete(flow())

# output

Doing task: Receive Order

Doing task: Send Invoice

Doing task: Fill Order

Doing task: Receive Payment

Doing task: Regular Delivery

Doing task: Close Order

于是我们有了一套简易的工作流语言,并基于event loop的工作模式天然地得到了并发支持。

任务结果和上下文

虽然我们得到了无比简明的工作流模型,但是该工作流并不支持动态控制流。事实上,Martin Fowler原书里的图9.1是如下这样:原书图9.1

也即,Fill Order可以返回两种结果,而下游工作流则依赖于该结果。于是,我们需要想办法把这个控制逻辑也建模到工作流里面去。

利用协程的可组合性(看向函数式编程)上述工作流可以等效地描述为:原书图9.3

于是把局部的工作流隔离出来,利用Python自带的if-else结构,即可根据任务的结果选择下游任务:

async def process_order_subflow():

order = await get_filled_order()

if order.is_rush:

return await task('Overnight Delivery')

else:

return await task('Regular Delivery')

async def flow():

await task('Receive Order')

await fork_and_merge(

seq(task('Send Invoice'),

task('Receive Payment')),

process_order_subflow())

await task('Close Order')

我们不仅隔离了局部工作流的逻辑,使得它甚至可以单独测试和使用,更好的是我们还隔离了运行环境,比如order这个变量只在这个局部工作流范畴内创建/销毁,而不会泄露到外部运行环境中。这是个很舒服的性质。附:假设项目代码业务模型Order相关功能定义如下:

from dataclasses import dataclass

@dataclass

class Order:

is_rush : bool

# maybe define other properties

async def get_filled_order():

await aio.sleep(2) # simulate user interaction

return Order(bool(random.randint(0, 1)))

实例级别的隐式上下文

上述工作流的实例仅仅反映了动态图中描述的抽象结构,并未考虑到实例参数化问题。设想一个很简单的场景,我们对每一个申请交互的用户创建一个可辨识的工作流实例。于是我们需要给flow声明一个参数,传入一个用户名作为上下文。这时我们希望:该工作流实例的每个子工作流/子任务都可以访问这个上下文;

但我们不希望修改这些子工作流/子任务的签名,并总是自顶向下地显式传入这个上下文对象,这样带来大量重复代码;

借助Python 3.7引入的新魔法上下文变量ContextVar,我们可以从子任务里访问抽象的外部上下文,却能获取实例化的生下文。为此我们作以下微小的改动:

# add parameter `user`, which makes better sense in general

async def get_filled_order(user):

info(f'Retrieving order for{user}')

await aio.sleep(2) # simulate user interaction

return Order(bool(random.randint(0, 1)))

# ...

from contextvars import ContextVar

# declare context var external to the tasks

user_ctx_var = ContextVar('user')

async def task(todo):

# add logging of context var

info(f"Doing task:{todo}({user_ctx_var.get()})")

await aio.sleep(1)

return todo

async def process_order_subflow():

order = await get_filled_order(user_ctx_var.get())

if order.is_rush:

return await task('Overnight Delivery')

else:

return await task('Regular Delivery')

async def flow(user: str):

user_ctx_var.set(user)

await task('Receive Order')

await fork_and_merge(

seq(task('Send Invoice'), task('Receive Payment')),

process_order_subflow())

await task('Close Order')

基于此设定,我们最大化保留代码的原貌,却使得它们能自动适应实例上下文。

运行以下代码可以观察到结果——不同的flow实例,子任务获取到了不同的上下文:

async def main():

await aio.gather(

flow('Sam'),

flow('Joe'),

flow('Alice'),

)

loop.run_until_complete(main())

# output

Doing task: Receive Order (Sam)

Doing task: Receive Order (Joe)

Doing task: Receive Order (Alice)

Doing task: Send Invoice (Sam)

Retrieving order for Sam

Doing task: Send Invoice (Joe)

Retrieving order for Joe

Doing task: Send Invoice (Alice)

Retrieving order for Alice

Doing task: Receive Payment (Sam)

Doing task: Receive Payment (Joe)

Doing task: Receive Payment (Alice)

Doing task: Overnight Delivery (Sam)

Doing task: Overnight Delivery (Joe)

Doing task: Regular Delivery (Alice)

Doing task: Close Order (Sam)

Doing task: Close Order (Joe)

Doing task: Close Order (Alice)

分叉的可选性以及递归

UML Distilled里还列出了这幅图:原书图9.2等效图

该图告诉我们:分叉子进程的存在性是可以依赖于上下文

分叉结构可以递归存在

为支持第一条,我们可以调整fork_and_merge方法,过滤掉None值(代表被上下文决定丢弃的任务),并应用之:

# ...

async def fork_and_merge(*coros):

results = await aio.gather(

*filter(lambda x: x is not None, coros))

return results

# ...

def in_mood_for_wine(table):

good_mood = len(table) & 1 == 0 # simulate random mood

info(f'In good mood:{good_mood}')

return good_mood

async def service(todo):

info(f'Service doing:{todo}')

await aio.sleep(random() * 3)

return todo

async def serve_meal(table):

return await (

fork_and_merge(

seq(

fork_and_merge(

service('Cook Spaghetti'),

service('Mix Carbonara Sauce')),

service('Combine')

),

# optionality depending on context

service('Open Red Wine') if in_mood_for_wine(table) else None)

)

loop = aio.get_event_loop()

loop.run_until_complete(serve_meal('No. 42'))

至于分叉的递归,已由协程的可组合性所自然保证。但这里衍生出一个问题,即分叉的总数并不受控制,很可能几个工作流实例就创建了大量的分叉。

就本例而言,假如餐厅里的服务生数量有限,每个service实例必须由一个服务生来运行,那我们如何表述这种限制?

更进一步,如果有两类服务提供者(厨师+服务生),其中厨师工作方式是blocking的,而服务生工作方式是non-blocking的,又该如何以最简单的方式表述?

(未完待续

python工作流引擎_工作流,活动图和Python协程(一)相关推荐

  1. 一张图了解python基本语法_一张图认识Python(附基本语法总结)

    一张图带你了解Python,更快入门, 一张图认识Python(附基本语法总结) Python基础语法总结: 1.Python标识符 在 Python 里,标识符有字母.数字.下划线组成. 在 Pyt ...

  2. 阿里工作流引擎_免费开源,一款快速开发模块化脚手架,含工作流引擎

    简介 lenosp 一款快速开发模块化脚手架,采用 spring boot 2.0.1.spring.SpringMvc.mybatis.shiro.activiti 工作流.swagger.ehca ...

  3. spring 工作流引擎_带Spring的简单工作流引擎

    spring 工作流引擎 几个月前,在处理一个公司项目时,我们需要开发REST服务,该服务用于根据客户端应用程序发送的数据发送电子邮件. 在开发此服务期间,我们决定创建简单的工作流引擎,该引擎将为发送 ...

  4. F2工作流引擎之 工作流运转模型(三)

    1流程单起点单终止模型 单起点:一个流程定义必须有且唯一起点 单结束点:一个流程定义必须有且唯一结束点. 约定:提单与结束是每个流程必须有的活动,且唯一只有一个提单和结束. 2串行模型 描述:串行(S ...

  5. 用python画皇冠_手把手教你用 Python 绘制酷炫的桑基图!

    原标题:手把手教你用 Python 绘制酷炫的桑基图! 作者 | 周志鹏 责编 | 郭 芮 最近,不止一次收到小伙伴的截图追问: "这个图叫什么???" "这个图真好看! ...

  6. python 推理引擎_【技术文档】OpenVINO推理引擎示例

    推理引擎示例 推理引擎示例应用是简单的控制台应用,显示了如何在应用中利用特定的推理引擎功能,帮助开发人员执行特定的任务,例如加载模型.运行推理.查询特定的设备功能等. 安装英特尔®OpenVINO™工 ...

  7. python 两点曲线_十行代码,用Python做一个迷你版的美图秀秀

    美图秀秀相信大家都不陌生,大家只要操作美图秀秀,就可以P掉图片中脸上的一些瑕疵,让人变得更加的美丽.今天小编就带领大家来借助Python和Flask来实现一个美图秀秀的网页设计,大家只需要通过网页上传 ...

  8. python画图宽度_手把手教你用 Python 绘制酷炫的桑基图!

    作者 | 周志鹏 责编 | 郭   芮 最近,不止一次收到小伙伴的截图追问: "这个图叫什么???" "这个图真好看!!!怎么画啊?" ...... 笔者本没有 ...

  9. 天津python招聘信息网_【天津-滨海新区Python招聘_最新热搜天津-滨海新区Python人才招聘信息】-前程无忧...

    学历要求:硕士|工作经验:3-4年|公司性质:上市公司|公司规模:150-500人 专业:计算机.自动化.软件工程.图形图像学或相关专业 语言:c c++ c# java 数据库 算法 工作经验:3年 ...

最新文章

  1. R语言构建ElasticNet回归模型实战:基于mtcars数据集
  2. 物理光学11 衍射的基本概念与惠更斯原理
  3. BZOJ 1012: [JSOI2008]最大数maxnumber 单调队列/线段树/树状数组/乱搞
  4. 七 内置锁 wait notify notifyall; 显示锁 ReentrantLock
  5. Watermaker水位线/水印
  6. Harmony OS — RadioButton RadioContainer单选按钮单选按钮组
  7. jQuery新的事件绑定机制on()
  8. scrapy爬虫—获取script中的data数据
  9. 【原】115网盘下载地址解析工具(暂停更新)
  10. 音视频是怎样实现传输的
  11. Android 集成支付宝支付
  12. docker用现有容器创建镜像
  13. 计算机网络与Netty - F2F
  14. 那个幻想奇特的诗意女孩
  15. 数字电路复位电路解析
  16. NUC970 SD卡驱动(SDIO)
  17. 查找linux内核漏洞查用的方法脚本
  18. vue引入鼠标点击效果
  19. 【Popper报错】Popper: modifier “undefined“ provided an invalid “fn“ property
  20. codeforce 555 div3 题解报告

热门文章

  1. 了解汽车上的OBD-II接口
  2. Uniswap V2里的手续费换算
  3. 《STL》— UVa10815 Andy's First Dictionary
  4. 区块链落地就看这一“会”了
  5. Python | 阿尔法基本图形绘制
  6. Excel中引用方法
  7. JavaScript纯前端解析Excel文件
  8. 家里电脑总是显示宽带连接服务器,电脑宽带连接
  9. 教师资格证报名使用IE浏览器
  10. react路由传参的几种方式