airflow

Operators:

基本可以理解为一个抽象化的task, Operator加上必要的运行时上下文就是一个task. 有三类Operator:

  1. Sensor(传感监控器), 监控一个事件的发生.
  2. Trigger(或者叫做Remote Excution), 执行某个远端动作, (我在代码中没有找到这个类别)
  3. Data transfer(数据转换器), 完成数据转换

Tasks: task代表DAG中的一个节点, 它其实是一个BaseOperator子类.

Task instances, 即task的运行态实例, 它包含了task的status(成功/失败/重试中/已启动)

Job: Airflow中Job很少提及, 但在数据库中有个job表, 需要说明的是Job和task并不是一回事, Job可以简单理解为Airflow的批次, 更准确的说法是同一批被调用task或dag的统一代号. 有三类Job, 分别SchedulerJob/BackfillJob/LocalTaskJob, 对于SchedulerJob和BackfillJob, job指的是指定dag这次被调用的运行时代号, LocalTaskJob是指定task的运行时代号.

Hooks:

Hook是airflow与外部平台/数据库交互的方式, 一个Hook类就像是一个JDBC driver一样. airflow已经实现了jdbc/ftp/http/webhdfs很多hook. 要访问RDBMS数据库 有两类Hook可供选择, 基于原生Python DBAPI的Hook和基于JDBC的Hook

Connections:

我们的Task需要通过Hook访问其他资源, Hook仅仅是一种访问方式, 就像是JDBC driver一样, 要连接DB, 我们还需要DB的IP/Port/User/Pwd等信息. 这些信息不太适合hard code在每个task中, 可以把它们定义成Connection, airflow将这些connection信息存放在后台的connection表中. 我们可以在WebUI的Admin->Connections管理这些连接.

Variables:

Variable 没有task_id/dag_id属性, 往往用来定义一些系统级的常量或变量, 我们可以在WebUI或代码中新建/更新/删除Variable. 也可以在WebUI上维护变量.
Variable 的另一个重要的用途是, 我们为Prod/Dev环境做不同的设置,

XComs:

XCom和Variable类似, 用于Task之间共享一些信息. XCom 包含task_id/dag_id属性, 适合于Task之间传递数据, XCom使用方法比Variables复杂些. 比如有一个dag, 两个task组成(T1->T2), 可以在T1中使用xcom_push()来推送一个kv, 在T2中使用xcom_pull()来获取这个kv.

分支的支持:

airflow有两个基于PythonOperator的Operator来支持dag分支功能.

ShortCircuitOperator, 用来实现流程的判断. Task需要基于ShortCircuitOperator, 如果本Task返回为False的话, 其下游Task将被skip; 如果为True的话, 其下游Task将会被正常执行. 尤其适合用在其下游都是单线节点的场景.

BranchPythonOperator, 用来实现Case分支. Task需要基于BranchPythonOperator, airflow会根据本task的返回值(返回值是某个下游task的id),来确定哪个下游Task将被执行, 其他下游Task将被skip.

Trigger Rules:

可以为dag中的每个task都指定它的触发条件, 这里的触发条件有两个维度, 以T1&T2->T3 这样的dag为例:

  • 一个维度是: 要根据dag上次运行T3的状态确定本次T3是否被调用, 由
    DAG的default_args.depends_on_past参数控制, 为True时, 只有上次T3运行成功, 这次T3才会被触发
  • 另一个维度是: 要根据前置T1和T2的状态确定本次T3是否被调用, 由T3.trigger_rule参数控制, 有下面6种情形, 缺省是all_success.

dag提交-python配置任务

  • DAG 基本参数配置

    default_args = {'owner': 'airflow','depends_on_past': False,   # 是否依赖上一个自己的执行状态 'start_date': datetime.datetime(2019, 1, 1),'email': ['wangzhenjun@gmail.com'], # 需要在airflow.cfg中配置下发件邮箱'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': datetime.timedelta(minutes=5),# 'end_date': datetime(2020, 1, 1),   # 结束时间,注释掉也就会一直执行下去
    }
    
  • DAG对象

    • 设置dag的执行周期:schedule_interval.该参数可以接收cron 表达式和datetime.timedelta对象,另外airflow还预置了一些调度周期。
    preset Description cron
    None Don’t schedule, use for exclusively “externally triggered” DAGs
    @once Schedule once and only once
    @hourly Run once an hour at the beginning of the hour 0 * * * *
    @daily Run once a day at midnight 0 0 * * *
    @weekly Run once a week at midnight on Sunday morning 0 0 * * 0
    @monthly Run once a month at midnight of the first day of the month 0 0 1 * *
    @yearly Run once a year at midnight of January 1 0 0 1 1 *
    t1 = BashOperator(   #任务类型是bash
    task_id='echoDate', #任务id
    bash_command='echo date > /home/datefile', #任务命令
    dag=dag)
    
  • 完整样例

    # coding: utf-8from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime, timedelta# 定义默认参数
    default_args = {'owner': 'wangzhenjun',  # 拥有者名称'depends_on_past': False,   # 是否依赖上一个自己的执行状态'start_date': datetime(2019, 1, 15, 10, 00),  # 第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期'email': ['wangzhenjun01@corp.netease.com'],  # 接收通知的email列表'email_on_failure': True,  # 是否在任务执行失败时接收邮件'email_on_retry': True,  # 是否在任务重试时接收邮件'retries': 3,  # 失败重试次数'retry_delay': timedelta(seconds=5)  # 失败重试间隔
    }# 定义DAG
    dag = DAG(dag_id='hello_world',  # dag_iddefault_args=default_args,  # 指定默认参数# schedule_interval="00, *, *, *, *"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行schedule_interval=timedelta(minutes=1)  # 执行周期,表示每分钟执行一次
    )"""
    1.通过PythonOperator定义执行python函数的任务
    """
    # 定义要执行的Python函数1
    def hello_world_1():current_time = str(datetime.today())with open('/root/tmp/hello_world_1.txt', 'a') as f:f.write('%s\n' % current_time)assert 1 == 1  # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常
    # 定义要执行的Python函数2
    def hello_world_2():current_time = str(datetime.today())with open('/root/tmp/hello_world_2.txt', 'a') as f:f.write('%s\n' % current_time)# 定义要执行的task 1
    t1 = PythonOperator(task_id='hello_world_1',  # task_idpython_callable=hello_world_1,  # 指定要执行的函数dag=dag,  # 指定归属的dagretries=2,  # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
    )
    # 定义要执行的task 2
    t2 = PythonOperator(task_id='hello_world_2',  # task_idpython_callable=hello_world_2,  # 指定要执行的函数dag=dag,  # 指定归属的dag
    )t2.set_upstream(t1)  # t2依赖于t1;等价于 t1.set_downstream(t2);同时等价于 dag.set_dependency('hello_world_1', 'hello_world_2')
    # 表示t2这个任务只有在t1这个任务执行成功时才执行,
    # 或者
    t1 >> t2"""
    2.通过BashOperator定义执行bash命令的任务
    """
    hello_operator = BashOperator(   #通过BashOperator定义执行bash命令的任务task_id='sleep_task',depends_on_past=False,bash_command='echo `date` >> /home/py/test.txt',dag=dag
    )
    """
    其他任务处理器:
    3.EmailOperator : 发送邮件
    4.HTTPOperator : 发送 HTTP 请求
    5.SqlOperator : 执行 SQL 命令
    """
    

airflow的学习使用相关推荐

  1. 史上最全的“大数据”学习资源

    2019独角兽企业重金招聘Python工程师标准>>> 资源列表: 关系数据库管理系统(RDBMS) 框架 分布式编程 分布式文件系统 文件数据模型 Key -Map 数据模型 键- ...

  2. ApacheCN 学习资源汇总 2019.3

    [主页] apachecn.org [Github]@ApacheCN 暂时下线: 社区 暂时下线: cwiki 知识库 自媒体平台 微博:@ApacheCN 知乎:@ApacheCN CSDN 简书 ...

  3. 独家 | 一份数据工程师必备的学习资源,干货满满(附链接)

    作者:PRANAV DAR 翻译:张玲 校对:车前子 本文约6500字,建议阅读15分钟. 本文首先详细介绍了数据工程的职责.与数据科学家之间的差别以及其不同的工作角色,然后重点列出了很多与核心技能相 ...

  4. 听听阿里老哥对算法工程师技术学习路线的建议

    点击上方,选择星标或置顶,不定期资源大放送! 阅读大概需要15分钟 Follow小博主,每天更新前沿干货 来源丨https://zhuanlan.zhihu.com/p/192633890 前言 知乎 ...

  5. 干货丨不可错过的大数据学习资源推荐

    今天为大家推荐一些翻译整理的大数据相关的非常棒的学习资源,希望能给大家一些帮助. 关系数据库管理系统(RDBMS) MySQL:世界最流行的开源数据库: PostgreSQL:世界最先进的开源数据库: ...

  6. 谁是深度学习框架一哥?2022 年,PyTorch 和 TensorFlow 再争霸

    来源:机器学习研究组订阅 用PyTorch还是TensorFlow,对于大部分深度学习从业者来说真是一个头疼的问题.最近Reddit上有个帖子从三个方面对比了两个框架,结果竟然是平手? 你用PyTor ...

  7. 【Spark深入学习 -14】Spark应用经验与程序调优

    ----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调 ...

  8. python教程是用什么博客写的-Python 有哪些好的学习资料或者博客?

    Python语言本身的话,建议不要去看很多资料.我推荐以下三本,分别对应入门.进阶.专家三个级别:官方文档 : 直接看最新的就行了,没必要看python2的老文档.入门的话没必要全看完,直接看语言部分 ...

  9. LiveVideoStack线上分享第四季(五):基于Airflow的视频编码平台

    10月17日 19:30,LiveVideoStack线上分享第四季,第五期,我们邀请到了Hotstar 视频编码高级工程师 常谦详细介绍如何基于Airflow构建视频编码平台,通过直观地展示.跟进批 ...

最新文章

  1. Linux cuda cudann的安装
  2. 自学python语言-自学Python1.1-简介
  3. 字符串阵列分别输出元素的索引,原值和长度
  4. 算法提高课-搜索-DFS之连通性模型-AcWing 1112. 迷宫:dfs和bfs两种解法
  5. 连云港职业技术学院有计算机系吗,连云港职业技术学院电子信息工程技术专业...
  6. MyBatisPLus3.x中代码生成器自定义数据库表字段类型转换
  7. 分布式锁(Redisson)-从零开始,深入理解与不断优化
  8. linux怎么同时查看两个文件,MultiTail - 在单个Linux终端中同时监视多个文件
  9. php 栈实现历史记录后退,栈:如何实现浏览器的前进和后退功能
  10. 区间DP初探 P1880 [NOI1995]石子合并
  11. 新装系统或者安装显卡出现的卡住,循环登录等问题
  12. 三星Galaxy Fold全球翻车后 推迟发售时间进一步改进
  13. okhttp框架学习
  14. 3,graph语法学习
  15. 如何用ASP读写TXT文本文件中的内容
  16. 山特UPS电源注意事项
  17. codesensor:将代码转化为ast后再转化为文本向量
  18. 无心插柳OR志在必得?阿里推“来往”的意图
  19. 怎么把m4a转换成mp3格式?
  20. USACO 2018 January Contest

热门文章

  1. [英语阅读]沙特禁不雅车牌 USA首当其中
  2. UUID实现自定义随机数长度
  3. 第一次使用Oracle连接登陆、新建oracle用户、PLSQL Developer连接oracle数据库、Navicat for Oracle连接数据库
  4. highside 介绍
  5. mpi4py 中的收集操作
  6. 嘻哈艺术家和设计师Karan使用ThisIsKay.xyz来凸显他的音乐
  7. 编辑距离算法详解:Levenshtein Distance算法
  8. 基于java的记事本工具
  9. java中正斜杠与反斜杠
  10. 深圳市高层次人才奖励补贴及申报条件重点介绍,补贴600万