十八、可视化任务调度系统airflow
最近工作需要,使用airflow搭建了公司的ETL系统,顺带在公司分享了一次airflow,整理成文,Enjoy!
1. airflow 介绍
1.1 airflow 是什么
Airflow is a platform to programmatically author, schedule and monitor workflows.
airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。
1.2 airflow 核心概念
- DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。
- Operators:可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如
BashOperator
执行一个bash 命令,PythonOperator
调用任意的Python 函数,EmailOperator
用于发送邮件,HTTPOperator
用于发送HTTP请求,SqlOperator
用于执行SQL命令...同时,用户可以自定义Operator,这给用户提供了极大的便利性。 - Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
- Task Instance:task的一次运行。task instance 有自己的状态,包括"running", "success", "failed", "skipped", "up for retry"等。
- Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如
TaskA >> TaskB
,表明TaskB依赖于TaskA。
通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 workflow了。
1.3 其它概念
- Connections: 管理外部系统的连接信息,如外部MySQL、HTTP服务等,连接信息包括
conn_id
/hostname
/login
/password
/schema
等,可以通过界面查看和管理,编排workflow时,使用conn_id
进行使用。 - Pools: 用来控制tasks执行的并行数。将一个task赋给一个指定的
pool
,并且指明priority_weight
,可以干涉tasks的执行顺序。 - XComs:在airflow中,operator一般(not always)是原子的,也就是说,他们一般独立执行,同时也不需要和其他operator共享信息,如果两个operators需要共享信息,如filename之类的, 推荐将这两个operators组合成一个operator。如果实在不能避免,则可以使用XComs (cross-communication)来实现。XComs用来在不同tasks之间交换信息。
- Trigger Rules:指task的触发条件。默认情况下是task的直接上游执行成功后开始执行,airflow允许更复杂的依赖设置,包括
all_success
(所有的父节点执行成功),all_failed
(所有父节点处于failed或upstream_failed状态),all_done
(所有父节点执行完成),one_failed
(一旦有一个父节点执行失败就触发,不必等所有父节点执行完成),one_success
(一旦有一个父节点执行成功就触发,不必等所有父节点执行完成),dummy
(依赖关系只是用来查看的,可以任意触发)。另外,airflow提供了depends_on_past
,设置为True时,只有上一次调度成功了,才可以触发。
2. 示例
先来看一个简单的DAG。图中每个节点表示一个task,所有tasks组成一个DAG,各个tasks之间的依赖关系可以根据节点之间的线看出来。
DAGs
2.1 实例化DAG
# -*- coding: UTF-8 -*-## 导入airflow需要的modules
from airflow import DAG
from datetime import datetime, timedeltadefault_args = {'owner': 'lxwei','depends_on_past': False, # 如上文依赖关系所示'start_date': datetime(2018, 1, 17), # DAGs都有个参数start_date,表示调度器调度的起始时间'email': ['lxwei@github.com'], # 用于alert'email_on_failure': True,'email_on_retry': False,'retries': 3, # 重试策略'retry_delay': timedelta(minutes=5)
}dag = DAG('example-dag', default_args=default_args, schedule_interval='0 0 * * *')
在创建DAGs时,我们可以显示的给每个Task传递参数,但通过default_args,我们可以定义一个默认参数用于创建tasks。
注意,schedule_interval 跟官方文档不一致,官方文档的方式已经被deprecated。
2.2 定义依赖关系
这个依赖关系是我自己定义的,key表示某个taskId,value里的每个元素也表示一个taskId,其中,key依赖value里的所有task。
"dependencies": {"goods_sale_2": ["goods_sale_1"], # goods_sale_2 依赖 goods_sale1"shop_sale_1_2": ["shop_sale_1_1"],"shop_sale_2_2": ["shop_sale_2_1"],"shop_sale_2_3": ["shop_sale_2_2"],"etl_task": ["shop_info", "shop_sale_2_3", "shop_sale_realtime_1", "goods_sale_2", "shop_sale_1_2"],"goods_sale_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_1_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_realtime_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_2_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_info": ["timelySalesCheck", "productDaySalesCheck"]
}
2.3 定义tasks和依赖关系
首先,实例化operators,构造tasks。如代码所示,其中,EtlTask
、MySQLToWebDataTransfer
、MySQLSelector
是自定义的三种Operator,根据taskType实例化operator,并存放到taskDict中,便于后期建立tasks之间的依赖关系。
for taskConf in tasksConfs:taskType = taskConf.get("taskType")if taskType == "etlTask":task = EtlTask(task_id=taskConf.get("taskId"),httpConnId=httpConn,etlId=taskConf.get("etlId"),dag=dag)taskDict[taskConf.get("taskId")] = taskelif taskType == "MySQLToWebDataTransfer":task = MySqlToWebdataTransfer(task_id = taskConf.get("taskId"),sql= taskConf.get("sql"),tableName=taskConf.get("tableName"),mysqlConnId =mysqlConn,httpConnId=httpConn,dag=dag)taskDict[taskConf.get("taskId")] = taskelif taskType == "MySQLSelect":task = StatusChecker(task_id = taskConf.get("taskId"),mysqlConnId = mysqlConn,sql = taskConf.get("sql"),dag = dag)taskDict[taskConf.get("taskId")] = taskelse:logging.error("error. TaskType is illegal.")
构建tasks之间的依赖关系,其中,dependencies中定义了上面的依赖关系,A >> B
表示A是B的父节点,相应的,A << B
表示A是B的子节点。
for sourceKey in dependencies:destTask = taskDict.get(sourceKey)sourceTaskKeys = dependencies.get(sourceKey)for key in sourceTaskKeys:sourceTask = taskDict.get(key)if (sourceTask != None and destTask != None):sourceTask >> destTask
3. 常用命令
命令行输入airflow -h
,得到帮助文档
backfill Run subsections of a DAG for a specified date range list_tasks List the tasks within a DAG clear Clear a set of task instance, as if they never ran pause Pause a DAG unpause Resume a paused DAG trigger_dag Trigger a DAG run pool CRUD operations on pools variables CRUD operations on variables kerberos Start a kerberos ticket renewer render Render a task instance's template(s) run Run a single task instance initdb Initialize the metadata database list_dags List all the DAGs dag_state Get the status of a dag run task_failed_deps Returns the unmet dependencies for a task instancefrom the perspective of the scheduler. In other words,why a task instance doesn't get scheduled and thenqueued by the scheduler, and then run by an executor). task_state Get the status of a task instance serve_logs Serve logs generate by worker test Test a task instance. This will run a task withoutchecking for dependencies or recording it's state inthe database. webserver Start a Airflow webserver instance resetdb Burn down and rebuild the metadata database upgradedb Upgrade the metadata database to latest version scheduler Start a scheduler instance worker Start a Celery worker node flower Start a Celery Flower version Show the version connections List/Add/Delete connections
其中,使用较多的是backfill、run、test、webserver、scheduler。其他操作在web界面操作更方便。另外,initdb 用于初始化metadata,使用一次即可;resetdb会重置metadata,清除掉数据(如connection数据), 需要慎用。
4. 问题
在使用airflow过程中,曾把DAGs里的task拆分得很细,这样的话,如果某个task失败,重跑的代价会比较低。但是,在实践中发现,tasks太多时,airflow在调度tasks会很低效,airflow一直处于选择待执行的task的过程中,会长时间没有具体task在执行,从而整体执行效率大幅降低。
5. 总结
airflow 很好很强大。如果只是简单的ETL之类的工作,可以很容易的编排。调度灵活,而且监控和报警系统完备,可以很方便的投入生产环节。
6. 参阅
airflow 官网
github
十八、可视化任务调度系统airflow相关推荐
- 【R语言数据科学】(十八):系统聚类和K-Means聚类
[R语言数据科学]
- [Python从零到壹] 五十八.图像增强及运算篇之图像锐化Sobel、Laplacian算子实现边缘检测
欢迎大家来到"Python从零到壹",在这里我将分享约200篇Python系列文章,带大家一起去学习和玩耍,看看Python这个有趣的世界.所有文章都将结合案例.代码和作者的经验讲 ...
- [Python从零到壹] 三十八.图像处理基础篇之图像几何变换(平移缩放旋转)
欢迎大家来到"Python从零到壹",在这里我将分享约200篇Python系列文章,带大家一起去学习和玩耍,看看Python这个有趣的世界.所有文章都将结合案例.代码和作者的经验讲 ...
- [Python从零到壹] 四十八.图像增强及运算篇之形态学开运算、闭运算和梯度运算
欢迎大家来到"Python从零到壹",在这里我将分享约200篇Python系列文章,带大家一起去学习和玩耍,看看Python这个有趣的世界.所有文章都将结合案例.代码和作者的经验讲 ...
- ROS探索总结(十六)(十七)(十八)(十九)——HRMRP机器人的设计 构建完整的机器人应用系统 重读tf 如何配置机器人的导航功能
ROS探索总结(十六)--HRMRP机器人的设计 1. HRMRP简介 HRMRP(Hybrid Real-time Mobile Robot Platform,混合实时移动机器人平台 ...
- 无人驾驶汽车系统入门(二十八)——基于VoxelNet的激光雷达点云车辆检测及ROS实现
无人驾驶汽车系统入门(二十八)--基于VoxelNet的激光雷达点云车辆检测及ROS实现 前文我们提到使用SqueezeSeg进行了三维点云的分割,由于采用的是SqueezeNet作为特征提取网络,该 ...
- OpenCV学习笔记(三十六)——Kalman滤波做运动目标跟踪 OpenCV学习笔记(三十七)——实用函数、系统函数、宏core OpenCV学习笔记(三十八)——显示当前FPS OpenC
OpenCV学习笔记(三十六)--Kalman滤波做运动目标跟踪 kalman滤波大家都很熟悉,其基本思想就是先不考虑输入信号和观测噪声的影响,得到状态变量和输出信号的估计值,再用输出信号的估计误差加 ...
- 罗永浩:我今年四十八岁,还可以承受无数次的失败;iOS14 或将推出系统级「小程序」功能; PyCharm新版发布| 极客头条...
整理 | 屠敏 头图 | CSDN 下载自视觉中国 快来收听极客头条音频版吧,智能播报由标贝科技提供技术支持. 「极客头条」-- 技术人员的新闻圈! CSDN 的读者朋友们早上好哇,「极客头条」来啦, ...
- 【Visual C++】游戏开发四十八 浅墨DirectX教程十六 三维地形系统的实现
分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! 本系列文 ...
- 嵌入式系统设计师学习笔记二十八:嵌入式程序设计③——高级程序设计语言
嵌入式系统设计师学习笔记二十八:嵌入式程序设计③--高级程序设计语言 解释程序和编译程序 编译器的工作阶段示意图 语法错误:非法字符,关键字或标识符拼写错误 语法错误:语法结构出错,if--endif ...
最新文章
- Prometheus+Granfana
- PCA(主成分分析)+SVD(奇异值分解)+区别
- 【转载】Linux安装配置Qt
- 【Spring-AOP-学习笔记-3】@Before前向增强处理简单示例
- EF 更新条目时出错。有关详细信息,请参见内部异常。
- Activiti保存.png 流程图片文件且解决idea中保存图片时显示中文乱码的解决方法
- 深度对话“百度超级链” | 链上存证,司法效率助推器
- 企业认证CMMI都需要那些流程?
- 【Elasticsearch教程8】Mapping字段类型之keyword
- 高阶导数的运算法则 与 莱布尼茨公式
- Linux服务器开发,手把手设计实现epoll
- 用python爬虫制作图片下载器(超有趣!)
- go 实现抓包 ,盗取浏览器接口信息|反反爬虫
- 华为校招java题目_20200812 华为校招笔试 java
- 2019年的敬业福要贬值了,来看看百分百获取敬业福的终极大招。
- Java 实现分布式定时任务
- 【重大喜讯】六度共识云通过华为云鲲鹏生态兼容认证
- luogu4711 「化学」相对分子质量
- 安卓通讯录管理软件_安卓系统50个你不知道的使用窍门!每个都值得你去收藏!...
- 【Axure视频教程】日期时间函数