管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。 Oozie工作流是在XML文档中指定的控制依赖性指导非循环图(DAG)中排列的动作集合。虽然Oozie在Hadoop社区中有很多支持,但通过XML属性配置工作流和作业的学习曲线非常陡峭。

Luigi是Spotify创建的Python替代方案,可以构建和配置复杂的批处理作业管道。它处理依赖项解析,工作流管理,可视化等等。它还拥有庞大的社区,并支持许多Hadoop技术。在github上超过1万星。

本章介绍Luigi的安装和工作流程的详细说明。

安装

pip install luigi

工作流

在Luigi中,工作流由一系列操作组成,称为任务。 Luigi任务是非特定的,也就是说,它们可以是任何可以用Python编写的东西。任务的输入和输出数据的位置称为目标(target)。目标通常对应于磁盘上,HDFS上或数据库中的文件位置。除了任务和目标之外,Luigi还利用参数来自定义任务的执行方式。

  • 任务

任务是构成Luigi工作流的操作序列。每个任务都声明其依赖于其他任务创建的目标。这样Luigi能够创建依赖链。

  • 目标

目标是任务的输入和输出。最常见的目标是磁盘上的文件,HDFS中的文件或数据库中的记录。 Luigi包装了底层文件系统操作,以确保与目标的交互是原子的。这允许从故障点重放工作流,而不必重放任何已经成功完成的任务。

  • 参数

参数允许通过允许值从命令行,以编程方式或从其他任务传递任务来自定义任务。例如,任务输出的名称可以通过参数传递给任务的日期来确定。

参考资料

  • python测试开发项目实战-目录
  • python工具书籍下载-持续更新
  • python 3.7极速入门教程 - 目录
  • 原文地址
  • 本文涉及的python测试开发库 谢谢点赞!
  • [本文相关海量书籍下载](https://github.com/china-testing/python-api-tesing/blob/master/books.md

工作流本示例

#!/usr/bin/env python
# 项目实战讨论QQ群630011153 144081101
# https://github.com/china-testing/python-api-tesing
import luigiclass InputFile(luigi.Task):"""A task wrapping a Target """input_file = luigi.Parameter()def output(self):"""Return the target for this task"""return luigi.LocalTarget(self.input_file)class WordCount(luigi.Task):"""A task that counts the number of words in a file"""input_file = luigi.Parameter()output_file = luigi.Parameter(default='/tmp/wordcount')def requires(self):"""The task's dependencies:"""return InputFile(self.input_file)def output(self):"""The task's output"""return luigi.LocalTarget(self.output_file)def run(self):"""The task's logic"""count = {}ifp = self.input().open('r')for line in ifp:for word in line.strip().split():count[word] = count.get(word, 0) + 1ofp = self.output().open('w')for k, v in count.items():ofp.write('{}\t{}\n'.format(k, v))ofp.close()if __name__ == '__main__':luigi.run()

执行

$ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt
DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete
DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   PENDING
INFO: Informed scheduler that task   InputFile__home_hduser__in_0eced493f7   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running   WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done      WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====Scheduled 2 tasks of which:
* 1 complete ones were encountered:- 1 InputFile(input_file=/home/hduser_/input2.txt)
* 1 ran successfully:- 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)This progress looks :) because there were no failed tasks or missing dependencies===== Luigi Execution Summary =====hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt
jack    2
be    2
nimble    1
quick    1

[雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流...相关推荐

  1. [雪峰磁针石博客]大数据Hadoop工具python教程2-python访问HDFS

    https://pypi.org/project/hdfs3 已经不维护 PyArrow https://pypi.org/project/hdfs/ https://pypi.org/project ...

  2. [python作业AI毕业设计博客]大数据Hadoop工具python教程1-HDFS Hadoop分布式文件系统...

    Hadoop分布式文件系统(HDFS:Hadoop Distributed File System)是基于Java的分布式文件系统 分布式,可扩展和可移植的文件系统,旨在跨越大型商用服务器集群. HD ...

  3. python3基础教程雪峰_[雪峰磁针石博客]python3快速入门教程2数据结构1变量与赋值...

    Published: 日 02 九月 2018 语法基础 解释器像简单的计算器:可以输入表达式,它会返回值.表达式语法很简单:运算符 + , - , * 和 / 与其它语言一样(例如Pascal或C) ...

  4. [雪峰磁针石博客]可爱的python测试开发库

    欢迎转载,转载请注明来源:github地址 谢谢点赞 相关书籍下载 测试开发 Web UI测试自动化 splinter - web UI测试工具,基于selnium封装. 链接 selenium - ...

  5. [雪峰磁针石博客]软件测试专家工具包1web测试

    web测试 本章主要涉及功能测试.自动化测试(参考: 软件自动化测试初学者忠告) .接口测试(参考:10分钟学会API测试).跨浏览器测试.可访问性测试和可用性测试的测试工具列表. 安全测试工具和性能 ...

  6. [雪峰磁针石博客]数据仓库快速入门教程1简介

    数据仓库是从各种渠道收集和管理数据的技术,可提供有意义的业务洞察,战略性地使用数据. 它用于查询和分析而不是事务处理,是将数据转换为信息并及时向用户提供的过程. 决策支持数据库(数据仓库)与组织的运营 ...

  7. [雪峰磁针石博客]性能测试艺术

    为什么要进行性能测试? 什么是好的与坏的性能?为什么性能测试在软件开发生命周期(SDLC software development life cycle)中很重要? 性能不佳的应用通常无法实现企业预期 ...

  8. 大学使用python 编辑器_[雪峰磁针石博客]2018 最佳python编辑器和IDE

    IDE没有统一的标准,自己习惯就是最好的.本文列出一些较常用的IDE,供大家参考. 一般而言,WingIDE.PyCharm.Spyder.Vim是比较常用的IDE. Spyder Spyder是Py ...

  9. [雪峰磁针石博客]python3标准库-中文版2:内置函数

    2019独角兽企业重金招聘Python工程师标准>>> 内置功能 abs() dict() help() min() setattr() all() dir() hex() next ...

最新文章

  1. 面试?莫慌--- 教你如何“秀技”摩擦面试官
  2. 网页制作遵循四大原则让网站建设更加优质
  3. Stanford UFLDL教程 自编码算法与稀疏性
  4. 让线程等待10秒_把python程序变成多线程
  5. 如何获取独立项目开发经验
  6. HBase全网最佳学习资料汇总
  7. Spring mvc @ModelAttribute
  8. java不能弹出打印窗口,java – 如何打印一个摆动窗口,使其非常适合一页
  9. 家用计算机的使用说明,AWIND奇机家用无线投屏器使用说明
  10. 什么是IPS?如何对其进行调整?
  11. Ubuntu 13.10 安装后配置
  12. PyTorch常用5个抽样函数
  13. 日本IT公司工作体验
  14. Linux tar压缩文件夹,排除该文件夹下的某些文件夹或文件
  15. 光纤收发器tx和rx是什么意思?二者有什么区别?
  16. C语言会员卡计费系统
  17. Python框架django路由管理
  18. grid - gap
  19. iOS App构建版本
  20. python 打包命令以及使用方法

热门文章

  1. c语言两个变量相乘出现乱码,C语言,矩阵的乘法运算程序,输出一堆乱码,求大神看看哪里错了。。...
  2. 未能加载文件或程序集或它的某一个依赖项_手写一个miniwebpack
  3. RT-Thread Pin设备驱动API应用介绍
  4. oracle里的AUE是什么意思,oracle 创建表空间步骤代码
  5. linux实时线程调度bug,linux中采用用户级线程模拟实现EDF和RMS两种处理机实时调度算法之改进...
  6. flash activex java_Adobe flash player ActiveX和NPAPI和PPAPI 这三个软件有什么区别?
  7. java 华为面试题_JAVA华为面试题
  8. 依赖注入的三种方式_一起学Spring之三种注入方式及集合类型注入
  9. js 单页面ajax缓存策略,浅谈ajax的缓存机制---IE浏览器方面
  10. 主板用什么软件测试呢,什么软件检测主板能用什么cpu