MRJobConfig
public static fina COMBINE_CLASS_ATTR
属性COMBINE_CLASS_ATTR= "mapreduce.job.combine.class"
————子接口(F4) JobContent
方法getCombinerClass
————子实现类 JobContextImpl
实现getCombinerClass方法:
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
throws ClassNotFoundException {
return (Class<? extends Reducer<?,?,?,?>>)
conf.getClass(COMBINE_CLASS_ATTR, null);
}
因为JobContextImpl是MRJobConfig子类
所以得到了父类MRJobConfig的COMBINE_CLASS_ATTR属性
————子类Job
public void setCombinerClass(Class<? extends Reducer> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
因为JobContextImpl是MRJobConfig子类,
而Job是JobContextImpl的子类
所以也有COMBINE_CLASS_ATTR属性
通过setCombinerClass设置了父类MRJobConfig的属性
MRJobConfig
————子接口JobContent
方法getCombinerClass
————子实现类 JobContextImpl
————子类 Job
————子实现类 TaskAttemptContext
继承了方法getCombinerClass
Task   
$CombinerRunner(Task的内部类)   
该内部类有方法create:
public static <K,V> CombinerRunner<K,V> create(JobConf job,
TaskAttemptID taskId,
Counters.Counter inputCounter,
TaskReporter reporter,
org.apache.hadoop.mapreduce.OutputCommitter committer
) throws ClassNotFoundException
{
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
if (cls != null) {
return new OldCombinerRunner(cls, job, inputCounter, reporter);
}
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
reporter);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
taskContext.getCombinerClass();
if (newcls != null) {
return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
inputCounter, reporter, committer);
}
return null;
}
其中这一段应该是旧的API
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
if (cls != null) {
return new OldCombinerRunner(cls, job, inputCounter, reporter);
}
而这个是新的API
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
reporter);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
taskContext.getCombinerClass();
if (newcls != null) {
return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
inputCounter, reporter, committer);
}
return null;
(不知道为什么要写全名,去掉那些包名、向上/下转型和各种泛型的话,看起来就会清晰很多?)
而TaskAttemptContext是JobContent的子实现类,所以继承了getCombinerClass方法
而且,这里用的是多态,其调用的是子实现类TaskAttemptContextImpl的getCombinerClass方法
(TaskAttemptContextImpl继承了JobContextImpl,而JobContextImpl实现了该方法)
所以最终get到了属性COMBINE_CLASS_ATTR,即得到了我们通过job.setCombinerClass的xxxC
而这个xxxC是给了newcls,而newcls是给了NewCombinerRunner的构造函数的reducerClassc参数
NewCombinerRunner(Class reducerClass,
JobConf job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
org.apache.hadoop.mapreduce.TaskAttemptContext context,
Counters.Counter inputCounter,
TaskReporter reporter,
org.apache.hadoop.mapreduce.OutputCommitter committer)
{
super(inputCounter, job, reporter);
this.reducerClass = reducerClass;
this.taskId = taskId;
keyClass = (Class<K>) context.getMapOutputKeyClass();
valueClass = (Class<V>) context.getMapOutputValueClass();
comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
this.committer = committer;
}
Task
MapTask
$MapOutputBuffer
private CombinerRunner<K,V> combinerRunner;
$SpillThread类($表示内部类)
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
//此时,我们得到了设置好的合并类                            
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
再查看combine函数
在Task的内部类NewCombinerRunner下
public void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector)
throws IOException, InterruptedException,ClassNotFoundException
{
// make a reducer
org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
(org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
ReflectionUtils.newInstance(reducerClass, job);
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, taskId,
iterator, null, inputCounter,
new OutputConverter(collector),
committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
}
上面的reducerClass就是我们传入的xxxC
最终是通过反射创建了一个xxxC对象,并将其强制向上转型为Reducer实例对象,
然后调用了向上转型后对象的run方法(当前的xxxC没有run方法,调用的是父类Reduce的run)
在类Reducer中,run方法如下
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
           * control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       
}
}
} finally {
cleanup(context);
}
}
有由于多态,此时调用的reduce是子类xxxC中的reduce方法
(多态态性质:子类复写了该方法,则实际上执行的是子类中的该方法)
所以说,我们自定义combine用的类的时候,应该继承Reducer类,并且复写reduce方法
且其输入形式:(以wordcount为例)
       reduce(Text key, Iterable<IntWritable> values, Context context)
       其中key是单词个数,而values是个数列表,也就是value1、value2........
       注意,此时已经是列表,即<键,list<值1、值2、值3.....>>
       (之所以得到这个结论,是因为我当时使用的combine类是WCReduce,
        即Reduce和combine所用的类是一样的,通过对代码的分析,传入值的结构如果是<lkey,value>的话,是不可能做到combine的啊——即所谓的对相同值合并,求计数的累积和,这根本就是两个步骤,对key相同的键值对在map端就进行了一次合并了,合并成了<key,value list>,然后才轮到combine接受直接换个形式的输入,并处理——我们的处理是求和,然后再输出到context,进入reduce端的shuffle过程。
        然后我在reduce中遍历了用syso输出
        结果发现是0,而这实际上是因为经过一次遍历,我的指针指向的位置就不对了啊,
        )
嗯,自己反复使用以下的代码,不断的组合、注释,去测试吧~就会得出这样的结论了
  1. /reduce
  2.     publicstaticclassWCReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
  3.         private final IntWritableValueOut=newIntWritable();
  4.         @Override
  5.         protectedvoid reduce(Text key,Iterable<IntWritable> values,
  6.                 Context context)  throws IOException,InterruptedException{
  7.             for(IntWritable value : values){
  8.                 System.out.println(value.get()+"--");
  9.             }
  10.  
  11. //            int total = 0 ;
  12. //            for (IntWritable value : values) {
  13. //                total += value.get();
  14. //            }
  15. //            ValueOut.set(total);
  16. //            context.write(key, ValueOut);
  17.         }
  18.  
  19.     }
  20.           
  21. job.setCombinerClass(WCReduce.class);
来自为知笔记(Wiz)

附件列表

转载于:https://www.cnblogs.com/xuanlvshu/p/5744445.html

关于MapReduce中自定义Combine类(一)相关推荐

  1. 在php中自定义一个类的关键字为( ),精读《未来简史》尔雅章节测验答案

    精读<未来简史>尔雅章节测验答案 更多相关问题 [单选题]当采用环刀法取样时,取样点应位于每层土的()深度处. A. 1/2 B. 1/3 C. 1/4 D. 2/3 [填空题]OSI/R ...

  2. MapReduce中的combiner类详解及自定义combiner类(转)

    转自:https://www.cnblogs.com/edisonchou/p/4297786.html 一.Combiner的出现背景 1.1 回顾Map阶段五大步骤 在第四篇博文<初识Map ...

  3. java client类_Jmeter中自定义JavaSamplerClient类的编写

    自定义的JavaSamplerClient类需要满足以下几个条件: 1.需要把Jmter的ext目录下的ApacheJmeter_core.jar和ApacheJmetere_java.jar 引入到 ...

  4. eclipse中自定义videoview类_android控件之VideoView建立自己的播放器

    简介 用来播放视频文件.该VideoView类可以加载各种来源的图像(如资源或内容提供商),需要计算它从视频测量,以便它可以在任何布局管理器使用,并提供诸如缩放和着色的各种显示选项.在其他的平台上面可 ...

  5. java中自定义日期类_java日期操作自定义类

    每次当需要对日期进行处理的时候,总是会记不住库.方法和格式,所以就自己简单封装了下:包含了大部分功能,部分不常用的没有实现. package com.tunicorn.marketing.utils; ...

  6. 在python中定义类时、运算符重载_自定义 Python 类中的运算符和函数重载(上)...

    如果你对 Python 中的str对象使用过 + 或 * 运算符,你一定注意到了它的操作与 int 或 float 类型的区别: 你可能想知道同一内置运算符或函数如何对不同类对象进行不同操作的.这分别 ...

  7. C#中自定义类数组和结构数组的使用

    如有雷同,不胜荣幸,若转载,请注明 C#中自定义类数组和结构数组的使用 最近在很多项目中发现很多时候给定的数组要实现某个逻辑或处理很是麻烦,一维数组,二维数组,,,等等需要经过n多转换,还不如自己写一 ...

  8. Qt中的自定义模型类

    文章目录 1 Qt中的通用模型类 1.1 Qt中的通用模型类 1.2 Qt中的变体类型QVariant 2 自定义模型类 2.1 自定义模型类设计分析 2.2 自定义模型类数据层.数据表示层.数据组织 ...

  9. kettle中java组件_kettle系列-[KettleUtil]kettle插件,类似kettle的自定义java类控件

    该kettle插件功能类似kettle现有的定义java类插件,自定java类插件主要是支持在kettle中直接编写java代码实现自定特殊功能,而本控件主要是将自定义代码转移到jar包,就是说自定义 ...

最新文章

  1. 对话文津|相约“信息”的前世今生
  2. FPGA 和ASIC开发的区别
  3. [转]浅谈OCR之Tesseract
  4. Matlab处理JSON数据
  5. DDD:DomainEvent、ApplicationEvent、Command
  6. ASP.NET中Server.MapPath() 和Request.MapPath()使用
  7. 洛谷 P3063 [USACO12DEC]牛奶的路由Milk Routing
  8. (Singleton)单例模式的Java实现
  9. sql学生选课管理系统
  10. qq里测试音色的软件,QQ换音大师2015(QQ消息提示声音修改软件)
  11. win10c盘扩容_三招给你的C盘瘦身
  12. 聚类分析matlab原理,matlab笔记模糊聚类分析原理及实现023.docx
  13. Golang | flag pflag介绍
  14. 爬虫:深度爬取网易云音乐所有歌手及其对应热门歌曲
  15. 实证研究的步骤_本科生毕业论文设计可以用到的研究方法有哪些
  16. node.js 上传文件比较 busboy vs. formidable vs. multer vs. multiparty
  17. 机器学习笔记(十六):多项式回归、拟合程度、模型泛化
  18. 2022第十二届中国电子文件管理论坛嘉宾揭晓
  19. 看苹果出的面试难题!!!
  20. Windows查看ios手机日志

热门文章

  1. 转载Linq中GroupBy方法的使用总结
  2. MOSS服务器场迁移1-有关切换登录用户时需要刷新一次才能成功的问题
  3. Socket网络编程【获取本机IP】
  4. C++之构造函数和析构函数强化
  5. freemarker 从 spring boot execute jar可执行jar中访问模板文件
  6. 【Python3网络爬虫开发实战】4-解析库的使用-3 使用pyquery
  7. 理解什么是前后端分离
  8. CloudCC CRM:物联网必将成为CRM的推动力
  9. Android Ion 框架 文件下载
  10. Sublime 资源汇总