自定义combiner

  • map端合并数据,减少网络io
    • 一、普通的combiner
    • 二、自定义combiner,实现自由合并

map端合并数据,减少网络io

前言:在map端使用combiner合并数据可以减少需要通过网络io的数据,有效增加map reduce程序的运行效率。

一、普通的combiner

在map端提前使用combiner合并数据是广为人知的一种优化策略。
但是这种优化策略有两个缺陷,一个是数据量要比较大,不过考虑到map reduce程序处理的数据一般都是大量的数据,所以这个问题不是关键。
使combiner不那么受人重视的是另一个关键缺陷,因为combiner是要reducer程序在map端的提前,所以普遍的策略是combiner直接采用已有的reducer代码,而采用这种相同逻辑的combiner要求提前执行combiner程序,合并的数据不会影响到reducer端最终的合并。
这种要求只符合一些简单逻辑的程序,比如统计单词、求最大/小值等,这些程序的数据提前合并不会影响到reducer端的最终合并。
对比较复杂的程序逻辑来说是不能满足的,比如求平均数,一般的对一个学生的各科成绩score求平均数,reduce的逻辑是平均数(单个map的平均数)=score值的和/score个数,如果提前执行这样的combiner,reduce处理的数据将会变成最终平均数=单个map的平均数的和/map个数,将前者带入后者中即有最终平均数=score值的和/(score个数*map个数),可以发现如果每个map的score个数相等还是可以得出正确答案,但如果score个数不相等,则score个数小的会变成score大的值,从而使分母变大,进而导致最终平均值变小。

有如下数据:
score1.txt:

zhangsan 英语  60
zhangsan    政治  70
zhangsan    化学  80
lisi    英语  60
lisi    政治  70
lisi    化学  80
wanger  英语  60
wanger  政治  70
wanger  化学  80

score2.txt

zhangsan 语文  60
zhangsan    数学  70
lisi    语文  60
lisi    数学  70
wanger  语文  60
wanger  数学  70

mapper程序:

package cn.yy.hadoop.mapreduce.average;import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** description: a mapper program for get average number from data files* author: bob yy* since: 1.8**/
public class AverageMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {private Text text = new Text();private FloatWritable number = new FloatWritable();/*** get student and score from input file, push its to reduce** @param key     line number* @param value   a line data of input file* @param context context of map reduce* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] row = value.toString().split("\t");String student = row[0];String score = row[2];text.set(student);number.set(Float.parseFloat(score));context.write(text, number);}
}

reducer程序:

package cn.yy.hadoop.mapreduce.average;import cn.yy.test.Test;
import com.sun.tools.javac.comp.Flow;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** description: a reducer program for get average number from mapper output* author: bob yy* since: 1.8**/
public class AverageReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {private FloatWritable avg = new FloatWritable();/*** add sum and count,get average by sum/count.** @param key     student flag* @param values  scores of key student* @param context context of map reduce, write data by the context.* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {float sum = 0;int count = 0;for (FloatWritable value : values) {sum += value.get();count++;}avg.set(sum / count);context.write(key, avg);}
}

正常的求平均值驱动程序:对于相同逻辑的combiner和reducer的求平均值程序是不能使用combiner在map端提前聚合的。

/*** get average use map reduce** @throws IOException            getInstance* @throws ClassNotFoundException waitForCompletion* @throws InterruptedException   waitForCompletion*/private void getAverage() throws IOException, ClassNotFoundException, InterruptedException {String in = "J:\\data\\average\\input";String out = "J:\\data\\average\\output";Path inPath = new Path(in);Path outPath = new Path(out);// 删除该文件LocalFileSystem.deleteFile(out);Job job = Job.getInstance();job.setMapperClass(AverageMapper.class);job.setReducerClass(AverageReducer.class);// set output format of mapper and reducerjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FloatWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// set input and output pathFileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);job.waitForCompletion(true);}

正确的结果:

lisi 68.0
wanger  68.0
zhangsan    68.0

如下是错误的案例:

/*** it is fail, combiner of get average must be self defined; combiner of map side added count number, reduced* average.** @throws IOException            getInstance* @throws ClassNotFoundException waitForCompletion* @throws InterruptedException   waitForCompletion*/private void getAverageNoCombiner() throws IOException, ClassNotFoundException, InterruptedException {String in = "J:\\data\\average\\input";String out = "J:\\data\\average\\output_noCombiner";Path inPath = new Path(in);Path outPath = new Path(out);// 删除该文件LocalFileSystem.deleteFile(out);Job job = Job.getInstance();job.setMapperClass(AverageMapper.class);job.setReducerClass(AverageReducer.class);// set output format of mapper and reducerjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FloatWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// set Combiner no definejob.setCombinerClass(AverageReducer.class);// set input and output pathFileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);job.waitForCompletion(true);}

错误的结果:可以发现最终结果比正确的平均值变小了。

lisi 67.5
wanger  67.5
zhangsan    67.5

主要错误是在map端使用combiner程序进行提前聚合;如上述所示,提前使用combiner程序聚合平均值会导致最终结果发生变化。

二、自定义combiner,实现自由合并

根据官方api我们知道,job设置combiner时接收Reducer的子类,一般的为了简化代码,直接将必有的Reducer实现类AverageReducer(业务reducer实现类)当作combiner传入job执行。
但是根据上面的论述,这种简单的方式导致了对于较为复杂的业务不能使用combiner提前合并。
所以,对于复杂业务,我们需要自定义combiner类。
因为mapreduce程序处理的都是大数据,所以map数据传递给reduce端所经过的网络i/o一般都是比较大,这使得我们可以对大多数mapreduce程序都采用提前combiner策略,即map传给reduce程序的数据都可以是经过提前合并的结果数据,从而极大的优化网络i/o。
对于求平均值的mapreduce程序,提前combiner会丢失隐性的数值,即score的个数。
自定义combiner逻辑:
很明显,当提前计算平均值就会导致数值变化,所以,可以统计结果(<student,<sum,count>>,保留对score求和得到的sum,统计score个数得到的count)但不求平均值(所以也就不会丢失数值),而将这些统计结果传给reduce程序,这里我们需要改变reduce原来的逻辑,现在的逻辑是遍历values是统计每个student在每个map中求得的sum和count,设所有sum和为ss,所有count和为cs,得到student平均值=ss/cs。
这里使用combiner统计结果元素,但不执行会导致数值变化的运算,在减少数据量的同时保持了combiner的正确性。
如此,可以说,对于大数据量的复杂业务,使用自定义combiner也完全可以成为常规的有效的优化手段,而代价则是增加了代码复杂度,所以使用这种优化手段的同时,我们应该注意系统的复杂度,如果太过复杂,则应该及时优化该代码。
自定义的combiner程序:这里只是简单的使用text完成数据的序列化,如果业务复杂可以使用javabean优化。

package cn.yy.hadoop.mapreduce.average;import cn.yy.test.Test;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** description: a define Average Combiner for get average number* author: bob yy* since: 1.8**/
public class AverageCombiner extends Reducer<Text, Text, Text, Text> {private Text text = new Text();/*** get count and sum of student score, push it to reducer by use context write Text; the combiner output data* is result of a mapper, so data is reduced.** @param key     student from mapper* @param values  scores of student* @param context write count and sum to reducer* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {int sum = 0;int count = 0;for (Text value : values) {sum += Integer.parseInt(value.toString());count++;}text.set("" + sum + "\t" + count);context.write(key, text);}
}

因为combiner改变了map和reduce的输入输出格式,所以map进行关于text的简化,简化后的map程序:

package cn.yy.hadoop.mapreduce.average;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** description: a mapper program for use combiner get average number,combiner get data from mapper output* author: bob yy* since: 1.8**/
public class CombinerMapper extends Mapper<LongWritable, Text, Text, Text> {private Text student = new Text();private Text score = new Text();/*** get student and score, serially score by Text for unique output format of mapper and reducer.** @param key     line number* @param value   line data of input file, include student and score...* @param context context of map reduce, write key value to reducer by the context* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");student.set(split[0]);score.set(split[2]);context.write(student, score);}
}

逻辑改变后的reduce程序:

package cn.yy.hadoop.mapreduce.average;import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** description: a reducer for define combin,it get data from combiner output* author: bob yy* since: 1.8**/
public class CombinerReducer extends Reducer<Text, Text, Text, FloatWritable> {private FloatWritable avg = new FloatWritable();/*** count score number and score sum of a student, run sum/count(score number) get average; because cache* count(score number) so that combiner not lose count.** @param key     student flag* @param values  some count and sum of a student* @param context use context write student and average to file* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {float sum = 0;float count = 0;for (Text value : values) {String[] split = value.toString().split("\t");sum += Integer.parseInt(split[0]);count += Integer.parseInt(split[1]);}avg.set(sum / count);context.write(key, avg);}
}

改变job流程的driver程序:

/*** define combiner, get average use map reduce and add define combiner.** @throws IOException            getInstance* @throws ClassNotFoundException waitForCompletion* @throws InterruptedException   waitForCompletion*/private void getAverageByCombiner() throws IOException, ClassNotFoundException, InterruptedException {String in = "J:\\data\\average\\input";String out = "J:\\data\\average\\output_Combiner";Path inPath = new Path(in);Path outPath = new Path(out);// 删除该文件LocalFileSystem.deleteFile(out);Job job = Job.getInstance();// use define combiner for get average numberjob.setMapperClass(CombinerMapper.class);job.setReducerClass(CombinerReducer.class);// set output format of mapper and reducerjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// set combinerjob.setCombinerClass(AverageCombiner.class);// set input and output pathFileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);job.waitForCompletion(true);}

使用自定义combiner求平均值的mapreduce程序运行如上数据,结果:

lisi 68.0
wanger  68.0
zhangsan    68.0

对于大多数的mapreduce程序,自定义combiner都可以运行,所以我们又多了一种对mapreduce程序优化的有效手段。
并且网上说combiner对于复杂业务不适用的情况也是可以避免的。
大数据运算技术之mapreduce。

mapreduce优化之自定义combiner相关推荐

  1. MapReduce学习总结之Combiner、Partitioner、Jobhistory

    一.Combiner 在MapReduce编程模型中,在Mapper和Reducer之间有一个非常重要的组件,主要用于解决MR性能瓶颈问题 combiner其实属于优化方案,由于带宽限制,应该尽量ma ...

  2. 【Android 内存优化】自定义组件长图组件 ( 长图滚动区域解码 | 手势识别 GestureDetector | 滑动计算类 Scroller | 代码示例 )

    文章目录 一.GestureDetector 创建与设置 二.GestureDetector 触摸事件传递 三.触摸滑动操作 四.惯性滑动操作 五.长图滑动组件代码示例 六.运行效果 七.源码及资源下 ...

  3. 【Android 内存优化】自定义组件长图组件 ( 获取图像宽高 | 计算解码区域 | 设置图像解码属性 复用 像素格式 | 图像绘制 )

    文章目录 一.获取图像真实宽高 二.计算解码区域 三.设置解码参数 内存复用 像素格式 四.图像绘制 五.执行效果 六.源码及资源下载 官方文档 API : BitmapRegionDecoder 在 ...

  4. MapReduce中的combiner类详解及自定义combiner类(转)

    转自:https://www.cnblogs.com/edisonchou/p/4297786.html 一.Combiner的出现背景 1.1 回顾Map阶段五大步骤 在第四篇博文<初识Map ...

  5. Hadoop学习笔记—8.Combiner与自定义Combiner

    一.Combiner的出现背景 1.1 回顾Map阶段五大步骤 在第四篇博文<初识MapReduce>中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步骤,如下图所示: ...

  6. MapReduce (Shuffle,partition,combiner,Spill )

    一.shuffle介绍 1 .shuffle就是洗牌弄乱的意思,shuffle代表map 输出 到reduce 的整个过程,他解决的问题就是如何将多个map task的输出,作为多个reduce ta ...

  7. Combiner与自定义Combiner

    一.Combiner的出现背景 1.1 回顾Map阶段五大步骤 在第四篇博文<初识MapReduce>中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步骤,如下图所示: ...

  8. 一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现

    1:首先搞好实体类对象: write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toStrin ...

  9. mapreduce的规约(Combiner)

    听了超哥的一席课后逐渐明白了Combiner,记录一下自己的理解!(thanks 超哥) 首先贴上两段代码: code1: package combine;import java.io.IOExcep ...

最新文章

  1. 去掉字符串连续出现K个0的子串
  2. C语言程序设计 练习1-13
  3. RMI强制Full GC每小时运行一次
  4. 用Openswan组建Linux IPSec ---第二部分
  5. 缺少ntstrsafe.lib kndis5mp.lib解决办法
  6. python 视频分析_成为视频分析专家:自动生成集锦的方法(Python实现)
  7. 深入了解clientXY,offsetXY,pageXY的区别
  8. 2021华为软挑赛复盘
  9. RDA226数字热释电传感器调试笔记
  10. 英特尔服务器主板型号参数对照表,Intel主板芯片组参数速查表(201805版)
  11. [匈牙利算法] 洛谷 P1640 连续攻击
  12. 自由到底意味着什么(二)叔本华说的第一类自由
  13. 制作elasticsearch 镜像_相册视频制作-相册视频制作App下载-
  14. css3 svg 背景图 data:image/svg+xml;base64
  15. 树莓派配置环境细节(JDK+pycharm+miniconda+pyqt5+opencv-python)
  16. 【C/C++】龙格库塔+亚当姆斯求解数值微分初值问题
  17. CSS——css外部样式文件的引入
  18. iOS中把故事板中视功能和美工结合在1起
  19. paypal ipn java_PayPal IPN验证
  20. 第七课 Java基础篇——阶段性综合练习

热门文章

  1. CSDN第一何方神圣?(附前十排名)
  2. Transaction Processing Monitor(事务处理监视器),TP服务,TP监控服务
  3. Problem C: Sums
  4. 无gui服务器上使用matplotlib
  5. 线性代数:矩阵空间的理解
  6. 十六氯酞菁铁(FePcCl16)|132-16-1 酞菁铁(II)|齐岳生物供应
  7. cvpr eccv iccv icml ijcai tpami ijcv 2017 2018 papers
  8. 苹果服务器暂停响应怎么打开,iPhone 应用卡死、无响应或无法退出怎么办?
  9. 微信小程序采坑之连接MySql数据库
  10. Unity 出现error CS0103: The name ‘AssetDatabase‘ does not exist in the current context