MapReduce的自制Writable分组输出及组内排序
问题描述:
输入文件格式如下:
name1 2
name3 4
name1 6
name1 1
name3 3
name1 0
要求输出的文件格式如下:
name1 0,1,2,6
name3 3,4
要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。
思路:
常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。
但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值
先按照name分组,再在name中内部进行排序。
解决方法:
运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。
由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。
setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。
对于组内的排序,可以利用setSortComparatorClass来实现,
这个方法主要用于定义key如何进行排序在它们传递给reducer之前,
这里就可以来进行组内排序。
具体代码:
Hadoop版本号:hadoop1.1.2
自定义组合键
view sourceprint?01.
package
whut;
02.
import
java.io.DataInput;
03.
import
java.io.DataOutput;
04.
import
java.io.IOException;
05.
import
org.apache.hadoop.io.IntWritable;
06.
import
org.apache.hadoop.io.Text;
07.
import
org.apache.hadoop.io.WritableComparable;
08.
//自定义组合键策略
09.
//java基本类型数据
10.
public
class
TextInt
implements
WritableComparable{
11.
//直接利用java的基本数据类型
12.
private
String firstKey;
13.
private
int
secondKey;
14.
//必须要有一个默认的构造函数
15.
public
String getFirstKey() {
16.
return
firstKey;
17.
}
18.
public
void
setFirstKey(String firstKey) {
19.
this
.firstKey = firstKey;
20.
}
21.
public
int
getSecondKey() {
22.
return
secondKey;
23.
}
24.
public
void
setSecondKey(
int
secondKey) {
25.
this
.secondKey = secondKey;
26.
}
27.
28.
@Override
29.
public
void
write(DataOutput out)
throws
IOException {
30.
// TODO Auto-generated method stub
31.
out.writeUTF(firstKey);
32.
out.writeInt(secondKey);
33.
}
34.
@Override
35.
public
void
readFields(DataInput in)
throws
IOException {
36.
// TODO Auto-generated method stub
37.
firstKey=in.readUTF();
38.
secondKey=in.readInt();
39.
}
40.
//map的键的比较就是根据这个方法来进行的
41.
@Override
42.
public
int
compareTo(Object o) {
43.
// TODO Auto-generated method stub
44.
TextInt ti=(TextInt)o;
45.
//利用这个来控制升序或降序
46.
//this本对象写在前面代表是升序
47.
//this本对象写在后面代表是降序
48.
return
this
.getFirstKey().compareTo(ti.getFirstKey());
49.
}
50.
}
分组策略
view sourceprint?01.
package
whut;
02.
import
org.apache.hadoop.io.WritableComparable;
03.
import
org.apache.hadoop.io.WritableComparator;
04.
//主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组
05.
public
class
TextComparator
extends
WritableComparator {
06.
//必须要调用父类的构造器
07.
protected
TextComparator() {
08.
super
(TextInt.
class
,
true
);
//注册comparator
09.
}
10.
@Override
11.
public
int
compare(WritableComparable a, WritableComparable b) {
12.
// TODO Auto-generated method stub
13.
TextInt ti1=(TextInt)a;
14.
TextInt ti2=(TextInt)b;
15.
return
ti1.getFirstKey().compareTo(ti2.getFirstKey());
16.
}
17.
}
组内排序策略
view sourceprint?01.
package
whut;
02.
import
org.apache.hadoop.io.WritableComparable;
03.
import
org.apache.hadoop.io.WritableComparator;
04.
//分组内部进行排序,按照第二个字段进行排序
05.
public
class
TextIntComparator
extends
WritableComparator {
06.
public
TextIntComparator()
07.
{
08.
super
(TextInt.
class
,
true
);
09.
}
10.
//这里可以进行排序的方式管理
11.
//必须保证是同一个分组的
12.
//a与b进行比较
13.
//如果a在前b在后,则会产生升序
14.
//如果a在后b在前,则会产生降序
15.
@Override
16.
public
int
compare(WritableComparable a, WritableComparable b) {
17.
// TODO Auto-generated method stub
18.
TextInt ti1=(TextInt)a;
19.
TextInt ti2=(TextInt)b;
20.
//首先要保证是同一个组内,同一个组的标识就是第一个字段相同
21.
if
(!ti1.getFirstKey().equals(ti2.getFirstKey()))
22.
return
ti1.getFirstKey().compareTo(ti2.getFirstKey());
23.
else
24.
return
ti2.getSecondKey()-ti1.getSecondKey();
//0,-1,1
25.
}
26.
27.
}
分区策略
view sourceprint?01.
package
whut;
02.
import
org.apache.hadoop.io.IntWritable;
03.
import
org.apache.hadoop.mapreduce.Partitioner;
04.
//参数为map的输出类型
05.
public
class
KeyPartitioner
extends
Partitioner<TextInt, IntWritable> {
06.
@Override
07.
public
int
getPartition(TextInt key, IntWritable value,
int
numPartitions) {
08.
// TODO Auto-generated method stub
09.
return
(key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
10.
}
11.
}
MapReduce策略
view sourceprint?001.
package
whut;
002.
import
java.io.IOException;
003.
import
org.apache.hadoop.conf.Configuration;
004.
import
org.apache.hadoop.conf.Configured;
005.
import
org.apache.hadoop.fs.Path;
006.
import
org.apache.hadoop.io.IntWritable;
007.
import
org.apache.hadoop.io.Text;
008.
import
org.apache.hadoop.mapreduce.Job;
009.
import
org.apache.hadoop.mapreduce.Mapper;
010.
import
org.apache.hadoop.mapreduce.Reducer;
011.
import
org.apache.hadoop.mapreduce.Mapper.Context;
012.
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
013.
import
org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
014.
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
015.
import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
016.
import
org.apache.hadoop.util.Tool;
017.
import
org.apache.hadoop.util.ToolRunner;
018.
//需要对数据进行分组以及组内排序的时候
019.
public
class
SortMain
extends
Configured
implements
Tool{
020.
//这里设置输入文格式为KeyValueTextInputFormat
021.
//name1 5
022.
//默认输入格式都是Text,Text
023.
public
static
class
GroupMapper
extends
024.
Mapper<Text, Text, TextInt, IntWritable> {
025.
public
IntWritable second=
new
IntWritable();
026.
public
TextInt tx=
new
TextInt();
027.
@Override
028.
protected
void
map(Text key, Text value, Context context)
029.
throws
IOException, InterruptedException {
030.
String lineKey=key.toString();
031.
String lineValue=value.toString();
032.
int
lineInt=Integer.parseInt(lineValue);
033.
tx.setFirstKey(lineKey);
034.
tx.setSecondKey(lineInt);
035.
second.set(lineInt);
036.
context.write(tx, second);
037.
}
038.
}
039.
//设置reduce
040.
public
static
class
GroupReduce
extends
Reducer<TextInt, IntWritable, Text, Text>
041.
{
042.
@Override
043.
protected
void
reduce(TextInt key, Iterable<IntWritable> values,
044.
Context context)
045.
throws
IOException, InterruptedException {
046.
StringBuffer sb=
new
StringBuffer();
047.
for
(IntWritable val:values)
048.
{
049.
sb.append(val+
","
);
050.
}
051.
if
(sb.length()>
0
)
052.
{
053.
sb.deleteCharAt(sb.length()-
1
);
054.
}
055.
context.write(
new
Text(key.getFirstKey()),
new
Text(sb.toString()));
056.
}
057.
}
058.
059.
@Override
060.
public
int
run(String[] args)
throws
Exception {
061.
// TODO Auto-generated method stub
062.
Configuration conf=getConf();
063.
Job job=
new
Job(conf,
"SecondarySort"
);
064.
job.setJarByClass(SortMain.
class
);
065.
// 设置输入文件的路径,已经上传在HDFS
066.
FileInputFormat.addInputPath(job,
new
Path(args[
0
]));
067.
// 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在
068.
FileOutputFormat.setOutputPath(job,
new
Path(args[
1
]));
069.
070.
job.setMapperClass(GroupMapper.
class
);
071.
job.setReducerClass(GroupReduce.
class
);
072.
//设置分区方法
073.
job.setPartitionerClass(KeyPartitioner.
class
);
074.
075.
//下面这两个都是针对map端的
076.
//设置分组的策略,哪些key可以放置到一组中
077.
job.setGroupingComparatorClass(TextComparator.
class
);
078.
//设置key如何进行排序在传递给reducer之前.
079.
//这里就可以设置对组内如何排序的方法
080.
/*************关键点**********/
081.
job.setSortComparatorClass(TextIntComparator.
class
);
082.
//设置输入文件格式
083.
job.setInputFormatClass(KeyValueTextInputFormat.
class
);
084.
//使用默认的输出格式即TextInputFormat
085.
//设置map的输出key和value类型
086.
job.setMapOutputKeyClass(TextInt.
class
);
087.
job.setMapOutputValueClass(IntWritable.
class
);
088.
//设置reduce的输出key和value类型
089.
//job.setOutputFormatClass(TextOutputFormat.class);
090.
job.setOutputKeyClass(Text.
class
);
091.
job.setOutputValueClass(Text.
class
);
092.
job.waitForCompletion(
true
);
093.
int
exitCode=job.isSuccessful()?
0
:
1
;
094.
return
exitCode;
095.
}
096.
097.
public
static
void
main(String[] args)
throws
Exception
098.
{
099.
int
exitCode=ToolRunner.run(
new
SortMain(), args);
100.
System.exit(exitCode);
101.
}
102.
}
注意事项
1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明
2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。
3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。
4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。
MapReduce的自制Writable分组输出及组内排序相关推荐
- Json数组列表中的数据分组排序、组内排序
文章目录 问题描述 方式一:先全部排序,在分组排序 方式二:使用HashMap取出来分组再组内排序 方式三:使用TreeMap取出来分组再组内排序 测试代码及耗时 问题描述 现在有一个用户信息数组,用 ...
- linq分组再实现组内排序
哎呀 转前端了 才一年没写后端 linq查询都忘记了 原来炒鸡简单 记录下: 1. 分组查询 // 简单分组查询 var query = (from p in queryorderby p. ...
- SQL实现分组排序和组内排序(相同分数并列排名)
创建数据表 CREATE TABLE `heyf_t10` (`empid` int(11) DEFAULT NULL,`deptid` int(11) DEFAULT NULL,`salary` d ...
- mysql 组内分组_[MySQL] group by 分组并进行组内排序取得最新一条
有一个需求是获取指定用户发送的最新的内容,这个时候需要使用group by分组功能 但是怎么获取最新的呢 ? 如果直接进行order by 是不能实现的,因为MysqL会先执行group by 后执行 ...
- mysql的组内排序生成序号_sql 分组查询,组内排序, 组内添加序号 (SQL Server 排序函数 ROW_NUMBER和RANK 用法总结)...
下面的例子和SQL语句均在SQL Server 2008环境下运行通过,使用SQL Server自带的AdventureWorks数据库. -- 添加序列号 -- 行号用法: ROW_NUMBER() ...
- mysql分组后组内排名_SQL实现group by 分组后组内排序
在一个月黑风高的夜晚,自己无聊学习的SQL的时候,练习,突发奇想的想实现一个功能查询,一张成绩表有如下字段,班级ID,英语成绩,数据成绩,语文成绩如下图 实现 查询出 每个班级英语成绩最高的前两名的记 ...
- 《数据库SQL实战》从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t。 注意对于重复的emp_no进行忽略。
题目描述 从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t. 注意对于重复的emp_no进行忽略. CREATE TABLE IF NOT EXISTS ...
- 从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t。
题目描述 从titles表获取按照title进行分组,每组个数大于等于2,给出title以及对应的数目t. CREATE TABLE IF NOT EXISTS "titles" ...
- mysql分组取出每组地一条数据_基于mysql实现group by取各分组最新一条数据
基于mysql实现group by取各分组最新一条数据 前言: group by函数后取到的是分组中的第一条数据,但是我们有时候需要取出各分组的最新一条,该怎么实现呢? 本文提供两种实现方式. 一.准 ...
最新文章
- pam_frpintd.so 错误修复
- sql_trace的介绍
- 【精品】【分享】盖茨留给职场工作者的十句警告
- 深度学习和目标检测系列教程 13-300:YOLO 物体检测算法
- [转] 爱情的隐式马尔可夫模型(Love in the Hidden Markov Model)
- 类从未使用_如果您从未依赖在线销售,如何优化您的网站
- 双指针--Codeforces Round #645 (Div. 2) d题
- defconfig、 .config
- 内存管理, 对象的生命周期
- [警告] multi-字符 character constant [-Wmultichar] ----字符+符号输出错误
- 记录——《C Primer Plus (第五版)》第八章编程练习第八题
- SecureCrt 常用命令
- CHD+CM-1 安装
- 教你怎么筛选排除百度搜索引擎的屏蔽违规词
- 苹果历代产品中的8大亮点设计(上)
- 保健用品智慧供应链管理系统:精细化管理供应商与采购环节,打造敏捷型供应链
- 【头部姿态】头部姿态检测(一)
- 布拉格捷克理工大学研究团队:Prisma进化版
- 网络安全卷么? 你身边的网络安全人过的怎么样呢?
- 【WRF如何在输出的wrfoutput文件中设置添加/删除变量】
热门文章
- 小论Java类变量的隐私泄露
- 风险案例-25期-与有过合作经历客户在新合同约定中过于简单、范围不明确,导致客户对新需求工作量不认可...
- Linux tmux分屏工具
- linux mint 13 input method of chinese
- 常用的 服务器 与 交换机
- iphone-common-codes-ccteam源代码 CCUIAlertView.m
- SQL查询案例:行列转换[行转列, 使用 CASE WHEN 处理]
- 在代码中向ReportViewer动态添加数据源
- 2019百度之星初赛-1
- IIS6.0打开ASP文件,出现500错误或404错误解决方法