MapReduce之二次排序
一 RawComparator介绍
Hadoop支持对序列化的二进制流直接进行比较。相比于对序列化二进制流进行反序列化再进行序列化,这种方式效率更高。
RawComparator接口就是用来进行序列化字节之间的比较的。该接口继承了Comparator接口,提供了一个compare方法:publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
b1:第一个对象所在字节数组
s1:该对象在b1中的起始位置
l1:该对象在b1中的长度
b2:第二个对象所在字节数组
s2:该对象在b2的起始位置
l2:该对象在b2中的长度
二 二次排序
有时候,一个文件有多个字段,但是我们希望首先根据第一个字段排序,然后key相同的情况下,在进行第二个字段的排序。
要点:
2.1自定义实现一个WritableComprable<T>的类,这个需要定义需要比较的字段。
因为我们只能对key比较,所以就应该在key上想办法。试想又想同时对2个字段排序,又只能对key排序,所以,我们就需要自定义一个复合类型由需要比较的两个字段组成,这个复合类型可以对多个字段进行比较。
publicstatic classPairWritableimplementsWritableComparable<PairWritable> {
private intfirst;
private intsecond;
public PairWritable() {
}
public PairWritable(intfirst, intsecond) {
this.first =first;
this.second =second;
}
public voidwrite(DataOutputout) throws IOException {
out.writeInt(first);
out.writeInt(second);
}
public voidreadFields(DataInputin) throws IOException {
this.first =in.readInt();
this.second =in.readInt();
}
public voidset(intleft, intright) {
first = left;
second = right;
}
@Override
public inthashCode() {
final intprime = 31;
int result =1;
result = prime *result + first;
result = prime *result + second;
return result;
}
@Override
public booleanequals(Object obj) {
if (this ==obj)
return true;
if (obj ==null)
return false;
if (getClass() !=obj.getClass())
return false;
PairWritable other = (PairWritable)obj;
if (first !=other.first)
return false;
if (second !=other.second)
return false;
return true;
}
public intgetFirst() {
return first;
}
public intgetSecond() {
return second;
}
/** A Comparator that compares serializedIntPair. */
public staticclass Comparator extends WritableComparator{
public Comparator() {
super(PairWritable.class);
}
@Override
public intcompare(byte[]b1, ints1, intl1, byte[]b2, ints2, intl2) {
return compareBytes(b1,s1, l1, b2, s2, l2);
}
}
static {// register this comparator
WritableComparator.define(PairWritable.class,new Comparator());
}
public intcompareTo(PairWritableo) {
if (first !=o.first) {
return first < o.first ? -1 :1;
}else if (second !=o.second) {
return second < o.second ? -1 :1;
}else {
return 0;
}
}
}
2.2编写map函数 和 reduce函数
其中我们自定义的组合key类型作为OutputKey,而OutputValue还是和以前保持一致
2.3接着map就要开始shuffle操作, 首先要进行分区
以前的分区算法是基于key(也就是第一个字段),现在我们的key是复合类型,那势必可能和以前的分区不一样了,如果想保持以前的分区,我们需要自己实现。
publicstatic class SortPartitionerextends Partitioner<PairWritable, IntWritable> {
@Override
public intgetPartition(PairWritable key, IntWritable value, intnumPartitions) {
return (key.hashCode() & Integer.MAX_VALUE) %numPartitions;
}
}
2.4 分区之后,我们知道会进行排序操作,默认的排序规则就是先根据partition排序,然后如果partition相同,则根据key排序。
Key要排序:
# 我们指定了比较器
如果我们对于key没有指定具体比较器,那么,就会根据我们指定的比较器进行排序
# 我们没有指定比较器
就会根据MapOutputKey也就是我们自定义的复合key的compareTo方法进行排序,当然我们之前的代码里已经实现了。
2.5 然后就是Reduce阶段的排序
它的原理跟map阶段默认排序是一样的。发生在merge的时候。也是没有指定比较器,就需要根据key的compareTo方法进行排序。
2.6 开始分组
在reduce之前,会对结果进行一个归并或者分组操作。很显然,如果我们以自定义的类型去参与分组,结果是有问题的。所以我们需要保持和原始的key的类型分组一致。
也就是在分组的时候,我们只需要让(field1,field2)字段一参与分组。
如果我们指定了分组比较器,那么就使用我们自定义的 分组比较器。
否则还是按照OutputKey的compareTo方法进行比较,看是否是属于同一个key。
publicstatic class SortGroupComparatorimplements RawComparator<PairWritable> {
public intcompare(PairWritable o1, PairWritable o2) {
int l =o1.getFirst();
int r =o2.getFirst();
return l == r ? 0 : (l <r ? -1 : 1);
}
public intcompare(byte[]b1, ints1, intl1, byte[]b2, ints2, intl2) {
return WritableComparator.compareBytes(b1,s1, Integer.SIZE /8, b2, s2, Integer.SIZE /8);
}
}
2.6 编写mapper函数和reduce函数
publicstatic class SortMapperextendsMapper<LongWritable, Text, PairWritable, IntWritable> {
private PairWritable outputKey = new PairWritable();
private IntWritable outputValue = new IntWritable();
@Override
protected voidmap(LongWritable key, Textvalue, Mapper<LongWritable, Text, PairWritable,
IntWritable>.Contextcontext) throws IOException, InterruptedException {
if (value ==null) {
return;
}
String[]array = value.toString().split("");
if (ArrayUtils.isEmpty(array)) {
return;
}
int field1 = Integer.parseInt(array[0]);
int field2 = Integer.parseInt(array[1]);
outputKey.set(field1,field2);
outputValue.set(field2);
context.write(outputKey,outputValue);
}
}
publicstatic class SortReducerextends Reducer<PairWritable, IntWritable, Text, IntWritable> {
private staticfinal Text SEPARATOR =new Text("------------------------------------------------");
private final Textfirst = newText();
@Override
protected voidreduce(PairWritable key, Iterable<IntWritable> values,
Reducer<PairWritable,IntWritable, Text, IntWritable>.Contextcontext)
throws IOException, InterruptedException {
context.write(SEPARATOR,null);
StringfVal = Integer.toString(key.first);
first.set(fVal);
for (IntWritable value :values) {
context.write(first,value);
}
}
}
MapReduce之二次排序相关推荐
- hadoop之MapReduce自定义二次排序流程实例详解
一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...
- mapreduce的二次排序 SecondarySort
mapreduce的二次排序 SecondarySort 关于二次排序主要涉及到这么几个东西: 在0.20.0 以前使用的是 setPartitionerClass setOutputkeyCompa ...
- MapReduce自定义二次排序流程
每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,就调用自定义二次排序函数对其进行排序. MapReduce ...
- MapReduce实现二次排序续(十)
文章目录 1. 前言 2. 换一种文件格式 3. 代码做部分修正 4. 效果截图 5. 小结 1. 前言 上一篇文章实现的二次排序key和value都是数字,接下来实现一组key为字母或单词,valu ...
- java mapreduce教程_Java搭建MapReduce完成二次排序步骤
1.构建新的作业 Configuration conf=getConf(); Job job=Job.getInstance(conf); job.setJarByClass(SortYearAndT ...
- MapReduce二次排序
2019独角兽企业重金招聘Python工程师标准>>> 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用 ...
- 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)
前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...
- 详细讲解MapReduce二次排序过程
2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...
- Hadoop Mapreduce分区、分组、二次排序过程详解
2019独角兽企业重金招聘Python工程师标准>>> 1.MapReduce中数据流动 (1)最简单的过程: map - reduce (2)定制了partition ...
最新文章
- 认知科学顶刊:挑战过去50年神经科学观点,人类智力的优势或来自于记忆储存方式...
- 旋转的Apriltag码
- 如何解决ajax重复提交的问题
- 自定义MyHttpServletRequest解决过滤器拦截@RequestBody整体JSON请求问题
- wangEditor编辑器在laravel中上传图片(二)
- 空间复杂度分段分段有序数组合并成有序(空间复杂度为O(1))
- SVD在推荐系统中的应用详解以及算法推导
- JS修改CSS的三种方式
- 专访阿里 iDST 语音组总监鄢志杰:智能语音交互从技术到产品,有哪些坑和细节要注意?
- Serekh塞拉赫资源包背后的创作过程
- Webpack初学者介绍
- 《游戏行业DDoS攻击解决方案》重磅发布
- Magento给产品添加“new”或者折扣数量标签 magento new label. discount label
- 递归 - 求数字/字符串的全排列
- python基于dict、defaultdict、Counter的累加器
- 摩托罗拉Edge真机谍照曝光:挖孔瀑布屏+骁龙765
- 10000元重金奖励:谁能开发这样的编辑软件程序,编辑软件功能说明
- 小新 无法开机 联想_19年最“狠”轻薄本诞生 联想小新Pro 13优缺点一览
- bgp状态idle什么原因_27-高级路由:BGP状态
- C#-学生信息管理系统
热门文章
- dreamcast游戏_《Dreamcast Collection》开箱及游戏介绍
- 分层架构、六边形架构、CQRS架构模式解读
- 数据科学入门与实战:Matplotlib绘图DateFrame
- python的knn算法list_[机器学习]kNN算法python实现(实例:数字识别)
- distenct oracle_oracle中distinct的用法详解
- ARM中断产生和管理
- idata界面_iData手持终端常见问题集,持续更新中...
- atxserver运行没有反应_硫酸盐对厌氧系统的影响及运行注意事项!
- jsp 动态添加一行数据_大数据从入门到深入:JavaEE 之 动态网页开发基础 JSP的数据交互(3)...
- fastapi学习(一):输出hello world与基本运行方法