开源工作流引擎 Workflow Core 的研究和使用教程
开源工作流引擎 Workflow Core 的研究和使用教程
目录
- 开源工作流引擎 Workflow Core 的研究和使用教程
- 一,工作流对象和使用前说明
- 二,IStepBuilder 节点
- 三,工作流节点的逻辑和操作
- 容器操作
- 普通节点
- 事件
- 条件体和循环体
- 节点的异步或多线程
- 用于事务的操作
- 四,条件或开关
- 迭代
- 条件判断
- 节点并发
- 五,其它
一,工作流对象和使用前说明
为了避免歧义,事先约定。
工作流有很多节点组成,一个节点成为步骤点(Step)。
1,IWorkflow / IWorkflowBuilder
Workflow Core 中,用于构建工作流的类继承 IWorkflow
,代表一条有任务规则的工作流,可以表示工作流任务的开始或者 Do() 方法,或工作流分支获取其它方法。
IWorkflow 有两个同名接口:
public interface IWorkflow<TData>where TData : new(){string Id { get; }int Version { get; }void Build(IWorkflowBuilder<TData> builder);}public interface IWorkflow : IWorkflow<object>{}
Id:此工作流的唯一标识符;
Version:此工作流的版本。
void Build
:在此方法内构建工作流。
工作流运作过程中,可以传递数据。有两种传递方法:使用泛型,从运行工作流时就要传入;使用 object 简单类型,由单独的步骤产生并且传递给下一个节点。
IWorkflowBuilder 是工作流对象,构建一个具有逻辑规则的工作流。可以构建复杂的、具有循环、判断的工作流规则,或者并行或者异步处理工作流任务。
一个简单的工作流规则:
public class DeferSampleWorkflow : IWorkflow{public string Id => "DeferSampleWorkflow";public int Version => 1;public void Build(IWorkflowBuilder<object> builder){builder.StartWith(context =>{// 开始工作流任务Console.WriteLine("Workflow started");return ExecutionResult.Next();}).Then<SleepStep>().Input(step => step.Period, data => TimeSpan.FromSeconds(20)).Then(context =>{Console.WriteLine("workflow complete");return ExecutionResult.Next();});}}
2,EndWorkflow
此对象表示当前工作流任务已经结束,可以表示主工作流或者工作流分支任务的完成。
/// Ends the workflow and marks it as completeIStepBuilder<TData, TStepBody> EndWorkflow();
因为工作流是可以出现分支的,每个工作流各自独立工作,每个分支都有其生命周期。
3,容器
ForEach
、While
、If
、When
、Schedule
、Recur
是步骤容器。都返回IContainerStepBuilder<TData, Schedule, TStepBody>
Parallel、Saga是步骤的容器,都返回 IStepBuilder<TData, Sequence>
。
ForEach、While、If、When、Schedule、Recur 返回类型的接口:
public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>where TStepBody : IStepBodywhere TReturnStep : IStepBody{/// The block of steps to executeIStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);
Parallel、Saga :
/// Execute multiple blocks of steps in parallelIParallelStepBuilder<TData, Sequence> Parallel();/// Execute a sequence of steps in a containerIStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
也就是说,ForEach、While、If、When、Schedule、Recur 是真正的容器。
按照我的理解,继承了 IContainerStepBuilder
的,是一个容器,一个流程下的一个步骤/容器;因为 Workflow Core 作者对接口的命名很明显表达了 This a container
。
因为里面包含了一组操作,可以说是一个步骤里面包含了一个流程,这个流程由一系列操作组成,它是线性的,是顺序的。里面是一条工作流(Workflow)。
而 Parllel、Saga,相当于步骤点的容器。
更直观的理解是电路,继承 IContainerStepBuilder 的是串联设备的容器,是顺序的;
Parllel 是并联电路/设备的一个容器,它既是一个开关,使得一条电路变成多条并流的电路,又包含了这些电路的电器。里面可以产生多条工作流,是多分支的、不同步的、独立的。
1
从实现接口上看,ForEach、While、If、When、Schedule、Recur、Parllel 都实现了 Do()
方法,而 Saga 没有实现。
关于 Saga,后面说明。
4,工作流的步骤点
实现接口如下:
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);IEnumerable<WorkflowStep> GetUpstreamSteps(int id);IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
方法名称 |
说明 |
---|---|
StartWith |
任务的开始,必须调用此方法 |
GetUpstreamSteps |
获取上一个步骤(StepBody)的ID |
UseDefaultErrorBehavior |
不详 |
StepBody 是一个节点,IStepBuilder 构建一个节点,只有通过 StartWith,才能开始一个工作流、一个分支、异步任务等。
UseDefaultErrorBehavior
笔者没有使用到,不敢瞎说。貌似与事务有关,当一个步骤点发生异常时,可以终止、重试等。
二,IStepBuilder 节点
IStepBuilder 表示一个节点,或者说一个容器,里面可以含有其它操作,例如并行、异步、循环等。
1,设置属性的方法
Name:设置此步骤点的名称;
id:步骤点的唯一标识符。
/// Specifies a display name for the stepIStepBuilder<TData, TStepBody> Name(string name);/// Specifies a custom Id to reference this stepIStepBuilder<TData, TStepBody> Id(string id);
2,设置数据
前面说到,工作流每个步骤点传递数据有两种方式。
TData(泛型) 是工作流中,随着流传的数据,这个对象会在整个工作流程生存。
例如 Mydata
class RecurSampleWorkflow : IWorkflow<MyData>{public string Id => "recur-sample";public int Version => 1;public void Build(IWorkflowBuilder<MyData> builder){...}}public class MyData{public int Counter { get; set; }}
3,Input / Output
为当前步骤点(StepBody)设置数据,亦可为 TData 设置数据。
两类数据:每个步骤点都可以拥有很多字段、属性和方法等;工作流流转 TData。
Input、Output 是设置这些数据的具体方法。
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);
三,工作流节点的逻辑和操作
容器操作
1,Saga
用于在容器中执行一系列操作。
/// Execute a sequence of steps in a containerIStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
虽然注释说明 “用于在容器中执行一系列操作”,但实际上它不是一个真正的”容器“。
因为它没有继承 IContainerStepBuilder
,也没有实现 Do()
。
但是它返回的 Sequence
实现了ContainerStepBody
。
如果说真正的容器相当于一条长河流中的一个湖泊(可以容纳和储水),而 Saga 可能只是某一段河流的命名,而不是具体的湖泊。
或者说 static void Main(string[] args)
里面的代码太多了,新建一个方法体,把部分代码放进去。总不能把所有代码写在一个方法里吧?那么创建一个类,把代码分成多个部分,放到不同方法中,增强可读性。本质还是没有变。
Saga 可以用来处理事务,进行重试或回滚等操作。后面说明。
普通节点
1,Then
用于创建下一个节点,创建一个普通节点。可以是主工作流的节点(最外层)、或者作为循环、条件节点里的节点、作为节点中节点的节点。
IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);
2,Attach
Then 作为普通节点,按顺序执行。操作对象是类型、StepBody。
Attach 也是普通节点,无特殊意义,通过 id 来指定要执行 StepBody 。可以作为流程控制的跳转。
相当于 goto 语句。
/// Specify the next step in the workflow by IdIStepBuilder<TData, TStepBody> Attach(string id);
事件
1,WaitFor
用于定义事件,将当前节点作为事件节点,然后在后台挂起,工作流会接着执行下一个节点。在工作流停止前,可以通过指定 标识符(Id) 触发事件。在一个工作流中,每个事件的标识符都是唯一的。
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
条件体和循环体
1,End
意思应该是结束一个节点的运行。
如果在 When 中使用,相当于 break。
IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;
使用例子
builder.StartWith<RandomOutput>(x => x.Name("Random Step")).When(0).Then<TaskA>().Then<TaskB>() .End<RandomOutput>("Random Step").When(1).Then<TaskC>().Then<TaskD>().End<RandomOutput>("Random Step");
2,CancelCondition
在一个条件下过早地取消此步骤的执行。
应该相当于 contiune。
/// Prematurely cancel the execution of this step on a conditionIStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);
节点的异步或多线程
1,Delay
延迟执行,使得当前节点延时执行。并非是阻塞当前的工作流运行。Delay 跟在节点后面,使得这个节点延时运行。可以理解成异步,工作流不会等待此节点执行完毕,会直接执行下一个节点/步骤。
/// Wait for a specified periodIStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);
2,Schedule
预定执行。将当前节点设置一个时间,将在一段时间后执行。Schedule 不会阻塞工作流。
Schedule 是非阻塞的,工作流不会等待Schedule执行完毕,会直接执行下一个节点/步骤。
/// Schedule a block of steps to execute in parallel sometime in the futureIContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);
例子
builder.StartWith(context => Console.WriteLine("Hello")).Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule.StartWith(context => Console.WriteLine("Doing scheduled tasks"))).Then(context => Console.WriteLine("Doing normal tasks"));
3,Recur
用于重复执行某个节点,直至条件不符。
Recur 是非阻塞的,工作流不会等待 Rezur 执行完毕,会直接执行下一个节点/步骤。
/// Schedule a block of steps to execute in parallel sometime in the future at a recurring intervalIContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);
用于事务的操作
相当于数据库中的事务,流程中某些步骤发生异常时的时候执行某些操作。
例如:
builder.StartWith(context => Console.WriteLine("Begin")).Saga(saga => saga.StartWith<Task1>().CompensateWith<UndoTask1>().Then<Task2>().CompensateWith<UndoTask2>().Then<Task3>().CompensateWith<UndoTask3>()).OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5)).Then(context => Console.WriteLine("End"));
1,CompensateWith
如果此步骤引发未处理的异常,则撤消步骤;如果发生异常,则执行。
可以作为节点的 B计划。当节点执行任务没有问题时, CompensateWith 不会运行;如果节点发生错误,就会按一定要求执行 CompensateWith 。
/// Undo step if unhandled exception is thrown by this stepIStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);
2,CompensateWithSequence
如果此步骤引发未处理的异常,则撤消步骤;如果发生异常,则执行。与 CompensateWith 的区别是,传入参数前者是 Func,后者是 Action。
CompensateWith
的内部实现了 CompensateWith
,是对 CompensateWith
的封装。
/// Undo step if unhandled exception is thrown by this stepIStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);
3,OnError
用于事务操作,表示发生错误时如果回滚、设置时间等。一般与 Saga 一起使用。
OnError 是阻塞的。
/// Configure the behavior when this step throws an unhandled exceptionIStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
OnError 可以捕获一个容器内,某个节点的异常,并执行回滚操作。如果直接在节点上使用而不是容器,可以发生回滚,然后执行下个节点。如果作用于容器,那么可以让容器进行重新运行,等一系列操作。
OnError 可以与 When、While 等节点容器一起使用,但他们本身带有循环功能,使用事务会让代码逻辑变得奇怪。
Saga 没有条件判断、没有循环,本身就是一个简单的袋子,是节点的容器。因此使用 Saga 作为事务操作的容器,十分适合,进行回滚、重试等一系列操作。
四,条件或开关
迭代
1,ForEach
迭代,也可以说是循环。内部使用 IEnumerable 来实现。
与 C# 中 Foreach 的区别是,C# 中是用来迭代数据;
而工作流中 ForEach 用来判断元素个数,标识应该循环多少次。
ForEach 是阻塞的。
/// Execute a block of steps, once for each item in a collection in a parallel foreachIContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);
示例
builder.StartWith<SayHello>().ForEach(data => new List<int>() { 1, 2, 3, 4 }).Do(x => x.StartWith<DisplayContext>().Input(step => step.Item, (data, context) => context.Item).Then<DoSomething>()).Then<SayGoodbye>();
最终会循环5次。
条件判断
1,When
条件判断,条件是否真。
When 是阻塞的。
When 可以捕获上一个节点流转的数据(非 TData)。
/// Configure an outcome for this step, then wire it to another step[Obsolete]IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);/// Configure an outcome for this step, then wire it to a sequenceIContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);
前一个方法例如
When(0)
,会捕获 return ExecutionResult.Outcome(value);
的值,判断是否相等。但是这种方式已经过时。
需要使用表达式来判断。例如
.When(data => 1)
.When(data => data.value==1)
2,While
条件判断,条件是否真。与When有区别,When可以捕获 ExecutionResult.Outcome(value);
。
While 是阻塞的。
/// Repeat a block of steps until a condition becomes trueIContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);
示例
builder.StartWith<SayHello>().While(data => data.Counter < 3).Do(x => x.StartWith<DoSomething>().Then<IncrementStep>().Input(step => step.Value1, data => data.Counter).Output(data => data.Counter, step => step.Value2)).Then<SayGoodbye>();
3,If
条件判断,是否符合条件。
If是阻塞的。
/// Execute a block of steps if a condition is trueIContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);
When、While、If的区别是,When、While 是条件是否为真,If是表达式是否为真。
实质上,是语言上的区别,与代码逻辑无关。
真假用 When/While,条件判断、表达式判断用 If 。
节点并发
1,Parallel
并行任务。作为容器,可以在里面设置多组任务,这些任务将会同时、并发运行。
Parallel 是阻塞的。
/// Execute multiple blocks of steps in parallelIParallelStepBuilder<TData, Sequence> Parallel();
示例:
.StartWith<SayHello>().Parallel().Do(then => then.StartWith<PrintMessage>().Input(step => step.Message, data => "Item 1.1").Then<PrintMessage>().Input(step => step.Message, data => "Item 1.2")).Do(then =>then.StartWith<PrintMessage>().Input(step => step.Message, data => "Item 2.1").Then<PrintMessage>().Input(step => step.Message, data => "Item 2.2").Then<PrintMessage>().Input(step => step.Message, data => "Item 2.3")).Do(then =>then.StartWith<PrintMessage>().Input(step => step.Message, data => "Item 3.1").Then<PrintMessage>().Input(step => step.Message, data => "Item 3.2")).Join().Then<SayGoodbye>();
有三个 Do,代表三个并行任务。三个 Do 是并行的,Do 内的代码,会按顺序执行。
Paeallel 的 Do:
public interface IParallelStepBuilder<TData, TStepBody>where TStepBody : IStepBody{IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);IStepBuilder<TData, Sequence> Join();}
比起 ForEach、When、While、If,除了有 Do,还有 Join 方法。
对于其它节点类型来说,Do直接构建节点。
对于Parallel来说,Do收集任务,最终需要Join来构建节点和运行任务。
五,其它
写得长不好看,其它内容压缩一下。
数据传递和依赖注入
Workflow Core 支持对每个步骤点进行依赖注入。
支持数据持久化
Workflow Core 支持将构建的工作流存储到数据库中,以便以后再次调用。
支持 Sql Server、Mysql、SQLite、PostgreSQL、Redis、MongoDB、AWS、Azure、
Elasticsearch、RabbitMQ... ....
支持动态调用和动态生成工作流
你可以通过 C# 代码构建工作流,或者通过 Json、Yaml 动态构建工作流。
可以利用可视化设计器,将逻辑和任务生成配置文件,然后动态传递,使用 Workflow Core 动态创建工作流。
篇幅有限,不再赘述。
开源工作流引擎 Workflow Core 的研究和使用教程相关推荐
- 几种开源工作流引擎的简单比较(转)
摘要:目前开源工作流引擎用的最多的是jbpm , 各种特性都不错, 文档也比较多, 下面只简单列举一下 目前开源工作流引擎用的最多的是jbpm , 各种特性都不错, 文档也比较多, 下面只简单列举一下 ...
- 分布式开源工作流引擎有什么特点?
在竞争越来越激烈的社会中,拥有提质增效的办公软件,可以为企业带来更可观的市场价值.分布式开源工作流引擎在企业数字化发展进程中深受欢迎,在帮助企业提升办公效率上发挥了重要的作用.今天,我们就一起里盘点下 ...
- java bpm 开源_几种Java开源工作流引擎的简单比较
摘要:目前开源工作流引擎用的最多的是jbpm , 各种特性都不错, 文档也比较多, 下面只简单列举一下目前开源工作流引擎用的最多的是jbpm , 各种特性都不错, 文档也比较多, 下面只简单列举一下其 ...
- Java三大主流开源工作流引擎技术分析
首先,这个评论是我从网上,书中,搜索和整理出来的,也许有技术点上的错误点,也许理解没那么深入.但是我是秉着学习的态度加以评论,学习,希望对大家有用,进入正题! 三大主流工作流引擎:Shark,oswo ...
- 三大主流开源工作流引擎技术分析与市场预测
1.从<功夫>说起 时下的新新人类看到我,一定会认为在下是个十足的老古董,这不,<功夫>这样的片子我到今年2月底才看.不过看过<功夫>,我想的一定比一般的人多:周星 ...
- Java四大主流开源工作流引擎分析Shark,osworkflow,jbpm,jflow
首先,这个评论是我从网上,书中,搜索和整理出来的,也许有技术点上的错误点,也许理解没那么深入.但是我是秉着学习的态度加以评论,学习,希望对大家有用,进入正题! 四大主流工作流引擎:Shark,oswo ...
- java开源工作流引擎优势是什么?
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.流程引擎的自我介绍 二.java开源工作流引擎优势介绍 总结 前言 IBPSv3.5是流辰信息团队面向Java开发 ...
- 哪个开源工作流引擎更好?Flowable or Camunda ?
哪个开源工作流引擎更好? lecture:波哥 当下在国内大家可以选择的开源的工作流引擎还是比较多的,但是对于具体选择用哪个产品,各自的优缺点有哪些其实并不是太清楚,为此波哥今天专门给大家来整理总 ...
- Java三大主流开源工作流引擎分析
Java三大主流开源工作流引擎分析 首先,这个评论是我从网上,书中,搜索和整理出来的,也许有技术点上的错误点,也许理解没那么深入.但是我是秉着学习的态度加以评论,学习,希望对大家有用,进入正题! 三大 ...
- 几种开源工作流引擎的简单比较
目前开源工作流引擎用的最多的是jbpm , 各种特性都不错, 文档也比较多, 下面只简单列举一下 其他几种工作流引擎的特性. Apache ODE Enhydra Shark jflow Open ...
最新文章
- PyQt5 技巧篇-窗口置顶设置,如何使窗口始终显示在最前面
- asp.net获取客户端信息
- 数学建模清风第三次直播:excel在数学建模中的应用
- Taro多端开发实现原理与项目实战(一)
- 【BZOJ-1952】城市规划 [坑题] 仙人掌DP + 最大点权独立集(改)
- linux系统如何安装vesta,[linux服务器]安装Vesta Control Panel
- FreeBSD 安装axel提高ports的安装速度
- c++ 虚函数实现原理
- 将时间戳转为中国标准时间
- SVN客户端安装以及操作流程
- Unity URP 手撸一个自己的PBR材质
- 解决:HotSeat短信图标提醒有误
- huyuhang-python-day03
- 【VBA研究】用Ping命令测试IP地址是否通达
- Hadoop/Hive-学习笔记【中级篇】
- 9、MyBatis的动态SQL
- 3D游戏建模布线方法
- 最简单的海报制作工具:Poster Forge
- 【论文翻译】-- GaitSet: Regarding Gait as a Set for Cross-View Gait Recognition
- HTML5视频监控技术预研
热门文章
- 【分享】揭发天气秀、桌面秀、雪狐等号称资源占用小的桌面软件的流氓行为!
- U3D资源导出至Laya
- VirtualBox系统虚拟盘格式转换vdi/vhd/vmdk
- java制作小鱼吃大鱼_大鱼吃小鱼游戏(Java编写)
- [渝粤教育] 新乡医学院 医学微生物学 参考 资料
- python测试之道进阶_深入学习AB测试(一)-AB Testing With Python[项目实战]
- USB转串口芯片CH340G的使用,3.3V或5V供电电路
- osm 搭建离线地图_开源地图OSM
- MongoDB数据库重命名
- 苹果授权登录(Sign in with Apple)-JAVA后端开发