主要内容

  • 1. Operators 简介
  • 2. BaseOperator 简介
  • 3. BashOperator
  • 4. PythonOperator
  • 5. SSHOperator
  • 6. HiveOperator
  • 7. 如何自定义Operator

搭建 airflow 的目的还是为了使用,使用离不开各种 Operators,本文主要介绍以下几点

1. Operators 简介

Operators 允许生成特定类型的任务,这些任务在实例化时成为 DAG 中的任务节点。所有的 Operator 均派生自 BaseOperator,并以这种方式继承许多属性和方法。
Operator 主要有三种类型:

  • 执行一项操作或在远程机器上执行一项操作。
  • 将数据从一个系统移动到另一个系统
  • 类似传感器,是一种特定类型 Operator,它将持续运行,直到满足某种条件。例如在 HDFS 或 S3 中等待特定文件到达,在 Hive 中出现特定的分区或一天中的特定时间,继承自 BaseSensorOperator。

2. BaseOperator 简介

所有的 Operator 都是从 BaseOperator 派生而来,并通过继承获得更多功能。这也是引擎的核心,所以有必要花些时间来理解 BaseOperator 的参数,以了解 Operator 基本特性。

先看一下构造函数的原型:

class airflow.models.BaseOperator(task_id, owner='Airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, weight_rule=u'downstream', queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule=u'all_success', resources=None, run_as_user=None, task_concurrency=None, executor_config=None, inlets=None, outlets=None, *args, **kwargs)

这里有很多参数,一些参数通过字面意思已经了解其大至含义,可查阅官方文档查看详细解释,这里不再详述,需要注意的是参数 start_date。start_date 决定了任务第一次运行的时间,最好的实践是设置 start_date 在 schedule_interval 的附近。比如每天跑的任务开始日期设为’2018-09-21 00:00:00’,每小时跑的任务设置为 ‘2018-09-21 05:00:00’,airflow 将 start_date 加上 schedule_interval 作为执行日期。需要注意的是任务的依赖需要及时排除,例如任务 A 依赖任务 B,但由于两者 start_date 不同导致执行日期不同,那么任务 A 的依赖永远不会被满足。如果你需要执行一个日常任务,比如每天下午 2 点开始执行,你可以在 DAG中使用 cron 表达式

schedule_interval="0 14 * * *"

或者考虑使用 TimeSensor 或 TimeDeltaSensor。由于所有的 Operator 都继承自 BaseOperator,因此 BaseOperator的参数也是其他Operator的参数。

3. BashOperator

官方提供的 DAG 示例-tutorial 就是一个典型的 BashOperator,调用 bash 命令或脚本,传递模板参数就可以参考 tutorial

"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {'owner': 'airflow','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(2),'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),# 'queue': 'bash_queue',# 'pool': 'backfill',# 'priority_weight': 10,# 'end_date': datetime(2016, 1, 1),# 'wait_for_downstream': False,# 'dag': dag,# 'adhoc':False,# 'sla': timedelta(hours=2),# 'execution_timeout': timedelta(seconds=300),# 'on_failure_callback': some_function,# 'on_success_callback': some_other_function,# 'on_retry_callback': another_function,# 'trigger_rule': u'all_success'
}dag = DAG('tutorial',default_args=default_args,description='A simple tutorial DAG',schedule_interval=timedelta(days=1))# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id='print_date',   #这里也可以是一个 bash 脚本文件bash_command='date',dag=dag)t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""dag.doc_md = __doc__t2 = BashOperator(task_id='sleep',depends_on_past=False,bash_command='sleep 5',dag=dag)templated_command = """
{% for i in range(5) %}echo "{{ ds }}"echo "{{ macros.ds_add(ds, 7)}}"echo "{{ params.my_param }}"
{% endfor %}
"""t3 = BashOperator(task_id='templated',depends_on_past=False,bash_command=templated_command,params={'my_param': 'Parameter I passed in'},dag=dag)t2.set_upstream(t1)
t3.set_upstream(t1)

这里 t1 和 t2 都很容易理解,直接调用的是 bash 命令,其实也可以传入带路径的 bash 脚本, t3 使用了 Jinja 模板,"{% %}" 内部是 for 标签,用于循环操作。"{{ }}" 内部是变量,其中 ds 是执行日期,是 airflow 的宏变量,params.my_param 是自定义变量。根据官方提供的模板,稍加修改即可满足我们的日常工作所需。

4. PythonOperator

PythonOperator 可以调用 Python 函数,由于 Python 基本可以调用任何类型的任务,如果实在找不到合适的 Operator,将任务转为 Python 函数,再使用 PythonOperator 也是一种选择。下面是官方文档给出的 PythonOperator 使用的样例。

from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAGimport time
from pprint import pprintargs = {'owner': 'airflow','start_date': airflow.utils.dates.days_ago(2)
}dag = DAG(dag_id='example_python_operator', default_args=args,schedule_interval=None)def my_sleeping_function(random_base):"""This is a function that will run within the DAG execution"""time.sleep(random_base)def print_context(ds, **kwargs):pprint(kwargs)print(ds)return 'Whatever you return gets printed in the logs'run_this = PythonOperator(task_id='print_the_context',provide_context=True,python_callable=print_context,dag=dag)# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):task = PythonOperator(task_id='sleep_for_' + str(i),python_callable=my_sleeping_function,op_kwargs={'random_base': float(i) / 10},dag=dag)task.set_upstream(run_this)

通过以上代码我们可以看到,任务 task 及依赖关系都是可以动态生成的,这在实际使用中会减少代码编写数量,逻辑也非常清晰,非常方便使用。PythonOperator 与 BashOperator 基本类似,不同的是 python_callable 传入的是 Python 函数,而后者传入的是 bash 指令或脚本。通过 op_kwargs 可以传入任意多个参数。

5. SSHOperator

在实际的任务调度中,任务大多分布在多台机器上,如何调用远程机器的任务呢,这时可以简单地使用 SSHOperator 来调用远程机器上的脚本任务。SSHOperator 使用 ssh 协议与远程主机通信,需要注意的是 SSHOperator 调用脚本时并不会读取用户的配置文件,最好在脚本中加入以下代码以便脚本被调用时会自动读取当前用户的配置信息。

. ~/.profile
或
. ~/.bashrc

下面是一个 SSHOperator 的任务示例:

task_crm = SSHOperator(ssh_conn_id='ssh_crmetl', # 指定conn_idtask_id='crmetl-filesystem-check',command='/home/crmetl/bin/monitor/filesystem_monitor.sh', # 远程机器上的脚本文件dag=dag
)

这里 ssh_crmetl 是一个连接ID,是在 airflow webserver 界面配置的,配置方法如下:
打开 webserver 点击 Admin 菜单下的 Connections 项,如下图所示:

然后选择 Create 来新建一个 ssh 连接,输入连接 id,ip 地址,用户名密码等信息,如下图所示:

保存之后,即可使用 ssh_crmetl 来调用对应主机上的脚本了。

6. HiveOperator

hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的 sql 查询功能,可以将 sql 语句转换为 MapReduce 任
务进行运行。在 airflow 中调用 hive 任务,首先需要安装依赖

pip install apache-airflow[hive]

下面是使用示例:

t1 = HiveOperator(task_id='simple_query',hql='select * from cities',dag=dag)

常见的 Operator 还有 DockerOperator,OracleOperator,MysqlOperator,DummyOperator,SimpleHttpOperator 等使用方法类似,不再一一介绍。

7. 如何自定义Operator

如果官方的 Operator 仍不满足需求, 那么我们就自己开发一个 Operator。 开发 Operator 比较简单,继承 BaseOperator 并实现 execute 方法即可:

from airflow.models import BaseOperatorclass MyOperator(BaseOperator):def __init__(*args, **kwargs):super(MyOperator, self).__init__(*args, **kwargs)def execute(self, context):###do something here

除了 execute 方法外,还可以实现以下方法:
on_kill: 在 task 被 kill 的时候执行。

airflow 是支持Jinjia模板语言的,那么如何在自定义的 Operator 中加入Jinjia模板语言的支持呢?
其实非常简单,只需要在自定义的Operator类中加入属性

template_fields = (attributes_to_be_rendered_with_jinja)

即可,例如官方的 bash_operator中是这样的:

template_fields = ('bash_command', 'env')

这样,在任务执行之前,airflow 会自动渲染 bash_command 或 env 中的属性再执行任务。

总结:airflow 官方已经提供了足够多,足够实用的 Operator,涉及数据库、分布式文件系统、http连接,远程任务等等,可以参考 airflow 的operators 源码,已基本满足日常工作需要,个性的任务可以通过自定义Operator 来实现,更为复杂的业务可以通过 restful apt 的形式提供接口,然后再使用 SimpleHttpOperator 来实现任务的调用。

airflow 的使用之 Operators 介绍相关推荐

  1. Angular Template expression operators介绍

    Angular template expression language是对JavaScript语法的增强和补充. pipe operator 用法和操作系统里讲到的管道类似. 例子: Title t ...

  2. Airflow搭建与使用

    Airflow 是一个编排.调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化.Airflow 将workflow编排为由tasks ...

  3. DataWorks搬站方案:Airflow作业迁移至DataWorks

    简介:DataWorks提供任务搬站功能,支持将开源调度引擎Oozie.Azkaban.Airflow的任务快速迁移至DataWorks.本文主要介绍如何将开源Airflow工作流调度引擎中的作业迁移 ...

  4. Airflow使用MsSqlHook与数据库交互

    文章目录 Airflow使用MsSqlHook与数据库交互 使用情况说明 编写示例代码 返回结果集示例 思考 Airflow使用MsSqlHook与数据库交互 使用情况说明 使用Microsoft S ...

  5. 任务调度神器 airflow 之初体验

      airflow 是 apache下孵化项目,是纯 Python 编写的一款非常优雅的开源调度平台.github 上有 9161 个星,是非常受欢迎的调度工具.airflow 使用 DAG (有向无 ...

  6. Airflow2.2.5任务调度工具

    Airflow2.2.5任务调度工具 一.Airflow介绍 1.基本概念 Airflow是一个以编程方式创作,可进行调度和监控工作流程的开源平台.基于有向无环图(DAG),airflow可以定义一组 ...

  7. Morgan Stanley面经

    背景 2019年至2021年期间,我在上海eBay工作,周围不少同事都是从摩根斯坦利过来的,总体上看很优秀.加上在极客时间的课程讲师里,就有原来从摩根斯坦利出来的,于是对这个公司产生了浓厚的兴趣,再加 ...

  8. windbg学习笔记

    写与2014年6月3日 Default Workspace 默认工作空间 implicit Workspace 隐含工作空间 Named Workspace 命名工作空间 explicit Works ...

  9. 如何简单实现ELT?

    在商业中,数据通常和业务.企业前景以及财务状况相关,有效的数据管理可以帮助决策者快速有效地从大量数据中分析出有价值的信息.数据集成(Data Integration)是整个数据管理流程中非常重要的一环 ...

  10. airflow2.0.0

    airflow airflow airflow快速开始 airflow 基本使用 文档 airflow启动 实例 运行脚本 web服务器验证tutorial文件 命令行元数据验证tutorial文件 ...

最新文章

  1. (转) 技术揭秘:海康威视PASCAL VOC2012目标检测权威评测夺冠之道
  2. 基础练习 十进制转十六进制
  3. 数据挖掘方法论与工程化思考
  4. Python老男孩 day15 函数(二) 局部变量与全局变量
  5. linux脚本格式模板,Linux Shell 常见的命令行格式简明总结
  6. How to check if a ctrl + enter is pressed on a control?
  7. Ubnutu20.04安装Tensorflow
  8. 机器学习 深度学习 ai_人工智能,机器学习和深度学习。 真正的区别是什么?...
  9. __init__在python中的用法_如何打“我爱你”的摩斯密码
  10. 计算机毕业设计-网上购书系统【代码讲解+安装调试+文档指导】
  11. 直播/点播系统快速搭建指南
  12. Excel如何分组排序
  13. 【PPT】幻灯片放映中常用快捷键
  14. Android 仿照美团城市选择,微信小程序仿美团城市选择
  15. Android Studio||动态改变xml图片位置+背景/旋转+平移/AnimationSet/java读取drawable图
  16. Talk | 清华大学交叉信息研究院助理教授杜韬:利用计算方法探究流固耦合
  17. 你真的懂one-hot编码吗?
  18. 深入理解 Python yield
  19. .NET 6 史上最全攻略
  20. vhdl7我学习得第一个fpga项目——倒计时显示

热门文章

  1. 根据经纬度查找附近的人计算公式
  2. css横向导航栏布局,CSS04--对齐、 布局、导航栏
  3. python爬虫爬取网页图片存储本地_Python爬虫抓取糗百的图片,并存储在本地文件夹...
  4. matlab 点顺时针排序,怎样对平面中的点进行顺时针或者逆时针排序
  5. 频繁gc是什么意思_linux查看是否频繁gc
  6. 推荐一个理解线性代数的视频
  7. 【电子设计大赛】2017 年全国大学生电子设计竞赛 仪器和主要元器件清单
  8. Direct2D 介绍
  9. Flink 1.13(八)CDC
  10. Matlab 多项式展开或化简(即提取公因式