AirFlow调度平台简介

airflow 是一个编排、调度和监控工作流的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow将工作流编排为tasks组成的有向无环图(DAGs),调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统

AirFlow基础概念

Airflow主要是将工作流的相关信息定义到一个Python文件中,airflow根据文件中的定义信息执行工作流,在Airflow pipeline定义中,主要涉及两个类: DAG,Operator

DAG : 有向无环图,它将定义的任务按照依赖关系组织起来

Operator:用来描述每个任务具体做的事,airflow内置了很多operator,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令…同时,用户可以自定义Operator,这给用户提供了极大的便利性。

通过DAG和Operator结合起来就可以构建复杂的工作流了

Talend简介

Talend是一个开源的ELT任务构建工具,可以通过简单拖拽的方式设计复杂的ETL任务并自动生成Java代码,设计完成后可以通过构建导出ETL任务java源码和可直接执行jar包及执行jar包的shell和bat文件。

接下来,我们通过Talend 桌面软件构建一个简单的ELT任务,然后通过airflow UI执行这个ETL任务

Airflow调度ETL任务实例

1.通过Talend 设计、构建ETL任务

打开Talend设计如下简单的ETL任务,将一个CSV文件导入到数据库中:

然后,将这个ETL任务构建并导出,目录结构如下:

我们使用在linux环境中执行shell文件的方式执行ETL任务。

2.定义Airflow pipeline

我们在airflow 指定的dags目录下新建python文件来定义airflow pipeline

代码如下:

import airflow

from datetime import datetime

from airflow.operators.bash_operator import BashOperator

from airflow.models import DAG

from airflow.utils.email import send_email

#shell命令执行.sh文件

job_commd = "/home/airflow/jobs/AirFlowTestJob_0.1/AirFlowTestJob/AirFlowTestJob_run.sh '/home/airflow/jobs/AirFlowTestJob_0.1/AirFlowTestJob/'"

#dag中的一些参数设置

default_args = {

'owner': 'Airflow',

'depends_on_past': False,

'start_date': datetime(2018, 7, 23),

'email': ['VALID_EMAIL_ID'],

'email_on_failure': True,

'email_on_success': True,

'retries':3,

}

dag = DAG('csv_to_data', default_args=default_args)

#定义ETL任务,多个任务时定义多个Operator

t1 = BashOperator(

task_id='csv_to_database',

bash_command=job_commd,

dag=dag

)

3.调度执行ETL任务

完成以上工作,我们启动airflow webserver服务器后,便可以在airflow UI界面看到我们定义的ETl任务csv_to_data,可以进行相应操作。

airflow实现Java定时任务,AirFlow定时调度执行Talend ETL任务相关推荐

  1. AirFlow调度执行Talend ETL任务

    AirFlow调度平台简介 airflow 是一个编排.调度和监控工作流的平台,由Airbnb开源,现在在Apache Software Foundation 孵化.airflow将工作流编排为tas ...

  2. java 多线程——一个定时调度的例子

    java 多线程 目录: Java 多线程--基础知识 Java 多线程 -- synchronized关键字 java 多线程--一个定时调度的例子 java 多线程--quartz 定时调度的例子 ...

  3. java quartz timer_Java 定时调度TimerQuartz

    目录 三.Quartz 一.Java定时任务介绍 在Java中,用得比较多的有两种,一个是Timer,一个是Quartz: 其中Timer是这是jdk自带的类库,一般用来实现简单的定时调度,由一个后台 ...

  4. linux调度不执行,linux crond.d定时调度执行一段时间后不执行

    问题一:/etc/cron.d下的定时任务执行一段时间后不执行 背景:在/etc/cron.d下加了一个定时任务,名为dispute_cron(使用root用户编辑),内容: 28 15 * * * ...

  5. java定时任务cron表达式每周执行一次的坑

    java springboot 利用schedule执行定时任务是很常用的功能,有一个很常用的网站就是在线Cron表达式生成器,但是在这个网站最近遇到一个坑. 我要每周四执行一次,我把我写的表达式在这 ...

  6. 【java】java定时任务1秒调度一次会怎么样

    文章目录 1.概述 2.源码 M.扩展 1.概述 因为需要,我在代码中写了一个程序一秒调度一次,执行一些不是很短的任务,但是具体耗时不知道,但是这个服务在现场出问题了,然后代码审核的时候,觉得这里每秒 ...

  7. java 定时任务每日晚上凌晨执行数据统计

    例如:统计每日会员账户余额 Page page = new Page(); page.setPageSize(1000); page.setPageNum(1); do{//获取所有用户余额com.g ...

  8. Java定时任务工具详解之Timer篇

    Java定时任务调度工具详解 什么是定时任务调度? ◆ 基于给定的时间点,给定的时间间隔或者给定的执行次数自动执行的任务. 在Java中的定时调度工具? ◆ Timer ◆Quartz Timer和Q ...

  9. Quartz定时调度

    工作中我们经常会用到定时调度,也就是说每隔多久去执行一下某个方法.公司的系统要每天24点定时去抽取his数据库中的数据,所用的框架是Springmvc+mybatis.采用的 Q是uartz(版本2. ...

  10. java定时任务,每天定时执行任务

    java定时任务,每天定时执行任务.以下是这个例子的全部代码. public class TimerManager {//时间间隔private static final long PERIOD_DA ...

最新文章

  1. pytest框架安装(MacOS)
  2. C++用顶层函数重载操作符
  3. 【网址收藏】Centos7.3离线(rpm方式)安装mysql服务
  4. Ubuntu中update-alternatives命令(版本切换)
  5. 用户级线程与内核级线程
  6. Java Script基础(九) 下拉列表对象
  7. Kafka如何实现每秒上百万的超高并发写入?
  8. 解决:flask-sqlalchemy.exc.DataError: (pymysql.err.DataError) (1406数据库字段超出长度错误)
  9. 性能测试测试环境与生产环境_不在生产中测试? 在生产中进行测试!
  10. 应对全场景AI框架部署挑战,MindSpore“四招”让你躺平
  11. 补码乘法实验原理_你真的理解补码吗?
  12. java 内存 min_Java内存区域
  13. C++编程的 42 条建议
  14. java山上挑水_java编程:山上有一口缸可以装50升水,现在有15升。老和尚叫小和尚下山挑水,每次挑5升,要挑几次...
  15. 实时视频直播平台的技术要点详解
  16. 微信小程序(uniapp)获取用户位置信息及选择位置
  17. python判断是否为变位词_[python]变位词的判别与分类
  18. 脚本小子--------python脚本循环导出H3C华三核心防火墙context虚拟防火墙配置(你凝视bug,bug也在凝视你)
  19. 关于relief算法选择特征的问题
  20. SLUB和SLAB的区别

热门文章

  1. php json encode 参数,PHP json_encode函数的参数说明与用法
  2. CDD数据库文件制作(二)——DTC配置
  3. 【转载】通过搜狗站长平台手动向搜狗搜索提交文章加快收录
  4. 安装protobuf可能遇到的问题
  5. ES:java.nio.file.AccessDeniedException: /opt/shan/es/config/elasticsearch.keystore
  6. Docker实用指令整理
  7. 2019-01-01T00:00:00.000Z 这种时间日期类型格式是属于:格林尼治时间
  8. 激活工业数据价值 2020年建成国家工业互联网大数据中心
  9. 支付宝免签 个人支付宝到银行卡
  10. 通常网站当中的关键词密度如何控制呢