简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

背景

airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

一、环境准备

  • Python 2.7.5  PyODPS支持Python2.6以上版本
  • Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10  # 可选,安装后能加速Tunnel上传。

pip install cython>=0.19.0  # 可选,不建议Windows用户安装。

pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本

2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

# -*- coding: UTF-8 -*-

import sys

import os

from odps import ODPS

from odps import options

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta

from configparser import ConfigParser

import time

reload(sys)

sys.setdefaultencoding('utf8')

#修改系统默认编码。

# MaxCompute参数设置

options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}

cfg = ConfigParser()

cfg.read("odps.ini")

print(cfg.items())

odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

default_args = {

'owner': 'airflow',

'depends_on_past': False,

'retry_delay': timedelta(minutes=5),

'start_date':datetime(2020,1,15)

# 'email': ['airflow@example.com'],

# 'email_on_failure': False,

# 'email_on_retry': False,

# 'retries': 1,

# 'queue': 'bash_queue',

# 'pool': 'backfill',

# 'priority_weight': 10,

# 'end_date': datetime(2016, 1, 1),

}

dag = DAG(

'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))

def read_sql(sqlfile):

with io.open(sqlfile, encoding='utf-8', mode='r') as f:

sql=f.read()

f.closed

return sql

def get_time():

print '当前时间是{}'.format(time.time())

return time.time()

def mc_job ():

project = odps.get_project()  # 取到默认项目。

instance=odps.run_sql("select * from long_chinese;")

print(instance.get_logview_address())

instance.wait_for_success()

with instance.open_reader() as reader:

count = reader.count

print("查询表数据条数:{}".format(count))

for record in reader:

print record

return count

t1 = PythonOperator (

task_id = 'get_time' ,

provide_context = False ,

python_callable = get_time,

dag = dag )

t2 = PythonOperator (

task_id = 'mc_job' ,

provide_context = False ,

python_callable = mc_job ,

dag = dag )

t2.set_upstream(t1)

2.提交

python Airiflow_MC.py

3.进行测试

# print the list of active DAGs

airflow list_dags

# prints the list of tasks the "tutorial" dag_id

airflow list_tasks Airiflow_MC

# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks Airiflow_MC --tree

#测试task

airflow test Airiflow_MC get_time 2010-01-16

airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务

登录到web界面点击按钮运行

5.查看任务运行结果

1.点击view log

2.查看结果

原文链接
本文为阿里云原创内容,未经允许不得转载。

使用AirFlow调度MaxCompute相关推荐

  1. Airflow调度 Design

    Airflow调度 Design 一.创建DAGS 从Oracle里的原始数据导入hive里的ods层,再从ods层进行数据清洗到dwd层 在airflow路径下创建dags文件夹,创建如下四个pyt ...

  2. AirFlow调度执行Talend ETL任务

    AirFlow调度平台简介 airflow 是一个编排.调度和监控工作流的平台,由Airbnb开源,现在在Apache Software Foundation 孵化.airflow将工作流编排为tas ...

  3. Airflow调度工具简介和使用

    Airflow是airbnb家的基于DAG(有向无环图)的任务管理系统, 最简单的理解就是一个高级版的crontab.它解决了crontab无法解决的任务依赖问题. 本文将介绍 Airflow 这一款 ...

  4. airflow调度方案

    主要服务 dag:包括 task.operator.sensor 模块 配置 dag.task.operator schedule:包括 control 模块 启动 暂停 恢复等 控制dag任务启动 ...

  5. 大数据调度平台Airflow(五):Airflow使用

    目录 Airflow使用 一.Airflow调度Shell命令 1.首先我们需要创建一个python文件,导入需要的类库 2.实例化DAG 3.定义Task 4.设置task依赖关系 5.上传pyth ...

  6. 【构建PB级准实时分析引擎】 -- azkaban、airflow、dolphinscheduler、quartz、xxl-job 、oozie调度方案评测

    评测说明   本评测方案主要从自身实用的角度出发,对比主流的开源任务调度方案,评选出最适合自己的方案,以作为自己的调度工具. 评测目标 短期目标 可调度多台机器的shell/python脚本: 可管理 ...

  7. 如何基于MaxCompute快速打通数据仓库和数据湖的湖仓一体实践

    简介: MaxCompute 是面向分析的企业级 SaaS 模式云数据仓库,以 Serverless 架构提供快速.全托管的在线数据仓库服务,消除了传统数据平台在资源扩展性和弹性方面的限制,最小化用户 ...

  8. airflow部署和使用示例

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 参考:https://www.jianshu.com/p/089c56b4ec14 airflow介绍 https://lxwei.github.io ...

  9. Airflow基础架构简介

    Aireflow 处理依赖的方式 Airflow的核心概念,是DAG(有向无环图),DAG由一个或多个TASK组成,而这个DAG正是解决了任务间的依赖问题.Task A 执行完成后才能执行 Task ...

最新文章

  1. php 面向对象开发 类的学习 一
  2. 腐蚀单机怎么进_华强北淘来的iPhone6S,进水后严重腐蚀,小伙飞线拯救又赚到了!...
  3. 休息五分钟,学几个bash快捷键
  4. svn版本库浏览器_svn:版本库xxx不存在||svn:No such revision xxx的问题
  5. 网络安全04_互联网发展史_网线+网卡+协议栈_中继器_集线器_网桥_路由器_AC/AP_防火墙_流控_家庭网络_小型创业公司网络_园区网_政务网络_数据中心网络拓扑_电信网/互联网_Mac地址
  6. 3-3-ServletContext接口
  7. 如何实现全选checkbox效果
  8. Java多线程之可见性与原子性——synchronized VS volatile
  9. VMware安装Windows Server 2003提示Operating System not found
  10. 常见8种无线通信协议简介
  11. 腾讯云服务器登录宝塔面板
  12. 各类学科入门书籍推荐
  13. Docker容器访问外部世界
  14. 转:管理者必备技能之全局观:找出复杂环境中的秩序
  15. 我的世界服务器物品怎么上锁,我的世界怎么给箱子上锁_我的世界箱子上锁指令用法及解锁方法_玩游戏网...
  16. python echo(msg) 字符串_python的subprocess模块
  17. html 必应网搜索,教程:创建自定义搜索网页 - 必应自定义搜索 - Azure Cognitive Services | Microsoft Docs...
  18. “1万起投,年化达8%”?天安金交中心卖力“吆喝”的产品,是“香”还是“坑”?
  19. 网络安全期末复习 - 20190625
  20. CSP认证历年真题题解 (Python)

热门文章

  1. file 选择的图片作为背景图片_酷炫!用Python把桌面变成实时更新的地球图片
  2. 计算机背景为什么总是黑色,电脑背景变成黑色的了是为什么
  3. oracle dbms_crypto,Oracle的dbms_obfuscation_toolkit加密解密数据
  4. python创建access表_Access创建表
  5. 如何提高Java代码的可重用性?
  6. oracle (+)的可读性,Oracle基础笔记一
  7. jsp工程防止外部注入_XPATH注入详解|OWASP Top 10安全风险实践(五)
  8. html调用python_flask之模板html中调用python函数方法
  9. java打印已经被加载的类_使用URLClassLoader加载类,不会报错,但被加载类中的内容也没有打印出来...
  10. kafka python教程_kafka python 指定分区消费