Join Operations in MapReduce
1 Introduction
Often we need to process not one file in isolation but several related files, for example customer records and order records:
Customer records: customerId,name,telephone
Order records: customerId,productName,price,date
We need both files together, related (joined) through the customer id.
The field the two files share becomes the key, and the remaining fields become the value.
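Before looking at the MapReduce implementations, the join itself can be sketched in plain Java. This is only an illustration of the desired result; the sample values (Alice, book, etc.) are made up and not part of any real dataset:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory sketch of an equi-join on customerId.
public class TinyJoin {

    // customers: customerId -> "name,telephone"
    // ordersByCustomer: customerId -> list of "productName,price,date"
    public static List<String> join(Map<Long, String> customers,
                                    Map<Long, List<String>> ordersByCustomer) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Long, String> c : customers.entrySet()) {
            // inner-join semantics: a customer with no orders produces no output
            for (String order : ordersByCustomer.getOrDefault(c.getKey(), Collections.emptyList())) {
                joined.add(c.getKey() + "," + c.getValue() + "," + order);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Long, String> customers = new HashMap<>();
        customers.put(1L, "Alice,555-0100");
        Map<Long, List<String>> orders = new HashMap<>();
        orders.put(1L, Arrays.asList("book,9.99,2017-01-01"));
        System.out.println(join(customers, orders));
        // prints [1,Alice,555-0100,book,9.99,2017-01-01]
    }
}
```

The rest of this article is about producing exactly this kind of output when neither table fits on a single machine.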
2 Three Ways to Join in MapReduce
2.1 Reduce-side join: the join is performed in the reducer.
2.2 Map-side join: when one of the two tables is very large and the other small enough to fit in memory, the small table can be loaded into memory on every mapper via the DistributedCache.
2.3 Semi-join: a combination of the map-side and reduce-side techniques: the map side filters out records that cannot possibly match, and the reducer performs the actual join.
3 Reduce-side Join
3.1 Scenario: the files to be joined are of similar size and no records need to be filtered out up front. This approach is relatively inefficient, especially for large files, because every record must go through the shuffle phase.
3.2 Implementation steps:
# We usually need a custom value type that carries a tag distinguishing the two kinds of records.
// Imports needed by the class below (Hadoop 2.x mapreduce API, commons-lang):
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ReduceSideJoin extends Configured implements Tool {
public static class JoinWritable implements Writable {
private String tag;
private String data;
public JoinWritable() {
}
public JoinWritable(String tag, String data) {
this.tag = tag;
this.data = data;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(getTag());
out.writeUTF(getData());
}
public void readFields(DataInput in) throws IOException {
this.setTag(in.readUTF());
this.setData(in.readUTF());
}
public void set(String tag, String data) {
this.setTag(tag);
this.setData(data);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((data == null) ? 0 : data.hashCode());
result = prime * result + ((tag == null) ? 0 : tag.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
JoinWritable other = (JoinWritable) obj;
if (data == null) {
if (other.data != null)
return false;
} else if (!data.equals(other.data))
return false;
if (tag == null) {
if (other.tag != null)
return false;
} else if (!tag.equals(other.tag))
return false;
return true;
}
@Override
public String toString() {
return tag+","+data;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
public static class JoinMapper extends Mapper<LongWritable, Text, LongWritable,JoinWritable> {
private LongWritable outputKey = new LongWritable();
private JoinWritable outputValue = new JoinWritable();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text,LongWritable, JoinWritable>.Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (ArrayUtils.isEmpty(fields)) {
return;
}
if (fields.length != 3 && fields.length != 4) {
return;
}
long cid = Long.valueOf(fields[0]);
outputKey.set(cid);
if (fields.length == 3) {
// customer record: customerId,name,telephone
String name = fields[1];
String phone = fields[2];
outputValue.set("customer", name+","+phone);
}
if (fields.length == 4) {
// order record: customerId,productName,price,date
String productName = fields[1];
String price = fields[2];
String date = fields[3];
outputValue.set("order", productName+","+price+","+date);
}
context.write(outputKey, outputValue);
}
}
public static class JoinReduce extends Reducer<LongWritable, JoinWritable,NullWritable, Text>{
private Text outputValue = new Text();
@Override
protected void reduce(LongWritable key, Iterable<JoinWritable> values,
Reducer<LongWritable, JoinWritable,NullWritable, Text>.Context context)
throws IOException, InterruptedException {
String customerInfo = null;
List<String> orderList = new ArrayList<String>();
for (JoinWritable value : values) {
if ("customer".equals(value.getTag())) {
customerInfo = value.getData();
} else if ("order".equals(value.getTag())){
orderList.add(value.getData());
}
}
// inner-join semantics: skip keys that have no customer record
if (customerInfo == null) {
return;
}
for (String order : orderList) {
outputValue.set(key.get()+","+customerInfo+","+order);
context.write(NullWritable.get(), outputValue);
}
}
}
public int run(String[] args) throws Exception {
if (ArrayUtils.isEmpty(args) || args.length < 2) {
System.exit(2);
}
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(ReduceSideJoin.class);
Path in = new Path(args[0]);
FileInputFormat.addInputPath(job, in);
Path out = new Path(args[1]);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(JoinWritable.class);
job.setReducerClass(JoinReduce.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(Boolean.TRUE) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int num = new Random().nextInt(1000);
if (args == null || args.length == 0) {
args = new String[]{
"hdfs://hdfs-cluster/user/hadoop/input/join",
"hdfs://hdfs-cluster/user/hadoop/output/join"+num
};
}
Configuration conf = new Configuration();
int code = ToolRunner.run(conf, new ReduceSideJoin(), args);
System.exit(code);
}
}
4 Map-side Join
4.1 Scenario
One of the files to be joined is large and the other is small. In this case the join is better done on the map side, since, as noted above, a reduce-side join is comparatively expensive.
In a map-side join the small table is loaded into memory before any record reaches the map() function, so each map task can look it up locally. The small table must therefore fit comfortably in memory.
The file is shipped via the DistributedCache, which distributes one copy to every node so that each mapper can read it from its local cache directory.
4.2 Implementation
# In setup(), read the cached file into an in-memory map.
# In map(), join each incoming record against that map.
# When submitting the job, register the file path with the distributed cache.
// Imports needed by the class below (Hadoop 2.x mapreduce API, commons-lang):
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MapSideJoin extends Configured implements Tool{
public static class CustomerInfo {
private long cid;
private String name;
private String phone;
public CustomerInfo() {
}
public CustomerInfo(long cid, String name, String phone) {
this.cid = cid;
this.name = name;
this.phone = phone;
}
public long getCid() {
return cid;
}
public String getName() {
return name;
}
public String getPhone() {
return phone;
}
}
public static class MapSideJoinMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
private Map<Long, CustomerInfo> customerInfos = new HashMap<Long, CustomerInfo>();
private LongWritable outputKey = new LongWritable();
private Text outputValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable,Text>.Context context)
throws IOException, InterruptedException {
if (value == null) {
return;
}
String[] fields = value.toString().split(",");
if (ArrayUtils.isEmpty(fields) || fields.length < 4) {
return;
}
if (customerInfos.size() == 0) {
return;
}
try {
long cid = Long.valueOf(fields[0]);
CustomerInfo cInfo = customerInfos.get(cid);
if (cInfo == null) {
return;
}
StringBuilder builder = new StringBuilder();
builder.append(cid).append("=>").append(cInfo.getName()).append("\t").append(cInfo.getPhone())
.append("\t").append(fields[1]).append("\t").append(fields[2]).append("\t").append(fields[3]);
outputKey.set(cid);
outputValue.set(builder.toString());
context.write(outputKey, outputValue);
} catch (NumberFormatException e) {
// skip records whose customerId is not numeric
e.printStackTrace();
}
}
@Override
protected void setup(Mapper<LongWritable, Text, LongWritable,Text>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
// URI of the cached small-table file
URI[] localCacheFiles = context.getCacheFiles();
if (ArrayUtils.isEmpty(localCacheFiles)) {
return;
}
// build an HDFS path from the URI
Path path = new Path(localCacheFiles[0]);
// get the file system
FileSystem fs = FileSystem.get(conf);
// open the cached file
FSDataInputStream in = fs.open(path);
if (in == null) {
return;
}
InputStreamReader isr = new InputStreamReader(in);
BufferedReader br = new BufferedReader(isr);
String line = null;
String[] fields = null;
CustomerInfo info = null;
while ((line = br.readLine()) != null) {
fields = line.split(",");
if (ArrayUtils.isEmpty(fields) || fields.length != 3) {
continue;
}
info = new CustomerInfo(Long.valueOf(fields[0]), fields[1], fields[2]);
customerInfos.put(Long.valueOf(fields[0]), info);
}
if (br != null) {
br.close();
}
if (isr != null) {
isr.close();
}
}
}
public static class MapSideJoinReducer extends Reducer<LongWritable, Text, NullWritable, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values,
Reducer<LongWritable, Text,NullWritable, Text>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(NullWritable.get(), value);
}
}
}
public int run(String[] args) throws Exception {
if (ArrayUtils.isEmpty(args) || args.length < 3) {
System.exit(3);
}
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(MapSideJoin.class);
Path in = new Path(args[0]);
FileInputFormat.addInputPath(job, in);
Path out = new Path(args[1]);
FileOutputFormat.setOutputPath(job, out);
Path cacheURL = new Path(args[2]);
job.addCacheFile(cacheURL.toUri());
job.setMapperClass(MapSideJoinMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MapSideJoinReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(Boolean.TRUE) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int num = new Random().nextInt(1000);
if (args == null || args.length == 0) {
args = new String[]{
"hdfs://hdfs-cluster/user/hadoop/input/join",
"hdfs://hdfs-cluster/user/hadoop/output/join"+num,
"hdfs://hdfs-cluster/user/hadoop/cache/customers.csv"
};
}
Configuration conf = new Configuration();
int code = ToolRunner.run(conf, new MapSideJoin(), args);
System.exit(code);
}
}
5 Semi-Join
If many records are invalid (i.e. can never match), filtering them out only in the reduce phase is clearly wasteful: those records still go through the shuffle, which costs a great deal of disk I/O and network I/O.
If instead we drop them during the map phase, they never enter the shuffle at all, which helps performance.
5.1 Implementation
Use the DistributedCache to ship the small table to every node. In the mapper's setup() method, read the cached file and store only the small table's join keys in a HashSet.
In map(), check each record: if its join key is missing or does not appear in the HashSet, the record is considered invalid and is dropped, so it never reaches the reduce phase.
Note that the small table's key set must still fit in memory; if it is too large, this approach runs out of memory.
// Imports needed by the class below (Hadoop 2.x mapreduce API, commons-lang):
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SemiJoin extends Configured implements Tool {
public static class JoinWritable implements Writable {
private String tag;
private String data;
public JoinWritable() {
}
public JoinWritable(String tag, String data) {
this.tag = tag;
this.data = data;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(getTag());
out.writeUTF(getData());
}
public void readFields(DataInput in) throws IOException {
this.setTag(in.readUTF());
this.setData(in.readUTF());
}
public void set(String tag, String data) {
this.setTag(tag);
this.setData(data);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((data == null) ? 0 : data.hashCode());
result = prime * result + ((tag == null) ? 0 : tag.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
JoinWritable other = (JoinWritable) obj;
if (data == null) {
if (other.data != null)
return false;
} else if (!data.equals(other.data))
return false;
if (tag == null) {
if (other.tag != null)
return false;
} else if (!tag.equals(other.tag))
return false;
return true;
}
@Override
public String toString() {
return tag + "," + data;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
public static class SemiJoinMapper extends Mapper<LongWritable, Text, LongWritable, JoinWritable> {
private Set<Long> keySet = new HashSet<Long>();
private LongWritable outputKey = new LongWritable();
private JoinWritable outputValue = new JoinWritable();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text,LongWritable, JoinWritable>.Context context)
throws IOException, InterruptedException {
if (value == null) {
return;
}
String[] fields = value.toString().split(",");
if (ArrayUtils.isEmpty(fields) || (fields.length != 3 && fields.length != 4)) {
return;
}
try {
long cid = Long.valueOf(fields[0]);
// drop records whose join key does not appear in the small table
if (!keySet.contains(cid)) {
return;
}
outputKey.set(cid);
if (fields.length == 3) {
// customer record: customerId,name,telephone
String name = fields[1];
String phone = fields[2];
outputValue.set("customer", name + "\t" + phone);
}
if (fields.length == 4) {
// order record: customerId,productName,price,date
String productName = fields[1];
String price = fields[2];
String date = fields[3];
outputValue.set("order", productName + "\t" + price + "\t" + date);
}
context.write(outputKey, outputValue);
} catch (NumberFormatException e) {
// skip records whose customerId is not numeric
e.printStackTrace();
}
}
@Override
protected void setup(Mapper<LongWritable, Text, LongWritable,JoinWritable>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
// URI of the cached small-table file
URI[] localCacheFiles = context.getCacheFiles();
if (ArrayUtils.isEmpty(localCacheFiles)) {
return;
}
// build an HDFS path from the URI
Path path = new Path(localCacheFiles[0]);
// get the file system
FileSystem fs = FileSystem.get(conf);
// open the cached file
FSDataInputStream in = fs.open(path);
if (in == null) {
return;
}
InputStreamReader isr = new InputStreamReader(in);
BufferedReader br = new BufferedReader(isr);
String line = null;
String[] fields = null;
while ((line = br.readLine()) != null) {
fields = line.split(",");
if (ArrayUtils.isEmpty(fields) || fields.length != 3) {
continue;
}
keySet.add(Long.valueOf(fields[0]));
}
if (br != null) {
br.close();
}
if (isr != null) {
isr.close();
}
}
}
public static class SemiJoinReducer extends Reducer<LongWritable, JoinWritable, NullWritable, Text> {
private Text outputValue = new Text();
@Override
protected void reduce(LongWritable key, Iterable<JoinWritable> values,
Reducer<LongWritable,JoinWritable, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
List<String> orderList = new ArrayList<String>();
String customerInfo = null;
for (JoinWritable value : values) {
if ("customer".equals(value.getTag())) {
customerInfo = value.getData();
} else if ("order".equals(value.getTag())) {
orderList.add(value.getData());
}
}
// inner-join semantics: skip keys that have no customer record
if (customerInfo == null) {
return;
}
for (String order : orderList) {
outputValue.set(key.get() + "\t" + customerInfo + "\t" + order);
context.write(NullWritable.get(), outputValue);
}
}
}
public int run(String[] args) throws Exception {
if (ArrayUtils.isEmpty(args) || args.length < 3) {
System.exit(3);
}
Configuration conf = this.getConf();
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(SemiJoin.class);
Path in = new Path(args[0]);
FileInputFormat.addInputPath(job, in);
Path out = new Path(args[1]);
FileOutputFormat.setOutputPath(job, out);
Path cacheURL = new Path(args[2]);
job.addCacheFile(cacheURL.toUri());
job.setMapperClass(SemiJoinMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(JoinWritable.class);
job.setReducerClass(SemiJoinReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(Boolean.TRUE) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int num = new Random().nextInt(1000);
if (args == null || args.length == 0) {
args = new String[] { "hdfs://hdfs-cluster/user/hadoop/input/join",
"hdfs://hdfs-cluster/user/hadoop/output/join" + num,
"hdfs://hdfs-cluster/user/hadoop/cache/customers.csv" };
}
Configuration conf = new Configuration();
int code = ToolRunner.run(conf, new SemiJoin(), args);
System.exit(code);
}
}
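The semi-join above keeps the whole key set in a HashSet, which fails once the small table's keys no longer fit in memory. A common fallback, not shown in the code above, is a Bloom filter: it answers "definitely absent" or "probably present" using a fixed-size bit array. Hadoop ships one in org.apache.hadoop.util.bloom; the hand-rolled sketch below only illustrates the idea, and its hash-mixing constants are arbitrary choices of mine:

```java
import java.util.BitSet;

// Minimal Bloom filter over long join keys (illustrative sketch).
public class LongBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    public LongBloomFilter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // i-th hash of the key, mapped into [0, size)
    private int index(long key, int i) {
        long h = key * 0x9E3779B97F4A7C15L + (long) i * 0xC2B2AE3D27D4EB4FL;
        h ^= (h >>> 31);
        return (int) Math.floorMod(h, (long) size);
    }

    public void add(long key) {
        for (int i = 0; i < hashes; i++) {
            bits.set(index(key, i));
        }
    }

    // false means "definitely absent"; true means "probably present"
    public boolean mightContain(long key) {
        for (int i = 0; i < hashes; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }
}
```

In the semi-join mapper, setup() would add every key from the cached file to the filter instead of the HashSet, and map() would drop records for which mightContain(cid) is false. Because the reducer still performs the exact join, false positives only cost some extra shuffled records; they never produce wrong output.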