
  • map端合并数据,减少网络io
    • 一、普通的combiner
    • 二、自定义combiner,实现自由合并


前言:在map端使用combiner合并数据,可以减少需要通过网络io传输的数据量,有效提高map reduce程序的运行效率。


但是这种优化策略有两个缺陷:一个是数据量要比较大才能体现出效果,不过考虑到map reduce程序处理的数据一般都是大量的数据,所以这个问题不是关键;另一个是合并操作不能改变最终结果——求和、计数这类运算可以直接把reducer当作combiner使用,而求平均值这类运算不行,需要自定义combiner。


zhangsan 英语  60
zhangsan    政治  70
zhangsan    化学  80
lisi    英语  60
lisi    政治  70
lisi    化学  80
wanger  英语  60
wanger  政治  70
wanger  化学  80


zhangsan 语文  60
zhangsan    数学  70
lisi    语文  60
lisi    数学  70
wanger  语文  60
wanger  数学  70


package cn.yy.hadoop.mapreduce.average;import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** description: a mapper program for get average number from data files* author: bob yy* since: 1.8**/
public class AverageMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {private Text text = new Text();private FloatWritable number = new FloatWritable();/*** get student and score from input file, push its to reduce** @param key     line number* @param value   a line data of input file* @param context context of map reduce* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] row = value.toString().split("\t");String student = row[0];String score = row[2];text.set(student);number.set(Float.parseFloat(score));context.write(text, number);}


package cn.yy.hadoop.mapreduce.average;import cn.yy.test.Test;
import com.sun.tools.javac.comp.Flow;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** description: a reducer program for get average number from mapper output* author: bob yy* since: 1.8**/
public class AverageReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {private FloatWritable avg = new FloatWritable();/*** add sum and count,get average by sum/count.** @param key     student flag* @param values  scores of key student* @param context context of map reduce, write data by the context.* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {float sum = 0;int count = 0;for (FloatWritable value : values) {sum += value.get();count++;}avg.set(sum / count);context.write(key, avg);}


/*** get average use map reduce** @throws IOException            getInstance* @throws ClassNotFoundException waitForCompletion* @throws InterruptedException   waitForCompletion*/private void getAverage() throws IOException, ClassNotFoundException, InterruptedException {String in = "J:\\data\\average\\input";String out = "J:\\data\\average\\output";Path inPath = new Path(in);Path outPath = new Path(out);// 删除该文件LocalFileSystem.deleteFile(out);Job job = Job.getInstance();job.setMapperClass(AverageMapper.class);job.setReducerClass(AverageReducer.class);// set output format of mapper and reducerjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FloatWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// set input and output pathFileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);job.waitForCompletion(true);}


lisi 68.0
wanger  68.0
zhangsan    68.0


/*** it is fail, combiner of get average must be self defined; combiner of map side added count number, reduced* average.** @throws IOException            getInstance* @throws ClassNotFoundException waitForCompletion* @throws InterruptedException   waitForCompletion*/private void getAverageNoCombiner() throws IOException, ClassNotFoundException, InterruptedException {String in = "J:\\data\\average\\input";String out = "J:\\data\\average\\output_noCombiner";Path inPath = new Path(in);Path outPath = new Path(out);// 删除该文件LocalFileSystem.deleteFile(out);Job job = Job.getInstance();job.setMapperClass(AverageMapper.class);job.setReducerClass(AverageReducer.class);// set output format of mapper and reducerjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FloatWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// set Combiner no definejob.setCombinerClass(AverageReducer.class);// set input and output pathFileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);job.waitForCompletion(true);}


lisi 67.5
wanger  67.5
zhangsan    67.5




package cn.yy.hadoop.mapreduce.average;import cn.yy.test.Test;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** description: a define Average Combiner for get average number* author: bob yy* since: 1.8**/
public class AverageCombiner extends Reducer<Text, Text, Text, Text> {private Text text = new Text();/*** get count and sum of student score, push it to reducer by use context write Text; the combiner output data* is result of a mapper, so data is reduced.** @param key     student from mapper* @param values  scores of student* @param context write count and sum to reducer* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {int sum = 0;int count = 0;for (Text value : values) {sum += Integer.parseInt(value.toString());count++;}text.set("" + sum + "\t" + count);context.write(key, text);}


package cn.yy.hadoop.mapreduce.average;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** description: a mapper program for use combiner get average number,combiner get data from mapper output* author: bob yy* since: 1.8**/
public class CombinerMapper extends Mapper<LongWritable, Text, Text, Text> {private Text student = new Text();private Text score = new Text();/*** get student and score, serially score by Text for unique output format of mapper and reducer.** @param key     line number* @param value   line data of input file, include student and score...* @param context context of map reduce, write key value to reducer by the context* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");student.set(split[0]);score.set(split[2]);context.write(student, score);}


package cn.yy.hadoop.mapreduce.average;import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** description: a reducer for define combin,it get data from combiner output* author: bob yy* since: 1.8**/
public class CombinerReducer extends Reducer<Text, Text, Text, FloatWritable> {private FloatWritable avg = new FloatWritable();/*** count score number and score sum of a student, run sum/count(score number) get average; because cache* count(score number) so that combiner not lose count.** @param key     student flag* @param values  some count and sum of a student* @param context use context write student and average to file* @throws IOException          write* @throws InterruptedException write*/@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {float sum = 0;float count = 0;for (Text value : values) {String[] split = value.toString().split("\t");sum += Integer.parseInt(split[0]);count += Integer.parseInt(split[1]);}avg.set(sum / count);context.write(key, avg);}


/*** define combiner, get average use map reduce and add define combiner.** @throws IOException            getInstance* @throws ClassNotFoundException waitForCompletion* @throws InterruptedException   waitForCompletion*/private void getAverageByCombiner() throws IOException, ClassNotFoundException, InterruptedException {String in = "J:\\data\\average\\input";String out = "J:\\data\\average\\output_Combiner";Path inPath = new Path(in);Path outPath = new Path(out);// 删除该文件LocalFileSystem.deleteFile(out);Job job = Job.getInstance();// use define combiner for get average numberjob.setMapperClass(CombinerMapper.class);job.setReducerClass(CombinerReducer.class);// set output format of mapper and reducerjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// set combinerjob.setCombinerClass(AverageCombiner.class);// set input and output pathFileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);job.waitForCompletion(true);}


lisi 68.0
wanger  68.0
zhangsan    68.0



