使用AirFlow调度MaxCompute
简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务
背景
airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务
一、环境准备
- Python 2.7.5 PyODPS支持Python2.6以上版本
- Airflow apache-airflow-1.10.7
1.安装MaxCompute需要的包
pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。
pip install cython>=0.19.0 # 可选,不建议Windows用户安装。
pip install pyodps
注意:如果requests包冲突,先卸载再安装对应的版本
2.执行如下命令检查安装是否成功
python -c "from odps import ODPS"
二、开发步骤
1.在Airflow家目录编写python调度脚本Airiflow_MC.py
# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
#修改系统默认编码。
# MaxCompute参数设置
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retry_delay': timedelta(minutes=5),
'start_date':datetime(2020,1,15)
# 'email': ['airflow@example.com'],
# 'email_on_failure': False,
# 'email_on_retry': False,
# 'retries': 1,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
with io.open(sqlfile, encoding='utf-8', mode='r') as f:
sql=f.read()
f.closed
return sql
def get_time():
print '当前时间是{}'.format(time.time())
return time.time()
def mc_job ():
project = odps.get_project() # 取到默认项目。
instance=odps.run_sql("select * from long_chinese;")
print(instance.get_logview_address())
instance.wait_for_success()
with instance.open_reader() as reader:
count = reader.count
print("查询表数据条数:{}".format(count))
for record in reader:
print record
return count
t1 = PythonOperator (
task_id = 'get_time' ,
provide_context = False ,
python_callable = get_time,
dag = dag )
t2 = PythonOperator (
task_id = 'mc_job' ,
provide_context = False ,
python_callable = mc_job ,
dag = dag )
t2.set_upstream(t1)
2.提交
python Airiflow_MC.py
3.进行测试
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks Airiflow_MC
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks Airiflow_MC --tree
#测试task
airflow test Airiflow_MC get_time 2010-01-16
airflow test Airiflow_MC mc_job 2010-01-16
4.运行调度任务
登录到web界面点击按钮运行
5.查看任务运行结果
1.点击view log
2.查看结果
原文链接
本文为阿里云原创内容,未经允许不得转载。
使用AirFlow调度MaxCompute相关推荐
- Airflow调度 Design
Airflow调度 Design 一.创建DAGS 从Oracle里的原始数据导入hive里的ods层,再从ods层进行数据清洗到dwd层 在airflow路径下创建dags文件夹,创建如下四个pyt ...
- AirFlow调度执行Talend ETL任务
AirFlow调度平台简介 airflow 是一个编排.调度和监控工作流的平台,由Airbnb开源,现在在Apache Software Foundation 孵化.airflow将工作流编排为tas ...
- Airflow调度工具简介和使用
Airflow是airbnb家的基于DAG(有向无环图)的任务管理系统, 最简单的理解就是一个高级版的crontab.它解决了crontab无法解决的任务依赖问题. 本文将介绍 Airflow 这一款 ...
- airflow调度方案
主要服务 dag:包括 task.operator.sensor 模块 配置 dag.task.operator schedule:包括 control 模块 启动 暂停 恢复等 控制dag任务启动 ...
- 大数据调度平台Airflow(五):Airflow使用
目录 Airflow使用 一.Airflow调度Shell命令 1.首先我们需要创建一个python文件,导入需要的类库 2.实例化DAG 3.定义Task 4.设置task依赖关系 5.上传pyth ...
- 【构建PB级准实时分析引擎】 -- azkaban、airflow、dolphinscheduler、quartz、xxl-job 、oozie调度方案评测
评测说明 本评测方案主要从自身实用的角度出发,对比主流的开源任务调度方案,评选出最适合自己的方案,以作为自己的调度工具. 评测目标 短期目标 可调度多台机器的shell/python脚本: 可管理 ...
- 如何基于MaxCompute快速打通数据仓库和数据湖的湖仓一体实践
简介: MaxCompute 是面向分析的企业级 SaaS 模式云数据仓库,以 Serverless 架构提供快速.全托管的在线数据仓库服务,消除了传统数据平台在资源扩展性和弹性方面的限制,最小化用户 ...
- airflow部署和使用示例
全栈工程师开发手册 (作者:栾鹏) 架构系列文章 参考:https://www.jianshu.com/p/089c56b4ec14 airflow介绍 https://lxwei.github.io ...
- Airflow基础架构简介
Aireflow 处理依赖的方式 Airflow的核心概念,是DAG(有向无环图),DAG由一个或多个TASK组成,而这个DAG正是解决了任务间的依赖问题.Task A 执行完成后才能执行 Task ...
最新文章
- php 面向对象开发 类的学习 一
- 腐蚀单机怎么进_华强北淘来的iPhone6S,进水后严重腐蚀,小伙飞线拯救又赚到了!...
- 休息五分钟,学几个bash快捷键
- svn版本库浏览器_svn:版本库xxx不存在||svn:No such revision xxx的问题
- 网络安全04_互联网发展史_网线+网卡+协议栈_中继器_集线器_网桥_路由器_AC/AP_防火墙_流控_家庭网络_小型创业公司网络_园区网_政务网络_数据中心网络拓扑_电信网/互联网_Mac地址
- 3-3-ServletContext接口
- 如何实现全选checkbox效果
- Java多线程之可见性与原子性——synchronized VS volatile
- VMware安装Windows Server 2003提示Operating System not found
- 常见8种无线通信协议简介
- 腾讯云服务器登录宝塔面板
- 各类学科入门书籍推荐
- Docker容器访问外部世界
- 转:管理者必备技能之全局观:找出复杂环境中的秩序
- 我的世界服务器物品怎么上锁,我的世界怎么给箱子上锁_我的世界箱子上锁指令用法及解锁方法_玩游戏网...
- python echo(msg) 字符串_python的subprocess模块
- html 必应网搜索,教程:创建自定义搜索网页 - 必应自定义搜索 - Azure Cognitive Services | Microsoft Docs...
- “1万起投,年化达8%”?天安金交中心卖力“吆喝”的产品,是“香”还是“坑”?
- 网络安全期末复习 - 20190625
- CSP认证历年真题题解 (Python)
热门文章
- file 选择的图片作为背景图片_酷炫!用Python把桌面变成实时更新的地球图片
- 计算机背景为什么总是黑色,电脑背景变成黑色的了是为什么
- oracle dbms_crypto,Oracle的dbms_obfuscation_toolkit加密解密数据
- python创建access表_Access创建表
- 如何提高Java代码的可重用性?
- oracle (+)的可读性,Oracle基础笔记一
- jsp工程防止外部注入_XPATH注入详解|OWASP Top 10安全风险实践(五)
- html调用python_flask之模板html中调用python函数方法
- java打印已经被加载的类_使用URLClassLoader加载类,不会报错,但被加载类中的内容也没有打印出来...
- kafka python教程_kafka python 指定分区消费