转自:http://qindongliang.iteye.com/blog/2064281

如果MapReduce中需要用到多个job,而且多个job之间需要设置一些依赖关系,比如Job3需要依赖于Job1和Job2,这就要用到JobControl,具体的用法如下:

  1. public static int handleJobChain(Job job1 ,Job job2,Job job3, String chainName) throws IOException{
  2. ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
  3. controlledJob1.setJob(job1);
  4. ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
  5. controlledJob2.setJob(job2);
  6. ControlledJob controlledJob3 = new ControlledJob(job2.getConfiguration());
  7. controlledJob3.setJob(job3);
  8. controlledJob3.addDependingJob(controlledJob1);
  9. controlledJob3.addDependingJob(controlledJob2);
  10. JobControl jc = new JobControl(chainName);
  11. jc.addJob(controlledJob1);
  12. jc.addJob(controlledJob2);
  13. jc.addJob(controlledJob2);
  14. Thread jcThread = new Thread(jc);
  15. jcThread.start();
  16. while(true){
  17. if(jc.allFinished()){
  18. System.out.println(jc.getSuccessfulJobList());
  19. jc.stop();
  20. return 0;
  21. }
  22. if(jc.getFailedJobList().size() > 0){
  23. System.out.println(jc.getFailedJobList());
  24. jc.stop();
  25. return 1;
  26. }
  27. }
  28. }

需要给每个Job设置自己的Configuration,然后通过JobControl将多个Job连接到一起。

由于JobControl实现了Runnable接口,可以通过线程运行JobControl,最后通过Stop方法可以停止。如果不用一个Thread来运行,就会导致Hadoop中所有Job执行完毕之后,最后不会退出,但是结果是输出完毕的。

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------

使用Hadoop里面的MapReduce来处理海量数据是非常简单方便的,但有时候我们的应用程序,往往需要多个MR作业,来计算结果,比如说一个最简单的使用MR提取海量搜索日志的TopN的问题,注意,这里面,其实涉及了两个MR作业,第一个是词频统计,第两个是排序求TopN,这显然是需要两个MapReduce作业来完成的。其他的还有,比如一些数据挖掘类的作业,常常需要迭代组合好几个作业才能完成,这类作业类似于DAG类的任务,各个作业之间是具有先后,或相互依赖的关系,比如说,这一个作业的输入,依赖上一个作业的输出等等。

在Hadoop里实际上提供了,JobControl类,来组合一个具有依赖关系的作业,在新版的API里,又新增了ControlledJob类,细化了任务的分配,通过这两个类,我们就可以轻松的完成类似DAG作业的模式,这样我们就可以通过一个提交来完成原来需要提交2次的任务,大大简化了任务的繁琐度。具有依赖式的作业提交后,hadoop会根据依赖的关系,先后执行的job任务,每个任务的运行都是独立的。

下面来看下散仙的例子,组合一个词频统计+排序的作业,测试数据如下:

Java代码  
  1. 秦东亮;72
  2. 秦东亮;34
  3. 秦东亮;100
  4. 三劫;899
  5. 三劫;32
  6. 三劫;1
  7. a;45
  8. b;567
  9. b;12

代码如下:

Java代码  
  1. package com.qin.test.hadoop.jobctrol;
  2. import java.io.IOException;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.io.WritableComparator;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
  15. import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. /**
  18. * Hadoop的版本是1.2的
  19. * JDK环境1.6
  20. * 使用ControlledJob+JobControl新版API
  21. * 完成组合式任务
  22. * 第一个任务是统计词频
  23. * 第二个任务是降序排序
  24. *
  25. * 如果使用MapReduce作业来完成的话,则需要跑2个MR任务
  26. * 但是如果我们使用了JobControl+ControlledJob就可以在
  27. * 一个类里面完成类型的DAG依赖式的作业
  28. *
  29. *
  30. * @author qindongliang
  31. *
  32. *
  33. *
  34. * ***/
  35. public class MyHadoopControl {
  36. /***
  37. *
  38. *MapReduce作业1的Mapper
  39. *
  40. *LongWritable 1  代表输入的key值,默认是文本的位置偏移量
  41. *Text 2          每行的具体内容
  42. *Text 3          输出的Key类型
  43. *Text 4          输出的Value类型
  44. *
  45. * */
  46. private static class SumMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  47. private Text t=new Text();
  48. private IntWritable one=new IntWritable(1);
  49. /**
  50. *
  51. * map阶段输出词频
  52. *
  53. *
  54. * **/
  55. @Override
  56. protected void map(LongWritable key, Text value,Context context)
  57. throws IOException, InterruptedException {
  58. String data=value.toString();
  59. String words[]=data.split(";");
  60. if(words[0].trim()!=null){
  61. t.set(" "+words[0]);//赋值K
  62. one.set(Integer.parseInt(words[1]));
  63. context.write(t, one);
  64. }
  65. }
  66. }
  67. /**
  68. * MapReduce作业1的Reducer
  69. * 负责词频累加,并输出
  70. *
  71. * **/
  72. private static class SumReduce extends Reducer<Text, IntWritable, IntWritable, Text>{
  73. //存储词频对象
  74. private IntWritable iw=new IntWritable();
  75. @Override
  76. protected void reduce(Text key, Iterable<IntWritable> value,Context context)
  77. throws IOException, InterruptedException {
  78. int sum=0;
  79. for(IntWritable count:value){
  80. sum+=count.get();//累加词频
  81. }
  82. iw.set(sum);//设置词频
  83. context.write(iw, key);//输出数据
  84. }
  85. }
  86. /**
  87. * MapReduce作业2排序的Mapper
  88. *
  89. * **/
  90. private static class SortMapper  extends Mapper<LongWritable, Text, IntWritable, Text>{
  91. IntWritable iw=new IntWritable();//存储词频
  92. private Text t=new Text();//存储文本
  93. @Override
  94. protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
  95. String words[]=value.toString().split(" ");
  96. System.out.println("数组的长度: "+words.length);
  97. System.out.println("Map读入的文本: "+value.toString());
  98. System.out.println("=====>  "+words[0]+"  =====>"+words[1]);
  99. if(words[0]!=null){
  100. iw.set(Integer.parseInt(words[0].trim()));
  101. t.set(words[1].trim());
  102. context.write(iw, t);//map阶段输出,默认按key排序
  103. }
  104. }
  105. }
  106. /**
  107. * MapReduce作业2排序的Reducer
  108. *
  109. * **/
  110. private static class SortReduce extends Reducer<IntWritable, Text, Text, IntWritable>{
  111. /**
  112. *
  113. * 输出排序内容
  114. *
  115. * **/
  116. @Override
  117. protected void reduce(IntWritable key, Iterable<Text> value,Context context)
  118. throws IOException, InterruptedException {
  119. for(Text t:value){
  120. context.write(t, key);//输出排好序后的K,V
  121. }
  122. }
  123. }
  124. /***
  125. * 排序组件,在排序作业中,需要使用
  126. * 按key的降序排序
  127. *
  128. * **/
  129. public static class DescSort extends  WritableComparator{
  130. public DescSort() {
  131. super(IntWritable.class,true);//注册排序组件
  132. }
  133. @Override
  134. public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
  135. int arg4, int arg5) {
  136. return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序
  137. }
  138. @Override
  139. public int compare(Object a, Object b) {
  140. return   -super.compare(a, b);//注意使用负号来完成降序
  141. }
  142. }
  143. /**
  144. * 驱动类
  145. *
  146. * **/
  147. public static void main(String[] args)throws Exception {
  148. JobConf conf=new JobConf(MyHadoopControl.class);
  149. conf.set("mapred.job.tracker","192.168.75.130:9001");
  150. conf.setJar("tt.jar");
  151. System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
  152. /**
  153. *
  154. *作业1的配置
  155. *统计词频
  156. *
  157. * **/
  158. Job job1=new Job(conf,"Join1");
  159. job1.setJarByClass(MyHadoopControl.class);
  160. job1.setMapperClass(SumMapper.class);
  161. job1.setReducerClass(SumReduce.class);
  162. job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key
  163. job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value
  164. job1.setOutputKeyClass(IntWritable.class);//reduce阶段的输出的key
  165. job1.setOutputValueClass(Text.class);//reduce阶段的输出的value
  166. //加入控制容器
  167. ControlledJob ctrljob1=new  ControlledJob(conf);
  168. ctrljob1.setJob(job1);
  169. FileInputFormat.addInputPath(job1, new Path("hdfs://192.168.75.130:9000/root/input"));
  170. FileSystem fs=FileSystem.get(conf);
  171. Path op=new Path("hdfs://192.168.75.130:9000/root/op");
  172. if(fs.exists(op)){
  173. fs.delete(op, true);
  174. System.out.println("存在此输出路径,已删除!!!");
  175. }
  176. FileOutputFormat.setOutputPath(job1, op);
  177. /**========================================================================*/
  178. /**
  179. *
  180. *作业2的配置
  181. *排序
  182. *
  183. * **/
  184. Job job2=new Job(conf,"Join2");
  185. job2.setJarByClass(MyHadoopControl.class);
  186. //job2.setInputFormatClass(TextInputFormat.class);
  187. job2.setMapperClass(SortMapper.class);
  188. job2.setReducerClass(SortReduce.class);
  189. job2.setSortComparatorClass(DescSort.class);//按key降序排序
  190. job2.setMapOutputKeyClass(IntWritable.class);//map阶段的输出的key
  191. job2.setMapOutputValueClass(Text.class);//map阶段的输出的value
  192. job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key
  193. job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value
  194. //作业2加入控制容器
  195. ControlledJob ctrljob2=new ControlledJob(conf);
  196. ctrljob2.setJob(job2);
  197. /***
  198. *
  199. * 设置多个作业直接的依赖关系
  200. * 如下所写:
  201. * 意思为job2的启动,依赖于job1作业的完成
  202. *
  203. * **/
  204. ctrljob2.addDependingJob(ctrljob1);
  205. //输入路径是上一个作业的输出路径
  206. FileInputFormat.addInputPath(job2, new Path("hdfs://192.168.75.130:9000/root/op/part*"));
  207. FileSystem fs2=FileSystem.get(conf);
  208. Path op2=new Path("hdfs://192.168.75.130:9000/root/op2");
  209. if(fs2.exists(op2)){
  210. fs2.delete(op2, true);
  211. System.out.println("存在此输出路径,已删除!!!");
  212. }
  213. FileOutputFormat.setOutputPath(job2, op2);
  214. // System.exit(job2.waitForCompletion(true) ? 0 : 1);
  215. /**====================================================================***/
  216. /**
  217. *
  218. * 主的控制容器,控制上面的总的两个子作业
  219. *
  220. * **/
  221. JobControl jobCtrl=new JobControl("myctrl");
  222. //ctrljob1.addDependingJob(ctrljob2);// job2在job1完成后,才可以启动
  223. //添加到总的JobControl里,进行控制
  224. jobCtrl.addJob(ctrljob1);
  225. jobCtrl.addJob(ctrljob2);
  226. //在线程启动
  227. Thread  t=new Thread(jobCtrl);
  228. t.start();
  229. while(true){
  230. if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息
  231. System.out.println(jobCtrl.getSuccessfulJobList());
  232. jobCtrl.stop();
  233. break;
  234. }
  235. if(jobCtrl.getFailedJobList().size()>0){//如果作业失败,就打印失败作业的信息
  236. System.out.println(jobCtrl.getFailedJobList());
  237. jobCtrl.stop();
  238. break;
  239. }
  240. }
  241. }
  242. }

运行日志如下:

Java代码  
  1. 模式:  192.168.75.130:9001
  2. 存在此输出路径,已删除!!!
  3. 存在此输出路径,已删除!!!
  4. WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  5. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
  6. WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  7. WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
  8. WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  9. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
  10. [job name:  Join1
  11. job id: myctrl0
  12. job state:  SUCCESS
  13. job mapred id:  job_201405092039_0001
  14. job message:    just initialized
  15. job has no depending job:
  16. , job name: Join2
  17. job id: myctrl1
  18. job state:  SUCCESS
  19. job mapred id:  job_201405092039_0002
  20. job message:    just initialized
  21. job has 1 dependeng jobs:
  22. depending job 0:   Join1
  23. ]

处理的结果如下:

Java代码  
  1. 三劫  932
  2. b   579
  3. 秦东亮 206
  4. a   45

可以看出,结果是正确的。程序运行成功,上面只是散仙测的2个MapReduce作业的组合,更多的组合其实和上面的一样。 
总结:在配置多个作业时,Job的配置尽量分离单独写,不要轻易拷贝修改,这样很容易出错的,散仙在配置的时候,就是拷贝了一个,结果因为少修改了一个地方,在运行时候一直报错,最后才发现,原来少改了某个地方,这一点需要注意一下。

如何使用Hadoop的JobControl相关推荐

  1. hadoop源代码组织结构与阅读技巧

    本文将介绍在 Eclipse 下阅读 Hadoop 源代码的一些技巧, 比如如何查看一个基类有哪些派生类. 一个方法被其他哪些方法调用等. 本文地址:http://www.cnblogs.com/ar ...

  2. Hadoop源代码组织结构

    文章转自:http://book.51cto.com/art/201312/422113.htm 1.4 Hadoop源代码组织结构 直接解压Hadoop压缩包后,可看到图1-11所示的目录结构,其中 ...

  3. Hadoop实现词频统计(按照词频降序排列以及相同词频的单词按照字母序排列)

    Hadoop实现词频统计(按照词频降序排列以及相同词频的单词按照字母序排列) 一.环境 二.实现步骤 1.数据 2.主函数 3.第一个MapReduce Map Reduce 4.第二个MapRedu ...

  4. MapReduce DataJoin 链接多数据源

    主要介绍用DataJoin类来链接多数据源,先看一下例子,假设二个数据源customs和orders customer ID       Name      PhomeNumber 1         ...

  5. ChainMapper和ChainReducer

    The ChainMapper class allows to use multiple Mapper classes within a single Map task.  The ChainRedu ...

  6. MapReduce—案例(五)求两两共同好友

    题目: A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O ...

  7. MapReduce(五)

    MapReduce的(五) 1.MapReduce的多表关联查询. 根据文本数据格式.查询多个文本中的内容关联.查询. 2.MapReduce的多任务窜执行的使用 多任务的串联执行问题,主要是要建立c ...

  8. Hadoop实战第四章--读书笔记

    Hadoop三种运行方式: 单机模式.优点:安装配置简单,运行在本地文件系统,便于调试和查看运行效果:缺点:数据量大时较慢,不能模拟分布式模式: 伪分布式模式.优点:运行在本地HDFS文件系统上,能够 ...

  9. Hadoop大数据零基础高端实战培训系列配文本挖掘项目

    <Hadoop大数据零基础高端实战培训系列配文本挖掘项目(七大亮点.十大目标)> 课程讲师:迪伦 课程分类:大数据 适合人群:初级 课时数量:230课时 用到技术:部署Hadoop集群 涉 ...

最新文章

  1. 【300】◀▶ IDL - ENVI API
  2. python qtablewedgit_PyQt5-高级控件使用(QTableWidget)
  3. opengl加载显示3D模型ZAE类型文件
  4. tiny4412u-boot烧写及根文件系统制作(不进入终端问题)
  5. 2018蓝桥杯B组:猴子分香蕉(C++/JAVA)
  6. 智能优化算法应用:基于麻雀搜索算法的积分计算 -附代码
  7. YDOOK:STM32: 最新版选型手册下载 2021
  8. 神舟计算机主板bios,神舟笔记本BIOS设置详解
  9. 品质qc工程图_QC工程图_(品管).xls
  10. 用谷歌浏览器模拟打开天眼查网站并爬取需要的数据
  11. 服务器系统盘是否需要阵列,服务器硬盘必须设置阵列吗
  12. 天津滨海农商银行数据脱敏建设实践
  13. 数独高级算法,直接复制到网页,即可结题
  14. java几种对象的区别(PO,POJO,VO,BO,DAO)
  15. kubernetes 入门实践-搭建集群
  16. Mongodb stop: Unknown instance报错
  17. S3C2440上LCD驱动(FrameBuffer)实例开发讲解(一)
  18. html网页制作—登录及注册页面设计
  19. 小波变换的matlab实现,维小波变换MATLAB实现
  20. KVM-虚拟化技术之Hypervisor-架构

热门文章

  1. NSAutoReleasePool使用中drain和release的区别
  2. mysql状态常用参数分析
  3. 移动商城第五篇【查看、删除、编辑品牌】
  4. Python 开发面试题
  5. 在win7下安装SQL sever2005
  6. Win10无法修改编辑hosts文件
  7. quartz定时定时任务执行两次
  8. mapreduce原理
  9. Java实现单例模式
  10. Visio 快捷大全(转载)