在直接学习hadoop的排序之前还要了解一些基本知识。

Hadoop的序列化和比较接口

Hadoop的序列化格式:Writable

Writable是Hadoop自己的序列化格式,还要一个子接口是WritableComparable<T>,

public interface WritableComparable<T> extends Writable, Comparable<T>

这样一来WritableComparable接口不仅有序列化的功能,还可以进行比较。

排序在MapReduce是很重要的一个方面,因为MapReduce有一个基于键的排序过程,所以可以作为键的类型必须具有Comparable<T>的特性。

除了WritableComparable接口之外,还有一个叫做RawComparator的接口。

WritableComparable和RawComparator的区别是:

WritableComparable需要把数据反序列化为对象,然后做对象之间的比较;RawComparator是直接比较数据流的数据,不需要数据反序列化成对象,从而省去了新建对象的开销。

Hadoop中key的排序逻辑

Hadoop中key的数据类型的排序逻辑其实就是按照WritableComparable<T>的基本数据类型和其他类型的compareTo方法的定义。

Key的排序规则:

  1. 如果调用了jobconf的setOutputKeyComparatorClass(),那么就使用设置的;
  2. 否则,采用实现接口WritableComparable的compareTo()来比较大小并排序。

例如IntWritable的比较算法

public int compareTo(Object o) {  int thisValue = this.value;  int thatValue = ((IntWritable)o).value;  return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}

我们可以根据自己的需要来修改compareTo()这个方法来实现自己的比价算法。

在Hadoop内部是自动调用这个方法来进行比较的,不需要我们手动干预,而且排序只是局限于map或者reduce内部。针对于map和map之间,reduce与reduce之间的排序compareTo就管不着了。虽然这种情况不常出现,但是还是有,比如全排序。

全排序

如果遇到了全排序,就要关注Partition这个阶段,Partition阶段是针对每个Reduce创建一个分区,然后把Map的输出结果映射到特定的分区中,这个分区中可能会有N个Key对应的数据,但是一个Key只能出现在一个分区中。在实现全排序的过程中,如果只有一个Reduce,也就是只有一个Partition,那么所有Map的输出都会经过一个Partition到一个reduce里,一个reduce里就可以根据compareTo来排序,这样就实现了全排序。但是这样不好的地方就是你全排序都是在一个Reduce中做的,这还叫毛的分布式计算啊。

因此,为了take advantage of distributed computing,我们需要使用多个Reduce,也就是有多个分区,而且这些分区的特点是两个已经排序完毕的partition1和partition2,partition1中的根据排序规则要么全部大于partition2,要么全部小于partition2。

这样做可以利用到分布式计算的特性,但是仍然要考虑到的是分区不均的问题,也就是一个partition中的数要远比其他的partition中的多,这样的话对分布式计算的效果也会大打折扣。解决这个问题的方法就是抽样,目的就是使partition更加均匀。

二次排序

代码实例

需求:现在有这样类型的文件若干

file1.txt

2
32
654
32
15
756
65223

file2.txt

5956
22
650
92

file3.txt

26
54
6

现在希望你用Hadoop进行处理并得到如下结果:

1     2
2     6
3     15
4     22
5     26
6     32
7     32
8     54
9     92
10    650
11    654
12    756
13    5956
14    65223

注:本代码是在Hadoop 2.7.4真实分布式环境下执行通过的。

当然最简单的就是多个Map,一个Reduce;因为Reduce在处理的时候是默认的会排序的。所以可以得到一个最简单的版本。

Version1:最简单版本,多个Map,单个Reduce

package com.tuhooo;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class Sort {// map将输入中的value化成IntWritable类型, 作为输出的keypublic static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {private static IntWritable data = new IntWritable();// 实现map函数
        @Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();if(line != null && line.length() != 0 && !"".equals(line)) {data.set(Integer.parseInt(line));context.write(data, new IntWritable(1));}}}// reduce会对map传进来的值进行排序public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {private static IntWritable linenum = new IntWritable(1);@Overridepublic void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for(IntWritable val : values) {context.write(linenum, key);linenum = new IntWritable(linenum.get() + 1);}}}public static void main(String[] args) throws Exception {// 校验参数是否正确if (args.length != 2) {System.err.println("Usage: MaxTemperature <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(Sort.class); // main Classjob.setJobName("Sort");FileInputFormat.addInputPath(job, new Path(args[0]));   // 输入数据的目录FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出数据的目录
job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

通过以上的讲解我们知道这得分区啊,于是有了下面的版本

Version2:M个Map,N个Reduce,N个Partition,均值分区不采样

package com.tuhooo;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class Sort {// map将输入中的value化成IntWritable类型, 作为输出的keypublic static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {private static IntWritable data = new IntWritable();// 实现map函数
        @Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();if(line != null && line.length() != 0 && !"".equals(line)) {data.set(Integer.parseInt(line));context.write(data, new IntWritable(1));}}}// reduce会对map传进来的值进行排序public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {private static IntWritable linenum = new IntWritable(1);@Overridepublic void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for(IntWritable val : values) {context.write(linenum, key);linenum = new IntWritable(linenum.get() + 1);}}}public static class MyPartition extends Partitioner<IntWritable, IntWritable> {@Overridepublic int getPartition(IntWritable key, IntWritable value, int numTaskReduce) {int maxNumber = 65223;    // 样本数据中的最大值int part = maxNumber/numTaskReduce + 1;int keyNum = key.get();for(int i=0; i<numTaskReduce; i++) {if(keyNum >= part*i && keyNum <= part*(i+1)) {return i;}}return -1;     // 如果没有出现在分区里面就会返回-1, 如果返回-1肯定是要报错的
        }}public static void main(String[] args) throws Exception {// 校验参数是否正确if (args.length != 2) {System.err.println("Usage: MaxTemperature <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(Sort.class); // main Classjob.setJobName("Sort");FileInputFormat.addInputPath(job, new Path(args[0]));   // 输入数据的目录FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出数据的目录
        job.setNumReduceTasks(3);                     // Reduce任务数为3job.setPartitionerClass(MyPartition.class);   // 设置分区的类
job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

Version2的代码基本上可以实现分区排序的功能,但是并没有采样啊,就可能会导致分区不均匀导致有些task任务很重,所以我们要加入采样。因此有了Version3。

Version3:M个Map,N个Reduce,N个Partition(此时采用的是TotalSortPartitioner,并且随机采样)

package com.tuhooo;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class Sort {// map将输入中的value化成LongWritable类型, 作为输出的keypublic static class Map extends Mapper<Object, Text, LongWritable, LongWritable> {private static LongWritable data = new LongWritable();// 实现map函数
        @Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();if(line != null && line.length() != 0 && !"".equals(line)) {data.set(Integer.parseInt(line));context.write(data, new LongWritable(1));}}}public static class Reduce extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {private static LongWritable linenum = new LongWritable(1);@Overridepublic void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {for(LongWritable val : values) {context.write(linenum, key);linenum = new LongWritable(linenum.get() + 1);}}}public static void main(String[] args) throws Exception {if (args.length != 3) {System.err.println("Usage: MaxTemperature <input path> <output path> <partition path>");System.exit(-1);}Job job = new Job();job.setJarByClass(Sort.class); // main Classjob.setJobName("Sort");job.setNumReduceTasks(3);FileInputFormat.addInputPath(job, new Path(args[0])); // 输入数据的目录FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出数据的目录
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));InputSampler.Sampler<LongWritable, LongWritable> sampler = new InputSampler.RandomSampler<LongWritable, LongWritable>(0.5, 5, 2);InputSampler.writePartitionFile(job, sampler);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);// 这里换成了TotalOrderPartitionerjob.setPartitionerClass(TotalOrderPartitioner.class);// job.setPartitionerClass(MyPartition.class);job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(LongWritable.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

在编写并运行Version3的时候踩了很多坑:

1. 随机采样的参数设置问题,由于我的样本文件中的数据很少,因此我猜测采样出来的数也很少,导致报异常:数组越界,

此时我采用的参数为:

InputSampler.Sampler<LongWritable, LongWritable> sampler = new InputSampler.RandomSampler<LongWritable, LongWritable>(0.01, 500, 10);

我感觉是没有采到样导致的。

17/11/16 17:31:51 INFO input.FileInputFormat: Total input paths to process : 3
17/11/16 17:31:51 INFO partition.InputSampler: Using 0 samples
17/11/16 17:31:51 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
17/11/16 17:31:51 INFO compress.CodecPool: Got brand-new compressor [.deflate]
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
         at org.apache.hadoop.mapreduce.lib.partition.InputSampler.writePartitionFile(InputSampler.java:340)
         at com.tuhooo.Sort.main(Sort.java:67)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
         at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

2. 如果你有发现的话,最开始在代码Version1和代码Version2中,我用的都是IntWritable,但是在代码Version3中我用的是LongWritable,这是为啥捏。如果我继续在Version3中使用IntWritable的时候,采样文件的结果是这样的:

[root@hadoop Sort]# hdfs dfs -cat /partitionFile_heheda
SEQ!org.apache.hadoop.io.LongWritable!org.apache.hadoop.io.NullWritable*org.apache.hadoop.io.compress.DefaultCodec▒]▒S▒3n▒l▒▒▒▒Axx▒[root@hadoop Sort]#

后来一直报错就是IntWritable不是LongWritable,就是下面这样的:

Error: java.lang.IllegalArgumentException: Can't read partitions file
         at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:116)
         at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
         at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
         at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:702)
         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:422)
         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.LongWritable
         at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2329)
         at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2381)
         at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.readPartitions(TotalOrderPartitioner.java:306)
         at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:88)
         ... 10 more

大数据路长漫漫~~~

转载于:https://www.cnblogs.com/tuhooo/p/7833691.html

MapReduce中的排序(附代码)相关推荐

  1. Hadoop学习笔记—11.MapReduce中的排序和分组

    Hadoop学习笔记-11.MapReduce中的排序和分组 一.写在之前的 1.1 回顾Map阶段四大步骤 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出 ...

  2. Ionic+Angular实现中英国际化(附代码下载)

    场景 Ionic介绍以及搭建环境.新建和运行项目: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/106308166 在上面搭建起 ...

  3. 中希尔排序例题代码_【数据结构与算法】这或许是东半球分析十大排序算法最好的一篇文章...

    码农有道 历史文章目录(请戳我) 关于码农有道(请戳我) 前言 本文全长 14237 字,配有 70 张图片和动画,和你一起一步步看懂排序算法的运行过程. 预计阅读时间 47 分钟,强烈建议先收藏然后 ...

  4. 中希尔排序例题代码_超全面分析十大排序算法

    点击上方"零一视界",选择"星标"公众号 资源干货,第一时间送达 作者 | 不该相遇在秋天 责编 | 程序员小吴 前言 本文全长 14237 字,配有 70 张 ...

  5. 单元测试在深度学习中的应用 | 附代码「AI产品/工程落地」

    关注:决策智能与机器学习,深耕AI脱水干货 作者 |   Tilman Krokotsch 编译 |   ronghuaiyang   报道 |  AI公园 导读 本文非常详细的介绍并演示了如何将单元 ...

  6. 中希尔排序例题代码_十大经典排序算法最强总结

    排序算法属于经典基础算法基本功,笔试面试基本都会涉及和考察的,有原题也有变化,不过基础的几大排序算法还是得尽可能熟悉,能在思路熟悉的前提下手写出代码就更好了. ❝为了防止不提供原网址的转载,特加原文链 ...

  7. return在php中用法,细致解读PHP中return用法(附代码)_后端开发

    在大部分编程言语中,return关键字能够将函数的实行效果返回,PHP中return的用法也迥然不同,对初学者来讲,控制PHP中return的用法也是进修PHP的一个入手下手. 起首,它的意义就是返回 ...

  8. JavaScript 中常见排序算法详解

    十大经典算法 一张图概括: 名词解释: n:数据规模 k:"桶"的个数 In-place:占用常数内存,不占用额外内存 Out-place:占用额外内存 稳定性:排序后2个相等键值 ...

  9. Hadoop大数据——mapreduce中的Combiner/序列化/排序初步

    mapreduce中的Combiner (1)combiner是MR程序中Mapper和Reducer之外的一种组件 (2)combiner组件的父类就是Reducer (3)Combiner和red ...

最新文章

  1. 厉害了!单点登录系统用 8 张漫画就解释了。。。
  2. DB time抖动的原因分析
  3. python基础教程:可变,不可变数据类型
  4. MATLAB代写要求应该怎么写,matlab/simulink程序代写
  5. koa2:通过Ajax方式上传文件,使用FormData进行Ajax请求
  6. python hashlib安装_Hashlib加密,内置函数,安装操作数据库
  7. 创建一个显示所有预定义系统颜色的ListBox
  8. enable pen pressure in ps
  9. 踩过坑的可控硅设计 经验总结
  10. centos网卡配置
  11. android手机控制家用电器,手机遥控电脑!教你用手机控制家里电脑
  12. echarts柱状图加上渐变色报错问题
  13. 45句绝美宋词,哪一句触动了你?
  14. 弄了一整天,终于把打印自定义纸张大小搞定了
  15. java 实现写出倒立三角形的几种方法
  16. 微信公众平台群发消息里,为什么一天只能发一条
  17. Bug:No artifacts configured
  18. 华为鸿蒙电脑操作系统测试版,华为鸿蒙OS测试
  19. postman打不开
  20. java ActionListener 接口如何判断触发事件来源。getSource()和 getActionCommand()

热门文章

  1. anaconda Pycharm jupyter环境配置教程(最后一次写了!!!)
  2. 国土空间规划的体系和内容
  3. 二叉树线索化示意图_二叉树的线索化
  4. Content-Type
  5. java 柱状图jar_GitHub - mafulong/NetworkExper: 计网实验,抓包,java,jigloo界面开发,柱状图,文件自定义保存...
  6. 网络/系统调试维护笔记
  7. mysql不使用swapp的原因_SolidWorks不能使用的原因
  8. java中的map是什么_转载java中Map的详解
  9. opencv4.4.0函数手册_【文档更新】发布100ask_imx6ull用户手册V2.0和全新烧写工具
  10. 小汤学编程之JavaEE学习day03——JSP组成结构、JSP原理、JSP生命周期、JSP九大内置对象、四大作用域、JSP的MVC模式