一 RawComparator介绍

Hadoop支持对序列化的二进制流直接进行比较。相比于对序列化二进制流进行反序列化再进行序列化,这种方式效率更高。

RawComparator接口就是用来进行序列化字节之间的比较的。该接口继承了Comparator接口,提供了一个compare方法:publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

b1:第一个对象所在字节数组

s1:该对象在b1中的起始位置

l1:该对象在b1中的长度

b2:第二个对象所在字节数组

s2:该对象在b2的起始位置

l2:该对象在b2中的长度

二 二次排序

有时候,一个文件有多个字段,但是我们希望首先根据第一个字段排序,然后key相同的情况下,在进行第二个字段的排序。

要点:

2.1自定义实现一个WritableComprable<T>的类,这个需要定义需要比较的字段。

因为我们只能对key比较,所以就应该在key上想办法。试想又想同时对2个字段排序,又只能对key排序,所以,我们就需要自定义一个复合类型由需要比较的两个字段组成,这个复合类型可以对多个字段进行比较。

publicstatic classPairWritableimplementsWritableComparable<PairWritable> {

private intfirst;

private intsecond;

public PairWritable() {

}

public PairWritable(intfirst, intsecond) {

this.first =first;

this.second =second;

}

public voidwrite(DataOutputout) throws IOException {

out.writeInt(first);

out.writeInt(second);

}

public voidreadFields(DataInputin) throws IOException {

this.first =in.readInt();

this.second =in.readInt();

}

public voidset(intleft, intright) {

first = left;

second = right;

}

@Override

public inthashCode() {

final intprime = 31;

int result =1;

result = prime *result + first;

result = prime *result + second;

return result;

}

@Override

public booleanequals(Object obj) {

if (this ==obj)

return true;

if (obj ==null)

return false;

if (getClass() !=obj.getClass())

return false;

PairWritable other = (PairWritable)obj;

if (first !=other.first)

return false;

if (second !=other.second)

return false;

return true;

}

public intgetFirst() {

return first;

}

public intgetSecond() {

return second;

}

/** A Comparator that compares serializedIntPair. */

public staticclass Comparator extends WritableComparator{

public Comparator() {

super(PairWritable.class);

}

@Override

public intcompare(byte[]b1, ints1, intl1, byte[]b2, ints2, intl2) {

return compareBytes(b1,s1, l1, b2, s2, l2);

}

}

static {// register this comparator

WritableComparator.define(PairWritable.class,new Comparator());

}

public intcompareTo(PairWritableo) {

if (first !=o.first) {

return first < o.first ? -1 :1;

}else if (second !=o.second) {

return second < o.second ? -1 :1;

}else {

return 0;

}

}

}

2.2编写map函数 和 reduce函数

其中我们自定义的组合key类型作为OutputKey,而OutputValue还是和以前保持一致

2.3接着map就要开始shuffle操作, 首先要进行分区

以前的分区算法是基于key(也就是第一个字段),现在我们的key是复合类型,那势必可能和以前的分区不一样了,如果想保持以前的分区,我们需要自己实现。

publicstatic class SortPartitionerextends Partitioner<PairWritable, IntWritable> {

@Override

public intgetPartition(PairWritable key, IntWritable value, intnumPartitions) {

return (key.hashCode() & Integer.MAX_VALUE) %numPartitions;

}

}

2.4 分区之后,我们知道会进行排序操作,默认的排序规则就是先根据partition排序,然后如果partition相同,则根据key排序。

Key要排序:

# 我们指定了比较器

如果我们对于key没有指定具体比较器,那么,就会根据我们指定的比较器进行排序

# 我们没有指定比较器

就会根据MapOutputKey也就是我们自定义的复合key的compareTo方法进行排序,当然我们之前的代码里已经实现了。

2.5 然后就是Reduce阶段的排序

它的原理跟map阶段默认排序是一样的。发生在merge的时候。也是没有指定比较器,就需要根据key的compareTo方法进行排序。

2.6 开始分组

在reduce之前,会对结果进行一个归并或者分组操作。很显然,如果我们以自定义的类型去参与分组,结果是有问题的。所以我们需要保持和原始的key的类型分组一致。

也就是在分组的时候,我们只需要让(field1,field2)字段一参与分组。

如果我们指定了分组比较器,那么就使用我们自定义的 分组比较器。

否则还是按照OutputKey的compareTo方法进行比较,看是否是属于同一个key。

publicstatic class SortGroupComparatorimplements RawComparator<PairWritable> {

public intcompare(PairWritable o1, PairWritable o2) {

int l =o1.getFirst();

int r =o2.getFirst();

return l == r ? 0 : (l <r ? -1 : 1);

}

public intcompare(byte[]b1, ints1, intl1, byte[]b2, ints2, intl2) {

return WritableComparator.compareBytes(b1,s1, Integer.SIZE /8, b2, s2, Integer.SIZE /8);

}

}

2.6 编写mapper函数和reduce函数

publicstatic class SortMapperextendsMapper<LongWritable, Text, PairWritable, IntWritable> {

private PairWritable outputKey = new PairWritable();

private IntWritable outputValue = new IntWritable();

@Override

protected voidmap(LongWritable key, Textvalue, Mapper<LongWritable, Text, PairWritable,

IntWritable>.Contextcontext) throws IOException, InterruptedException {

if (value ==null) {

return;

}

String[]array = value.toString().split("");

if (ArrayUtils.isEmpty(array)) {

return;

}

int field1 = Integer.parseInt(array[0]);

int field2 = Integer.parseInt(array[1]);

outputKey.set(field1,field2);

outputValue.set(field2);

context.write(outputKey,outputValue);

}

}

publicstatic class SortReducerextends Reducer<PairWritable, IntWritable, Text, IntWritable> {

private staticfinal Text SEPARATOR =new Text("------------------------------------------------");

private final Textfirst = newText();

@Override

protected voidreduce(PairWritable key, Iterable<IntWritable> values,

Reducer<PairWritable,IntWritable, Text, IntWritable>.Contextcontext)

throws IOException, InterruptedException {

context.write(SEPARATOR,null);

StringfVal = Integer.toString(key.first);

first.set(fVal);

for (IntWritable value :values) {

context.write(first,value);

}

}

}

MapReduce之二次排序相关推荐

  1. hadoop之MapReduce自定义二次排序流程实例详解

    一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...

  2. mapreduce的二次排序 SecondarySort

    mapreduce的二次排序 SecondarySort 关于二次排序主要涉及到这么几个东西: 在0.20.0 以前使用的是 setPartitionerClass setOutputkeyCompa ...

  3. MapReduce自定义二次排序流程

    每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,就调用自定义二次排序函数对其进行排序. MapReduce ...

  4. MapReduce实现二次排序续(十)

    文章目录 1. 前言 2. 换一种文件格式 3. 代码做部分修正 4. 效果截图 5. 小结 1. 前言 上一篇文章实现的二次排序key和value都是数字,接下来实现一组key为字母或单词,valu ...

  5. java mapreduce教程_Java搭建MapReduce完成二次排序步骤

    1.构建新的作业 Configuration conf=getConf(); Job job=Job.getInstance(conf); job.setJarByClass(SortYearAndT ...

  6. MapReduce二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用 ...

  7. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  8. 详细讲解MapReduce二次排序过程

    2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...

  9. Hadoop Mapreduce分区、分组、二次排序过程详解

    2019独角兽企业重金招聘Python工程师标准>>> 1.MapReduce中数据流动    (1)最简单的过程:  map - reduce    (2)定制了partition ...

最新文章

  1. 认知科学顶刊:挑战过去50年神经科学观点,人类智力的优势或来自于记忆储存方式...
  2. 旋转的Apriltag码
  3. 如何解决ajax重复提交的问题
  4. 自定义MyHttpServletRequest解决过滤器拦截@RequestBody整体JSON请求问题
  5. wangEditor编辑器在laravel中上传图片(二)
  6. 空间复杂度分段分段有序数组合并成有序(空间复杂度为O(1))
  7. SVD在推荐系统中的应用详解以及算法推导
  8. JS修改CSS的三种方式
  9. 专访阿里 iDST 语音组总监鄢志杰:智能语音交互从技术到产品,有哪些坑和细节要注意?
  10. Serekh塞拉赫资源包背后的创作过程
  11. Webpack初学者介绍
  12. 《游戏行业DDoS攻击解决方案》重磅发布
  13. Magento给产品添加“new”或者折扣数量标签 magento new label. discount label
  14. 递归 - 求数字/字符串的全排列
  15. python基于dict、defaultdict、Counter的累加器
  16. 摩托罗拉Edge真机谍照曝光:挖孔瀑布屏+骁龙765
  17. 10000元重金奖励:谁能开发这样的编辑软件程序,编辑软件功能说明
  18. 小新 无法开机 联想_19年最“狠”轻薄本诞生 联想小新Pro 13优缺点一览
  19. bgp状态idle什么原因_27-高级路由:BGP状态
  20. C#-学生信息管理系统

热门文章

  1. dreamcast游戏_《Dreamcast Collection》开箱及游戏介绍
  2. 分层架构、六边形架构、CQRS架构模式解读
  3. 数据科学入门与实战:Matplotlib绘图DateFrame
  4. python的knn算法list_[机器学习]kNN算法python实现(实例:数字识别)
  5. distenct oracle_oracle中distinct的用法详解
  6. ARM中断产生和管理
  7. idata界面_iData手持终端常见问题集,持续更新中...
  8. atxserver运行没有反应_硫酸盐对厌氧系统的影响及运行注意事项!
  9. jsp 动态添加一行数据_大数据从入门到深入:JavaEE 之 动态网页开发基础 JSP的数据交互(3)...
  10. fastapi学习(一):输出hello world与基本运行方法