python etl 大猩猩_Airflow教程-使用Airflow实现ETL调度
一、Airflow是什么
airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为由tasks组成的DAGs(有向无环图),调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。
二、Airflow的核心概念
DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。
Operators:airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令...同时,用户可以自定义Operator,这给用户提供了极大的便利性。可以理解为用户需要的一个操作,是Airflow提供的类
Tasks:Task 是 Operator的一个实例
Task Instance:由于Task会被重复调度,每次task的运行就是不同的task instance了。Task instance 有自己的状态,包括"running", "success", "failed", "skipped", "up for retry"等。
Task Relationships:DAGs中的不同Tasks之间可以有依赖关系
三、使用AirFlow完成天级的任务调度
说了这么多抽象的概念,估计看官还是云里雾里,下面就直接举个例子来说明吧。
1. 安装airflow
Airflow可以约等于只支持linux和mac,Windows上极其难装,笔者放弃了.
安装也很简单,以下代码来自官方文档,使用了Python的pip管理:
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
安装好了以后访问localhost:8080即可访问ui界面
2. 基本配置
需要创建~/airflow/dags目录,这个目录是默认的存放DAG的地方,想修改的话可以修改~/airflow/airflow.cfg文件
修改airflow的数据库
airflow会使用sqlite作为默认的数据库,此情况下airflow进行调度的任务都只能单个的执行.在调度任务量不大的情况下,可以使用sqlite作为backend.如果想scale out的话,需要修改配置文件,官方推荐使用mysql或者postgresql作为backend数据库.
3. 使用PostgresOperator执行SQL完成ETL任务
通过搜集信息,了解到PostgresOperator能执行SQL,并且还支持传参数.能解决大多数ETL任务中的传参问题.传参使用的是Python的Jinjia模块.
创建DAG
首先创建一个test_param_sql.py文件.内容如下:
from datetime import datetime, timedelta
import airflow
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 7, 26), #start_date会决定这个DAG从哪天开始生效
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
# Variable是Airflow提供的用户自定义变量的功能,在UI界面的Admin -> Variable下可以进行增删改查,此处笔者定义了sql_path作为存放sql文件的地方
tmpl_search_path = Variable.get("sql_path")
dag = airflow.DAG(
'test_param_sql',
schedule_interval=timedelta(days=1), # schedule_interval是调度的频率
template_searchpath=tmpl_search_path,
default_args=args,
max_active_runs=1)
test_param_sql = PostgresOperator(
task_id='test_param_sql',
postgres_conn_id='postgres_default',
sql='param_sql.sql',
dag=dag,
params={'period': '201905'},
pool='pricing_pool')
match_finish = DummyOperator(
task_id='match_finish',
dag=dag
)
test_param_sql >> match_finish
准备要执行的Sql文件
创建test_sql.sql文件.
SQL文件会被Jinjia解析,可以使用一些宏来实现时间的替换 例
{{ ds }} 会被转换为当天的 YYYY-MM-DD 格式的日期
{{ ds_nodash }} 会被转换为当天的 YYYYMMDD的格式的日期
在本例里则是通过{{params.period}} 取到了 DAG上传入的参数,
insert into test.param_sql_test
select * from test.dm_input_loan_info_d
where period = {{params.period}};
整体的目录结构如下
dags/
test_param_sql.py
sql/
test_sql.sql
测试dag是否正确
可以使用 airflow test dag_id task_id date 进行测试,测试会执行Operator,Operator指定的行为会进行调度. 但是不会将执行的行为记录到Airflow的数据库里
发布
把文件放到~/airflow/dags目录下,sql文件不要放在dags目录下,可以找其他地方(比如同级目录),配置好上文说到的Variable,能找到即可.笔者的理解是,airflow会扫描dags目录下的内容,并尝试解析成dag,如果有不能成功解析的内容,ui界面上会有错误提示,导致dag显示不出来等问题.
其他有用的信息
如何在dag.py里引入其他的本地python模块
需要把本地的python模块放到一个zip文件里,例如:
my_dag1.py
my_dag2.py
package1/init.py
package1/functions.py
然后把这个zip文件放到dags目录下,才能被正确解析
pooling可以控制任务的并行度,如果给DAG指定了一个不存在的pooling,任务会一直处于scheduled的状态,不继续进行
python etl 大猩猩_Airflow教程-使用Airflow实现ETL调度相关推荐
- python etl 大猩猩_用于ETL的Python数据转换工具详解
ETL的考虑 做 数据仓库系统,ETL是关键的一环.说大了,ETL是数据整合解决方案,说小了,就是倒数据的工具.回忆一下工作这么些年来,处理数据迁移.转换的工作倒 还真的不少.但是那些工作基本上是一次 ...
- Blender中的Python脚本介绍学习教程
Blender中的Python脚本介绍学习教程 MP4 |视频:h264,1280×720 |音频:AAC,48000 Hz 语言:英语+中英文字幕(根据原英文字幕机译更准确)|大小解压后:1.63 ...
- 分享一个python cookbook的在线教程地址
分享一个python cookbook的在线教程地址: http://python3-cookbook.readthedocs.org/zh_CN/latest/ 翻译者:熊能 转载于:https:/ ...
- python商业爬虫教程_廖雪峰老师的Python商业爬虫课程 Python网络爬虫实战教程 体会不一样的Python爬虫课程...
廖雪峰老师的Python商业爬虫课程 Python网络爬虫实战教程 体会不一样的Python爬虫课程 1.JPG (53.51 KB, 下载次数: 1) 2019-8-9 08:15 上传 2.JPG ...
- scrapy爬虫储存到mysql_详解Python之Scrapy爬虫教程NBA球员数据存放到Mysql数据库
获取要爬取的URL 爬虫前期工作 用Pycharm打开项目开始写爬虫文件 字段文件items # Define here the models for your scraped items # # S ...
- python是什么编程教程-Python 从不懂到入门
基础知识 什么是编程语言 可以简单的理解为一种计算机和人都能识别的语言 什么是 Python 一种编程语言 安装 Python 开发环境 何为开发环境 简单理解为运行 Python 的平台 去 Pyt ...
- python自学平台-Python学习交流平台与教程推荐
Python学习交流平台与教程推荐 目录 一.有编程问题怎么办? 1 Stack Overflow 2 Github 3 CSDN-专业IT技术社区 二.Python教程 1 跟我读Python文档 ...
- python编程入门经典教程-2020年5个经典python编程入门视频教程推荐学习
近三年,python在一二线城市,越来越火热了!企业的需求也是python编程学习者的动力!python学习网整理了适合于零基础朋友的5个python编程入门视频课程,这些都是python编程入门经典 ...
- python画曲线-python绘制动态曲线教程
从txt种获取数据 并且通过动态曲线显示 import numpy as np import matplotlib.pyplot as plt import matplotlib.animation ...
最新文章
- weui移动商城源码.zip_商城网站建设二次开发需要多久?难度大吗?
- MUI:字符串和json数据的相互转换
- (转).Net/C# 获取字节流编码
- C语言中函数可变参数解析
- golang管道channel的遍历和关闭:应该使用for...range来遍历
- [精品]CSAPP Bomb Lab 解题报告(三)
- 如何使用React和Redux前端创建Rails项目
- qtablewidget 选中不改变背景_C4D制作不一样的核壳结构
- node-webki 基本程序结构
- sqlserver 遇到以零作除数错误的处理 不报错的解决方法
- azw3转换为pdf_干货:如何Java 将 Word 文档转换为 PDF
- HTML固定内容的css代码,普通文本重内容的HTML/CSS设计
- python语言写九九乘法表_怎么使用Python语言写一个九九乘法表?
- 基于mysql的可视化日志管理——loganalyzer
- android videoview 拉伸,android - 在ExoPlayer中轻按全屏时,视频会拉伸和旋转 - 堆栈内存溢出...
- 我的深圳真实驾考经历
- 王者荣耀微信转qq服务器,王者荣耀转区qq转微信可以吗 qq转区微信可以吗
- PM的基石(1) - PMP (国际项目管理师)
- 网游点卡接口说明文档
- 投标任性,围标串标一查实,扫黑除恶,现在投标必须签订一样协议