• 介绍

    • Conductor 优点
    • 为什么不进行点对点编排?
  • 基本概念
    • 工作流定义
    • 任务定义
    • 系统任务
    • 工人任务
    • 工作流任务的生命周期
  • 元数据定义
    • 任务定义
    • 重试逻辑
    • 超时政策
  • 工作流定义
    • 工作流程中的任务
    • 连接输入和输出
    • $ {SOURCE.input / output.JSONPath}
    • JSON路径支持
  • 系统任务的创建
    • (DYNAMIC) 动态任务定义
    • (DECIDE)决策任务定义
    • Fork 并行任务定义
    • Dynamic Fork (动态分支)
    • Join
    • Join 任务输出
    • 子工作流程
    • Wait
    • SQS队列
    • HTTP
      • 参数
    • Event (事件)
    • 支持的接收器
    • 事件任务输入
    • 事件任务输出

本文是对 Conductor 文档的简单翻译,建议你认真阅读,如果阅读后你仍然不知道如何使用,可以继续关注本博客,我会在后续的博客中更新 Conductor 实战

介绍

Conductor是一个微服务的编排引擎

Conductor 优点

Conductor,帮助我们协调基于微服务的流程,具有以下功能:

  • 允许创建复杂的流程/业务流,其中由微服务实现单个任务。
  • 基于JSON DSL的定义执行流程。
  • 为这些流程提供可见性和可追溯性。
  • 在暂停,恢复,重启等周围公开控制语义,以获得更好的devops体验。
  • 允许更多地重用现有的微服务,为管理提供更容易的途径。
  • 用户界面可视化流程。
  • 能够在需要时同步处理所有任务。
  • 能够扩展数百万个并发运行的流程。
  • 由客户端提取的排队服务支持。
  • 能够在HTTP或其他传输上运行,例如gRPC。

为什么不进行点对点编排?

通过点对点任务编排,我们发现随着业务需求和复杂性的增长难以扩展。发布/订阅模型适用于最简单的流程,
但很快就突出了与该方法相关的一些问题:

  • 流程“嵌入”在多个应用程序的代码中。
  • 通常,围绕输入/输出,SLA等存在紧密耦合和假设,使得更难以适应不断变化的需求。
  • 几乎没有办法系统地回答“我们用过程X做了多少”?

基本概念

工作流定义

工作流是使用基于JSON的DSL定义的,包括一组作为工作流的一部分执行的任务。任务是在远程机器上执行的控制任务(fork,条件等)或应用程序任务(例如编码文件)。

任务定义

  • 所有任务都需要在活动工作流程使用之前进行注册。
  • 任务可以在多个工作流程中重复使用。工人任务分为两类:
    • 系统任务
    • 工人任务

系统任务

系统任务在Conductor服务器的JVM内执行,并由Conductor管理,以实现其可执行性和可扩展性。

名称 目的
DYNAMIC 基于任务的输入表达式派生的工作任务,而不是静态定义为计划的一部分
DECIDE 决策任务 - 实现案例……开关样式分叉
FORK 分叉一组并行的任务。计划每个集合并行执行
FORK_JOIN_DYNAMIC 与FORK类似,但FORK_JOIN_DYNAMIC不是在并行执行计划中定义的任务集,而是根据此任务的输入表达式生成并行任务
JOIN 补充FORK和FORK_JOIN_DYNAMIC。用于合并一个或多个并行分支*
SUB_WORKFLOW 将另一个工作流嵌套为子工作流任务。在执行时,它实例化子工作流并等待它完成
EVENT 在支持的事件系统中生成事件(例如,Conductor,SQS)

Conductor提供了一个API来创建在与引擎相同的JVM中执行的用户定义任务。有关详细信息,请参阅WorkflowSystemTask接口。

工人任务

工作人员任务由应用程序实现,并在与Conductor不同的环境中运行。工作人员任务可以用任何语言实现。这些任务通过REST API端点与Conductor服务器通信,以轮询任务发现并执行,并在执行后更新其状态。

工作人员任务由计划中的任务类型SIMPLE标识。

工作流任务的生命周期

元数据定义

任务定义

Conductor维护着一个工作人员任务类型的注册表。在工作流程中使用之前必须注册任务类型。

{"name": "encode_task","retryCount": 3,"timeoutSeconds": 1200,"inputKeys": ["sourceRequestId","qcElementType"],"outputKeys": ["state","skipped","result"],"timeoutPolicy": "TIME_OUT_WF","retryLogic": "FIXED","retryDelaySeconds": 600,"responseTimeoutSeconds": 3600
}
领域 描述 笔记
name 任务类型 唯一
retryCount 任务标记为失败时尝试重试的次数
retryLogic 重试机制 看下面的可能值
timeoutSeconds 以毫秒为单位的时间,在此之后,如果在转换到IN_PROGRESS状态后未完成任务,则将任务标记为TIMED_OUT 如果设置为0,则不会超时
timeoutPolicy 任务的超时策略 看下面的可能值
responseTimeoutSeconds 如果大于0,则在此时间之后未更新状态时,将重新安排任务。当工作人员轮询任务但由于错误/网络故障而无法完成时很有用。
outputKeys 任务输出的键集。用于记录任务的输出

重试逻辑

  • FIXED :重新安排任务后的任务 retryDelaySeconds
  • EXPONENTIAL_BACKOFF:重新安排之后 retryDelaySeconds * attempNo

超时政策

  • RETRY :再次重试该任务
  • TIME_OUT_WF:工作流程标记为TIMED_OUT并终止
  • ALERT_ONLY:注册计数器(task_timeout)

工作流定义

使用基于JSON的DSL定义工作流。

{"name": "encode_and_deploy","description": "Encodes a file and deploys to CDN","version": 1,"tasks": [{"name": "encode","taskReferenceName": "encode","type": "SIMPLE","inputParameters": {"fileLocation": "${workflow.input.fileLocation}"}},{"name": "deploy","taskReferenceName": "d1","type": "SIMPLE","inputParameters": {"fileLocation": "${encode.output.encodeLocation}"}}],"outputParameters": {"cdn_url": "${d1.output.location}"},"schemaVersion": 2
}
领域 描述 笔记
name 工作流程的名称
description 工作流程的描述性名称
version 用于标识架构版本的数字字段。使用递增数字 启动工作流程执行时,如果未指定,则使用具有最高版本的定义
tasks 一系列任务定义,如下所述。
outputParameters 用于生成工作流输出的JSON模板 如果未指定,则将输出定义为上次执行的任务的输出
inputParameters 输入参数列表。用于记录工作流程所需的输入 可选的

工作流程中的任务

tasks工作流中的属性定义要按该顺序执行的任务数组。以下是每项任务所需的强制性最低参数:

领域 描述 笔记
name 任务名称。在开始工作流程之前,必须使用Conductor注册为任务类型
taskReferenceName 别名用于在工作流程中引用任务。必须是独一无二的。
type 任务类型。SIMPLE用于远程工作人员或其中一个系统任务类型执行的任务
description 任务描述 可选的
optional 对或错。设置为true时 - 即使任务失败,工作流也会继续。任务的状态反映为COMPLETED_WITH_ERRORS 默认为 false
inputParameters JSON模板,用于定义给予任务的输入 有关详细信息,请参见“接线输入和输出”

除了这些参数,需要进行具体的任务类型附加参数记录在这里

连接输入和输出

当触发新的执行时,客户端会为工作流提供输入。工作流输入是通过${workflow.input…}表达式提供的JSON有效负载。

基于inputParameters工作流定义中配置的模板,为工作流中的每个任务提供输入。 inputParameters是一个JSON片段,其值包含用于在执行期间映射工作流的输入或输出或其他任务的值的参数。

映射值的语法遵循以下模式:

$ {SOURCE.input / output.JSONPath}

- -
SOURCE 可以是任何任务的“工作流程”或引用名称
input/output 指源的输入或输出
JSONPath JSON路径表达式从源的输入/输出中提取JSON片段

JSON路径支持

Conductor支持JSONPath规范并从此处使用Java实现。

考虑一个任务,其输入配置为使用来自工作流的输入/输出参数和名为loc_task的任务。

{"inputParameters": {"movieId": "${workflow.input.movieId}","url": "${workflow.input.fileLocation}","lang": "${loc_task.output.languages[0]}","http_request": {"method": "POST","url": "http://example.com/${loc_task.output.fileId}/encode","body": {"recipe": "${workflow.input.recipe}","params": {"width": 100,"height": 100 }},"headers": {"Accept": "application/json","Content-Type": "application/json"}}}
}

请将以下内容视为工作流输入

{"movieId": "movie_123","fileLocation":"s3://moviebucket/file123","recipe":"png"
}

并且loc_task的输出如下;

{"fileId": "file_xxx_yyy_zzz","languages": ["en","ja","es"]
}

在安排任务时,Conductor将合并工作流输入和loc_task输出中的值,并按如下方式创建任务输入:

{"movieId": "movie_123","url": "s3://moviebucket/file123","lang": "en","http_request": {"method": "POST","url": "http://example.com/file_xxx_yyy_zzz/encode","body": {"recipe": "png","params": {"width": 100,"height": 100}},"headers": {"Accept": "application/json","Content-Type": "application/json"}}
}

系统任务的创建

(DYNAMIC) 动态任务定义

参数:

名称 描述
dynamicTaskNameParam 任务输入中用于计划任务的值的参数名称。例如,如果参数的值是ABC,则调度的下一个任务是“ABC”类型。

{"name": "user_task","taskReferenceName": "t1","inputParameters": {"files": "${workflow.input.files}","taskToExecute": "${workflow.input.user_supplied_task}"},"type": "DYNAMIC","dynamicTaskNameParam": "taskToExecute"
}

如果使用输入参数user_supplied_task的值作为user_task_2启动工作流,则Conductor将在计划此动态任务时调度user_task_2。

(DECIDE)决策任务定义

决策任务类似于case…switch编程语言中的语句。该任务需要3个参数:

参数:

名称 描述
caseValueParam 任务输入中参数的名称,其值将用作开关。
decisionCases 可以键入值的映射值的caseValueParam值是要执行的任务列表。
defaultCase 在判定案例中找不到匹配值时要执行的任务列表(默认条件)

{"name": "decide_task","taskReferenceName": "decide1","inputParameters": {"case_value_param": "${workflow.input.movieType}"},"type": "DECISION","caseValueParam": "case_value_param","decisionCases": {"Show": [{"name": "setup_episodes","taskReferenceName": "se1","inputParameters": {"movieId": "${workflow.input.movieId}"},"type": "SIMPLE"},{"name": "generate_episode_artwork","taskReferenceName": "ga","inputParameters": {"movieId": "${workflow.input.movieId}"},"type": "SIMPLE"}],"Movie": [{"name": "setup_movie","taskReferenceName": "sm","inputParameters": {"movieId": "${workflow.input.movieId}"},"type": "SIMPLE"},{"name": "generate_movie_artwork","taskReferenceName": "gma","inputParameters": {"movieId": "${workflow.input.movieId}"},"type": "SIMPLE"}]}
}

Fork 并行任务定义

Fork用于调度并行任务集。

参数:

名称 描述
forkTasks 任务列表列表。每个子列表计划并行执行。但是,子列表中的任务是以串行方式安排的。

{"forkTasks": [[{"name": "task11","taskReferenceName": "t11"},{"name": "task12","taskReferenceName": "t12"}],[{"name": "task21","taskReferenceName": "t21"},{"name": "task22","taskReferenceName": "t22"}]]
}

执行时,task11和task21被安排在同一时间执行。

Dynamic Fork (动态分支)

Dynamic fork与FORK_JOIN任务相同。除了在运行时使用任务的输入提供要并行的任务列表。当并行的任务数量不固定并根据输入而变化时很有用。

名称 描述
dynamicForkTasksParam 包含要并行执行的工作流任务配置列表的参数的名称
dynamicForkTasksInputParamName 参数的名称,其值应为带有键的映射,作为分叉任务的引用名称和值作为分叉任务的输入

{"inputParameters": {"dynamicTasks": "${taskA.output.dynamicTasksJSON}","dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}"}"type": "FORK_JOIN_DYNAMIC","dynamicForkTasksParam": "dynamicTasks","dynamicForkTasksInputParamName": "dynamicTasksInput"
}

将taskA的输出视为:

{"dynamicTasksInputJSON": {"forkedTask1": {"width": 100,"height": 100,"params": {"recipe": "jpg"}},"forkedTask2": {"width": 200,"height": 200,"params": {"recipe": "jpg"}}},"dynamicTasksJSON": [{"name": "encode_task","taskReferenceName": "forkedTask1","type": "SIMPLE"},{"name": "encode_task","taskReferenceName": "forkedTask2","type": "SIMPLE"}]
}

执行时,Dynamic fork 任务将调度两个类型为“encode_task”的并行任务,引用名称为“forkedTask1”和“forkedTask2”,输入由_ dynamicTasksInputJSON_指定

Dynamic Fork and Join

Join任务必须遵循FORK_JOIN_DYNAMIC

工作流定义必须包含一个Join任务定义,后跟FORK_JOIN_DYNAMIC任务。但是,考虑到任务的动态特性,此Join不需要joinOn参数。在完成之前,连接将等待所有并行分支任务完成。

与FORK不同,FORK可以执行并行流,每个fork按顺序执行一系列任务,FORK_JOIN_DYNAMIC仅限于每个fork一个任务。但是,并行任务可以是子工作流,允许更复杂的执行流。

Join

Join任务用于等待fork任务生成的一个或多个任务的完成。

参数

名称 描述
joinOn 任务引用名称列表,JOIN将等待完成。

{"joinOn": ["taskRef1", "taskRef3"]
}

Join 任务输出

Fork任务的输出将是一个JSON对象,其中key是任务引用名称,value是fork任务的输出。

子工作流程

子工作流任务允许在另一个工作流中嵌套工作流。

参数

名称 描述
subWorkflowParam 任务引用名称列表,JOIN将等待完成。

{"name": "sub_workflow_task","taskReferenceName": "sub1","inputParameters": {"requestId": "${workflow.input.requestId}","file": "${encode.output.location}"},"type": "SUB_WORKFLOW","subWorkflowParam": {"name": "deployment_workflow","version": 1}
}

执行时,deployment_workflow使用两个输入参数requestId和file执行a 。生成的工作流程完成后,任务标记为已完成。如果子工作流终止或失败,则任务被标记为失败并在配置时重试。

Wait

Wait 任务被实现为保持在IN_PROGRESS状态的门,除非标记为外部触发器COMPLETED或FAILED由外部触发器标记。要使用Wait任务,请将任务类型设置为WAIT

参数
没有要求

Wait 任务的外部触发器

任务资源端点可用于将任务的状态更新为终止状态。

Contrib模块提供SQS集成,外部系统可以将消息放入服务器侦听的预配置队列中。当消息到达时,它们被标记为COMPLETED或FAILED。

SQS队列

  • 可以使用以下API检索服务器用于更新任务状态的SQS队列:
GET /queue
  • 更新任务状态时,消息需要符合以下规范:

    • 消息必须是有效的JSON字符串。
    • 消息JSON应包含一个名为key的键externalId,该值是一个包含以下键的JSONified字符串:
    • workflowId:工作流程的ID
    • taskRefName:应更新的任务引用名称。
    • 每个队列代表一个特定的任务状态,并相应地标记任务。例如,发送到COMPLETED队列的消息将任务状态标记为COMPLETED。
    • 任务的输出随消息更新。

示例SQS有效负载:

{"some_key": "valuex","externalId": "{\"taskRefName\":\"TASK_REFERENCE_NAME\",\"workflowId\":\"WORKFLOW_ID\"}"
}

HTTP

HTTP任务用于通过HTTP调用另一个微服务。

参数

该任务需要一个输入参数http_request,该参数作为任务输入的一部分,具有以下详细信息:

名称 描述
URI 服务的URI。使用vipAddress或包含服务器地址时可以是部分的。
method HTTP方法。其中一个GET,PUT,POST,DELETE,OPTIONS,HEAD
accept 根据服务器的要求接受标头。
contentType 内容类型 - 支持的类型是text / plain,text / html和application / json
headers 要与请求一起发送的其他http标头的映射。
body 请求正文
vipAddress 使用基于发现的服务URL时。

HTTP任务输出

名称 描述
response JSON主体包含响应(如果存在)
headers 响应标题
statusCode 整数状态代码

任务使用vipAddress输入有效负载

{"http_request": {"vipAddress": "examplevip-prod","uri": "/","method": "GET","accept": "text/plain"}
}

任务使用绝对URL输入

{"http_request": {"uri": "http://example.com/","method": "GET","accept": "text/plain"}
}

该任务被标记为FAILED无法完成请求或远程服务器返回非成功的状态代码。

注意

HTTP任务当前仅支持Content-Type作为application / json,并且能够解析文本以及JSON响应。目前不支持XML输入/输出。但是,如果无法将响应解析为JSON或Text,则将字符串表示形式存储为文本值。

Event (事件)

事件任务提供将事件(消息)发布到Conductor或外部事件系统(如SQS)的功能。事件任务对于为工作流和任务创建基于事件的依赖项非常有用。

参数

名称 描述
sink 生成的事件的合格名称。例如,导体或sqs:sqs_queue_name

{"sink": 'sqs:example_sqs_queue_name'
}

使用Conductor作为接收器生成事件时,事件名称遵循以下结构: conductor::

对于SQS,请使用队列的名称而不是URI。Conductor根据名称查找URI。

警告

使用SQS时,将ContribsModule添加到部署中。需要使用AWSCredentialsProvider为Conductor配置模块,以便能够使用AWS API。

支持的接收器

  • Conductor
  • SQS

事件任务输入

给予事件任务的输入可作为有效负载用于已发布的消息。例如,如果消息被放入SQS队列(接收器是sqs),则消息有效负载将是任务的输入。

事件任务输出

event_produced 生成的事件的名称。

服务编排--Conductor 文档翻译 (介绍与基本概念)相关推荐

  1. EdgeX Foundry第一弹 容器运行时docker与服务编排

    本文为Edgex系列第一篇文章,主要探讨容器化相关内容. 一.了解Edgex为什么先学习容器? 1.应用服务调度.Edgex服务众多,包含应用服务.支持服务.设备服务.安全服务.中间件等,使用容器部署 ...

  2. Web服务请求异步化介绍(概念篇)

    前话 在前面的文章中,先给出了Web服务请求异步处理的压力测试报告,从数据角度描述了支持Web请求异步化的容器在不同并发用户下的处理能力及性能消耗.本文从概念的角度对于应用系统异步化,Web服务请求异 ...

  3. 49学习容器管理平台 Docker Swarm 的基本概念和应用,包括节点管理、服务编排

    Docker Swarm 是 Docker 官方提供的容器编排工具,可以管理多个 Docker 节点,并支持自动化扩展.负载均衡等功能.下面是 Docker Swarm 的基本概念和使用方法,包括节点 ...

  4. Helm - Kubernetes服务编排的利器

    Helm介绍 在Kubernetes中部署容器云应用(容器或微服务编排)是一项有挑战性的工作,Helm就是为了简化在Kubernetes中安装部署容器云应用的一个客户端工具.通过Helm能够帮助开发者 ...

  5. 【须弥SUMERU】宜信分布式安全服务编排实践

    概要 1.分布式安全服务编排概念 2.须弥(Sumeru)关键实现思路 3.应用场景 前言 在笔者理解,安全防御的本质之一是增加攻击者的攻击成本,尤其是时间成本,那么从防御的角度来说,如何尽早和及时地 ...

  6. 万字长文:读懂微服务编排利器Zeebe

    万字长文:读懂微服务编排利器Zeebe 1.工作流与微服务编排 1.1工作流 提到工作流,印象里都是OA系统各种请假审批流.事实上,广义上的工作流是对工作流程及其各操作步骤之间业务规则的抽象.概括.描 ...

  7. Docker 安装 命令 数据卷 应用部署 网络优化 Dockerfile 服务编排Compose 私有仓库

    Docker 1. 初识docker 1.1 Docker是什么 了解Docker的前生LXC LXC与docker的关系 Docker 的特点 1.2 为什么使用Docker Docker的优势 缺 ...

  8. docker-compose之环境配置以及服务编排文件的使用讲解

    本文主要讲解以下两块内容:环境配置的作用及常规使用.服务配置文件的解析以及常规使用 必备知识: 在讲解服务配置文件和环境配置文件之前,首先要对docker以及编排工具compose有一定的了解,然后, ...

  9. 流程编排、如此简单-通用流程编排组件JDEasyFlow介绍

    作者:李玉亮 JDEasyFlow是企业金融研发部自研的通用流程编排技术组件,适用于服务编排.工作流.审批流等场景,该组件已开源(https://github.com/JDEasyFlow/jd-ea ...

最新文章

  1. 理解C#值类型与引用类型(摘录)
  2. Ubuntu 安装OpenCV3.0.0
  3. LeetCode-剑指 Offer 15. 二进制中1的个数
  4. mysql复制的配置
  5. js 用下标获取map值_js map方法处理返回数据,获取指定数据简写方法
  6. Zend_Form 创建、校验和解析表单的基础--(手冊)
  7. 巴洛克式和哥特式的区别
  8. linux 终端 收取邮件,linux mail 命令 (收发邮件)
  9. 2022趋势洞见之“云网端融合”
  10. Deltix Round, Autumn 2021 (open for everyone, rated, Div. 1 + Div. 2)
  11. C++ 使用Poco库实现日志操作
  12. vscode win10笔记本 蓝屏_win10蓝屏错误代码大全详解
  13. window系统换为ubuntu系统
  14. 开源操作系统期末知识总结
  15. 比较SQL Server Always On Cluster Mirroring
  16. 电脑必应输入法怎么卸载?
  17. 【Doxygen使用教程】
  18. SpringMvc参数传递
  19. 信息分析——共享经济服务模式的分析与研究
  20. 微信开发工具制作会动的海绵宝宝

热门文章

  1. 数据挖掘关联规则Apriori算法
  2. 不多BB,新年大红包来了!
  3. java IO操作知识点
  4. 【PCBA方案开发设计】电动打气泵产品方案
  5. 名帖175 苏轼 行书《寒食帖》
  6. 蔡康永的毒鸡汤,大家喝点
  7. ZT和老外吵架必备的108句英语!
  8. 安装Docker并配置阿里云镜像加速器
  9. python实现matlab stretchlim函数和imadjust函数
  10. HBuilder实现App icon右上角数字小红点BadgeNumber