airflow sql_alchemy_conn mysql_airflow的安装和使用 - 完全版
之前试用了azkaban一小段时间,虽然上手快速方便,但是功能还是太简单,不够灵活。
Airflow使用代码来管理任务,这样应该是最灵活的,决定试一下。
我是python零基础,在使用airflow的过程中可谓吃尽了苦头。。好歹最后实现所有要求,两三周的时间没有白费
看完这篇文章,可以达到如下目标:
安装airflow
如何修改界面右上角显示的时间到当前时区
如何添加任务
调试任务python代码
如何启动spark任务
如何限定任务同时执行的个数
如何手动触发任务时传入参数
如何在airflow界面上重新运行任务
如何查看任务log及所有任务的运行记录
如何在任务失败时发邮件(腾讯企业邮箱)
如何在任务失败时发消息到企业微信
以下过程已经过去了有一段时间,当时记录的也不一定很全面,如果有的不能执行,请留言告知。
安装airflow
系统:Ubuntu 16
python: 3.7
airflow版本:1.10.10
保持pip3到最新版本
pip3 install --upgrade pip
安装使用pip3
切换到root用户执行: pip3 install apache-airflow
你以为敲完这条命令就可以去把个妹或者撩个汉再回来就装好了,请坐下。
我碰到的错误:
Python.h not found
运行
sudo apt-get install python3.7-dev
某些依赖版本不对:
ERROR: pendulum 1.4.4 has requirement python-dateutil<3.0.0.0,>=2.6.0.0, but you'll have python-dateutil 2.4.2 which is incompatible.
ERROR: pandas 0.25.3 has requirement python-dateutil>=2.6.1, but you'll have python-dateutil 2.4.2 which is incompatible.
运行
pip install python-dateutil --upgrade
哪个包版本不对,更新哪个
数据库使用mysql
相信你看这个文章的时候应该不会还没有尝试装过airflow,所以airflow.cfg这个文件已经有了,在哪也很清楚
修改airflow.cfg:
sql_alchemy_conn = mysql://airflow:password@jjh1810:3306/airflow
使用root用户连接到mysql:
create user 'airflow'@'%' identified by '123';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
set explicit_defaults_for_timestamp = 1; --这一行至关重要
再使用airflow用户登录mysql:
create database airflow CHARACTER SET = utf8mb4;
初始化数据库
airflow initdb
这时候会报mysql依赖问题,如:
No module named '_mysql'
这个时候终于可以启动airflow了:
** 启的时候不要使用root用户,回到普通用户 **
airflow webserver -p 8080
airflow scheduler
如何修改界面右上角显示的时间到当前时区
相信应该所有人都会干这个事情:
哟?airflow里有个时区的配置,改了应该就好了
default_timezone = Asia/Shanghai
然后去刷一下页面
还是UTC嘛,这配置骗人的吗?
那么看这一篇文章吧:
Airflow 修改时区
** 改的时候注意:** python的代码是根据缩进来区别代码块的,所以拷代码的时候一定要注意缩进没有问题
如何添加任务
在~/airflow下创建dags文件夹,把.py文件放入即可
airflow启动了一个叫 DagFileProcessorManager 的进程来扫描dags目录,一但有文件个数变更,或者内容变更都会很快被检测到
这个进程有相应的log文件,可以提供一些文件处理错误信息
调试任务python代码
关闭schedule
这个时候已经开始写任务的python代码了,对于python小白与刚开始接触airflow的小哥或老哥来说,简直就是痛不欲生
有一个配置在调试的时候比较实用,就是关掉任务的schudle,只有手动触发才会执行。
把dag的schedule_interval设置为None
schedule_interval=None
python小白实用技巧
还有python代码里单引号和双引号是等价的,如果有引号嵌套可以分开使用,避免麻烦的转义,如:
hour = '{{ dag_run.conf["hour"] }}'
Jinja template
反正我第一眼看到这个东西,特别是官方教程里那一大块的模板文本的时候,心里只有一个字: WTF?!
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
其实也不是很复杂,这个玩意理解了以后还是比较方便的。除了在代码中使用普通的python变量或者airflow内置变量之外,很多时候在字符串中也需要使用airflow的内置变量会比较灵活和方便,Jinja template提供的就是这个功能。
如何启动spark任务
airflow是很强大很灵活的,官方提供了功能丰富的各种插件,各种JobOperator。来,简单好用童叟无欺的SparkSubmitOperator了解一下?
我的需求很简单,可以提交任务到不同的spark集群。这样就要求不能使用机器默认的hadoop和spark环境变量,必须为每个任务指定独立的配置文件。不知道是不是有大牛一次性成功的,反正我是试了无数次,一句话在心里不停的重复:“这什么吊东西!”
小可愚钝,google能搜出来的都看过了,怎么都不行,死活都不行,主要是环境变量不对。
调用linux脚本执行spark-submit是最灵活方便的办法
转念一想,还是传统的spark提交方式最好用啊,就是执行sh脚本利用spark-submit进行提交,这样spark就与airflow无关了,而且不管是环境变量还是参数配置都最灵活,传入参数也很方便。
这样只要使用普通的BashOperator就可以了,而且airflow应该专注如何调度任务,而不是还要兼顾任务的配置,就算SparkSubmitOperator可以工作,也是使用sh脚本的方式更好。
如何限定任务同时执行的个数
像spark任务通常使用的资源都会比较多,如果dag执行开始时间到当前时间间隔很长,或是暂停很长时间再开启,那么一开启的时候schedule会瞬间创建大量任务,提交到默认的pool,这个pool默认的大小是128。这样肯定是大家不希望看到的。
一个解决办法,为每个spark任务创建单独的pool,大小设置为1,这样一个spark任务一次就只能有一个在运行状态,后面都排队。
界面上操作:[Admin] -> [Pools],slots设为1。
然后在spark task的operator里添加参数:pool='PoolName'
如何手动触发任务时传入参数
假设任务是每小时触发一次,处理24小时前的数据,也就是今天8点处理昨天8点这一个小时的数据。除了schedule自动执行的逻辑,我们还希望可以手动触发任务,并且指定某个小时重新处理。
** 注: ** 这个功能只有1.10.10才支持,就是在界面上点击 [Trigger DAG] 的时候可以填入参数(固定为Json格式)。
先来看一下最终的结果
hour='{{ dag_run.conf["hour"] if dag_run.conf["hour"] else (execution_date - macros.timedelta(hours=24)).strftime("%Y%m%d%H") }}'
这里使用了Jinja template,通过dag_run对象可以获取界面上传入的参数,如果没有这个参数(也就是通过schedule触发),那么执行默认的逻辑(这里是24之前),并且格式化时间与界面输入保持一致。
如何在airflow界面上重新运行任务
这个功能默认的Executor是不支持的,需要安装CeleryExecutor,而CeleryExecutor一个存放消息的框架,这里我们选择rabbitmq。
假定rabbitmq已经装好。
安装请看官方文档:Celery Executor
配置
executor = CeleryExecutor
borker_url = amqp://user:password@host:port
** 注:** 如果rabbitmq是集群模式,这里也是挑一台出来使用。指定所有节点我还没有配置成功,如果有会配置的,请留言告知。
如何在界面上重跑任务呢?
界面上点击dag进入dag管理界面,点击[Tree View]。
Task每次运行都会用一个小方块来表示,点击小方块,再点击 [Run] 按钮就可以了。
** 注:** Tree View 这里最多只显示固定数量的历史记录,如果再早的时间只能通过点击 [Trigger DAG] 再指定参数运行。
任务运行时间的问题
这里有一个关键的问题,在界面上点击8个小时以前任务执行,那么任务触发的时候,运行的是8个小时之前的时间,还是当前时间呢?
如果我们是通过之前的hour变量的来指定时间的,那任务运行的时间就是8个小时之前,任务当时触发的时间。为什么呢?
我们在Jinja template里使用的变量 dag_run, execute_date这个并不是运行时变量,每次task触发,相关的上下文信息都会存到数据库里。所以8个小时之后我们再重新运行task的时候,是从数据库中读取当时的上下文信息,而不是现在的信息。
如何查看任务log及所有任务的运行记录
查看所有任务的运行记录
DAG界面里的 [Graph View] -> 点击任务 -> [Task Instances]
主菜单里的 [Browser] -> [Task Instances]
查看log
这就比较简单了
点击 [Tree View] 里的小方块,可以查看log
Task Intances 列表最后一列,也可以查看log
如何在任务失败时发邮件(腾讯企业邮箱)
首先DAG的default_args需要配置
'email':['name@mail.com'],
'email_on_failure': True
修改airflow.cfg
smtp_host = smtp.exmail.qq.com
smtp_starttls = False
smtp_ssl = True
smtp_port = 465
smtp_mail_from = name@mail.com
smtp_user = name@mail.com
smtp_password = password
实话说,这些配置都搞了好久才试出来,这种体验简直让人欲哭无泪。当然,身为一个码畜哭个什么,到这里已经被python和airflow的种问题折磨很多天了,素质三连走起来。
首先 smtp_ssl = True, smtp_port = 465 是一个重点。再次smtp_mail_from和smtp_user都使用同一个有效的邮箱地址。
如何在任务失败时发消息到企业微信
有时候觉得发邮件可能还不够,想把失败消息发到企业微信,这样更能及时的发现问题。
添加企业微信依赖
airflow官方支持钉钉的通知,企业微信通知就是根据这个改动而来,代码有两个py文件:airflow企业微信支持
把这两个py文件放到 dags 目录,也就是和dag的py文件放在一起。
使用方法:
3. 在企业微信群中创建机器人
右键点击群
选择 [Add Group Robot],并创建
获取机器人的key:右键 [View Information],可以得到一个URL
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxx-xx-xx
这个key的值就是机器人的ID
在airflow中创建企业微信的连接:[主菜单] -> [Admin] -> [Connections],配置填写:
Conn Id: wechat_robot
Conn Type: HTTP
Host: https://qyapi.weixin.qq.com
Password: 前面得到的key值,也就是机器人的ID
在代码中使用
代码中import WechatOperator
from wechat_operator import WechatOperator
创建 failure call 方法:
def failure_callback(context):
dagConf = context['dag_run'].conf
taskInst = context['task_instance']
hour = None
if 'hour' in dagConf:
hour = dagConf['hour']
else:
hour = (taskInst.execution_date - timedelta(hours=24)).strftime('%Y%m%d%H')
message = 'Airflow任务失败:\n' \
'DAG: {}\n' \
'Task: {}\n' \
'时间: {}\n' \
.format(taskInst.dag_id,
taskInst.task_id,
hour)
return WechatOperator(
task_id='wechat_task',
wechat_conn_id='wechat_robot',
message_type='text',
message=message,
at_all=True,
).execute(context)
这个代码应该还是很好懂的,主要是为了创建 WechatOperator 对象。
有个逻辑来重新获取执行时间(这里必须使用代码,而不能直接使用Jinja template),为的是在通知里面可以直接看到是哪个时间出错了。
default_args添加 failure callback配置
'on_failure_callbak': failure_callback
结束语
到这里,总算是搭建好一个可以正式投入生产使用的环境了。
Airflow虽然很灵活,但是想真正满足生产需求,还是经历了不少痛苦。特别是要求会使用python,加上airflow官方文档也不是很详细,这两点导致入门曲线太陡峭了。
airflow sql_alchemy_conn mysql_airflow的安装和使用 - 完全版相关推荐
- airflow sql_alchemy_conn mysql_airflow使用mysql数据库,LocalExecutor并发调度
mysql-airflow 在mysql上执行 create database airflow; -- 创建数据库 GRANT all privileges on airflow.* TO 'airf ...
- 简述安装Tsurugi Linux发行版
Tsurugi Linux是一个基于Ubuntu Linux的数字取证和事件响应(DFIR)开源项目.本文介绍了如何将该发行版用作操作系统上的虚拟机. 数字取证和事件响应是两种互补的活动,不仅需要深入 ...
- Windows下安装Z3的Python3版
文章目录 Windows下安装Z3的Python3版 pip 安装(不推荐,很慢) 使用微软官方构建好的DLL(推荐,快速) Windows下安装Z3的Python3版 GitHub官方仓库地址:Z3 ...
- DSVS7050签名服务器的网站,吉大正元数字签名服务器-安装部署手册(COM版 VCTK_S接口)2.1.1.doc...
吉大正元数字签名服务器-安装部署手册(COM版 VCTK_S接口)2.1.1 数字签名服务器v2.1.1 安装部署手册 (VSTK接口 COM版) V2.1.1 长 春 吉 大 正 元 信 息 技 术 ...
- 安装RVDS2.2破解版
安装RVDS2.2破解版 1. 我的RVDS2.2文件时iso文件,所以需要先安装虚拟光驱DAEMON Tools 2. 通过安装的DAEMON Tools来加载RVDS2.2 ...
- nodejs下载安装教程(XP版)
Node.js 下载安装教程(XP版) 参考自:https://www.cnblogs.com/zhouyu2017/p/6485265.html(win10版) 一.安装环境 Windows Xp( ...
- mac电脑安装python_【mac电脑怎么安装python】mac版python安装教程
mac 下怎么安装python 搭建开发环境 调用了 TThreadPoolServer 的 serve 方法后,server 进入阻塞监听状态,塞在 TServerSocket 的 accept 方 ...
- Visual C++6.0安装教程(win10版)及“应用程序无法正常启动(0x0150002)”解决办法
Visual C++6.0安装教程(win10版)及"应用程序无法正常启动(0x0150002)"解决办法
- CentOS 8安装JFrog Artifactory社区版解决方案
CentOS 8安装JFrog Artifactory社区版解决方案 基本概念 解决方案 基础环境 安装Artifactory 通过zip包方式安装 下载Artifactory 安装Artifacto ...
最新文章
- K12(在线学习的平台)
- web自动化测试从入门到持续集成(selenium webdriver)
- 红茶一杯话Binder(传输机制篇_下)
- adobe photoshop cs5已停止工作
- 牛客 - 建通道(思维)
- C语言开发笔记(七)const和指针
- E - More is better (并查集)
- 用java编写录音机类_java实现录音机
- vsftp如何确定地址_VSFTP配置参数详解
- 进程调度时间片轮转c语言,时间片轮转调度算法的C语言模拟实现
- 十个3D 游戏引擎,html h5 js
- python压缩HTML文件,python压缩javascript文件代码
- php判断是否是浏览器请求,php 判断请求是否来自“手机浏览器”
- linux增加分区大小,新增硬盘扩容Linux下的分区大小
- Android11/Android Q分屏功能实现
- NBA运动员球员数据分析
- 为什么用线程池?解释下线程池参数?
- 伪装游戏软件变成计算机,注意!Steam上这款游戏是伪装的病毒:把玩家PC变成矿机...
- GATT协议学习笔记
- 浅谈黑盒测试和白盒测试
热门文章
- 2021年大数据Kafka(十一):❤️Kafka的消费者负载均衡机制和数据积压问题❤️
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
- gitflow分支管理模型
- Python import 的用法
- R.Java文件报错
- android studio 通过界面快速查看md5
- ios5中apple增加了解析JSON的api——NSJSONSerialization。
- 2022-2028年中国音像制品行业投资分析及前景预测报告
- MySQL 学习笔记(16)— 子查询(单行单列、一行多列、多行多列、 ALL、ANY、SOME 运算符、EXISTS 操作符)
- vs2015添加管理员权限