hadoop--Reduce Join
目录
- Reduce Join
- Reduce Join案例
- 需求
- 需求分析
- 运行结果
- 缺点
- 源码
Reduce Join
Map 端的主要工作:为来自不同表或文件的 key/value 对,打标签以区别不同来源的记 录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出;
Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要 在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进 行合并就 ok 了。
Reduce Join案例
需求
订单表order:
产品表pd:
将上述两表中的数据根据商品pid合并到订单数据表中,要求呈现出如下图:
需求分析
通过将关联条件作为 Map 输出的 key,将两表满足 Join 条件的数据并携带数据所来源
的文件信息,发往同一个 ReduceTask,在 Reduce 中进行数据的串联。
Reduce端表合并(数据倾斜)
运行结果
本地order.txt
本地pd.txt
输出
缺点
缺点: 这种方式中,合并的操作是在 Reduce 阶段完成,Reduce 端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在 Reduce 阶段极易产生数据倾斜。
解决方案: Map 端实现数据合并==>
源码
tips:
hadoop迭代器中使用了对象重用,即迭代时value始终指向一个内存地址(引用值始终不变),改变的是引用指向的内存地址中的数据。
TableBean类:
package com.xiaobai.mapreduce.reduceJoin;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class TableBean implements Writable {private String id; //订单idprivate String pid; //商品idprivate int amount; //商品数量private String pname; // 商品名称private String flag; //标记表 order pd//空餐构造public TableBean() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}//序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {//反序列化应和序列化顺序一致this.id = in.readUTF();this.pid = in.readUTF();this.amount = in.readInt();this.pname = in.readUTF();this.flag = in.readUTF();}@Overridepublic String toString() {// id pname amountreturn id + "\t" + pname + "\t" + amount;}
}
TableMapper类:
package com.xiaobai.mapreduce.reduceJoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.File;
import java.io.IOException;public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> {private String fileName;private Text outK = new Text();private TableBean outV = new TableBean();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//初始化 order pd//获取文件名称 一个文件只获取一次FileSplit split = (FileSplit)context.getInputSplit();fileName = split.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1. 获取一行String line = value.toString();//2.判断是哪个文件的if(fileName.contains("order")){ //处理的是订单表orderString[] split = line.split("\t");//封装outK.set(split[1]);outV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2])); //要转换为String类型outV.setPname("");outV.setFlag("order");}else{ //处理的是产品表pdString[] split = line.split("\t");outK.set(split[0]);outV.setId("");outV.setPid(split[0]);outV.setAmount(0);outV.setPname(split[1]);outV.setFlag("pd");}//写出context.write(outK,outV);}
}
TableReducer类:
package com.xiaobai.mapreduce.reduceJoin;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {// 01 1001 1 order// 01 1004 4 order// 01 小米 pd//创建2个集合ArrayList<TableBean> orderBeans = new ArrayList<>();TableBean pdBean = new TableBean();//循环遍历for (TableBean value : values) {if("order".equals(value.getFlag())){ //order表//创建临时TableBean对象tmptableBeanTableBean tmptableBean = new TableBean();try {BeanUtils.copyProperties(tmptableBean,value); //使用工具类BeanUtils将value赋值给tmptableBean} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}orderBeans.add(tmptableBean);}else{ //pd表try {BeanUtils.copyProperties(pdBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}//循环遍历orderBeans,赋值pdnamefor (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());//id相同context.write(orderBean,NullWritable.get());}}
}
TableDriver类:
package com.xiaobai.mapreduce.reduceJoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;import java.io.IOException;public class TableDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Job job = Job.getInstance(new Configuration());job.setJarByClass(TableDriver.class);job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job,new org.apache.hadoop.fs.Path("/Users/jane/Desktop/test/JoinTest"));FileOutputFormat.setOutputPath(job,new Path("/Users/jane/Desktop/hadoop/JoinTestOutput"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}
hadoop--Reduce Join相关推荐
- Reduce Join介绍及案例
Reduce Join介绍及案例 Reduce Join介绍 Reduce Join案例 需求 1.需求说明 2.文件 案例分析 1.需求分析 2.输入数据 3.期望输出数据 4.MapTask 5. ...
- Hadoop之Join、计数器、数据清洗概述
Hadoop之Join.计数器.数据清洗概述 目录 Reduce join Map join 计数器应用 数据清洗(ETL) 1. Reduce join 原理 Map端的主要工作:为来自不同表(文件 ...
- Map And Reduce Join的使用案例
文章目录 Reduce Join 1)需求 2)需求分析 3)编程实现 1.创建Bean类 2.创建Mapper类 3.创建Reducer类 4.创建Driver类 4)查看结果 5)总结 Mappe ...
- 【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)
文章目录 一.Reduce Join ① Reduce Join工作原理 ② Reduce Join 案例 ☠ 需求 ☠ 案例分析 ☠ 代码实现 封装Bean对象 Mapper阶段 Reducer阶段 ...
- reduce 数据倾斜_Spark(四十)数据倾斜解决方案之将reduce join转换
一.背景 1.将reduce join转换为map join 2.broadcast出来的普通变量 普通的join,那么肯定是要走shuffle:那么,所以既然是走shuffle,那么普通的join, ...
- hadoop data join
概念: Hadoop有一个叫DataJoin的包为Data Join提供相应的框架.它的Jar包存在于contrib/datajoin/hadoop-*-datajoin. 为区别于其他的data j ...
- hadoop中join操作
前言 在mysql中,经常涉及到2张表或者多张表的关联查询,通常通过中间字段将两个表做关联,在MapReduce中,某些场景下也会遇到类似的需求,比如说,将两个原始的日志文件,通过中间业务字段进行关联 ...
- MR实现reduce join和map join及hive的执行计划
一.涵盖 MapReduce InputFormat RecordReader 切片:block=input split 1.1 File- Text- NLine- DB- Mapper setup ...
- Hadoop MapReduce编程 API入门系列之join(二十六)
天气记录数据库 气象站数据库 气象站和天气记录合并之后的示意图如下所示. 011990-99999 SIHCCAJAVRI 195005150700 0 011990-99999 SIHCCAJAVR ...
- Hadoop实例学习(十三) Join应用
目录 Reduce Join 解决的问题 实例 编写TableBean 类 编写TableMapper类 编写TableDriver类 Map Join 案例 编写MapJoinMapper类 编写M ...
最新文章
- 2007.04.26
- 小目标检测的一些问题,思路和方案
- 原生编辑器_免费开源的GIF制作神器,可录屏幕/摄像头/画板,自带编辑器
- c++的引用是什么意思?怎么回事?
- 直接进入ORACLE12C插件数据库
- Chapter 1 First Sight——30
- Java使用独立数据库连接池(DBCP为例)
- 大数据行为分析包含哪些功能
- R︱并行计算以及提高运算效率的方式(parallel包、clusterExport函数、SupR包简介)
- memcached mysql 性能测试_InnoDB memcached插件 vs 原生memcached对比性能测试
- 链表:从尾到头打印链表
- java混淆器_Java 混淆器
- nginx 服务器大文件上传时500错误
- Linux系统的权限管理
- Qt编写的复杂象棋程序 chessbroad.cpp 错误比较多
- Revit二次开发入门相关安装和配置
- 怎么把win10设置Linux样式,老司机教你把win10系统界面换成win7样式的方法
- 重组人碱性成纤维细胞生长因子(FGF 2)参考文献
- 5G智慧校园建设顶层设计方案智慧校园大脑建设方案
- html5的多媒体的属性及其简单的属性
热门文章
- (王道408考研操作系统)第三章内存管理-第一节7:非连续分配管理方式之基本分段管理方式
- Java统计每个大写字母的个数
- C#删除字符串倒数第几个字符后的所有字符串
- chmod递归授权文件夹(用法)
- Python zmq 讲解
- 141.Linked List Cycle
- OpenCV Using Python——基于SURF特征提取和金字塔LK光流法的单目视觉三维重建 (光流、场景流)...
- Django xadmin引入DjangoUeditor
- 判断应用程序是否是当前激活程序(获得焦点的程序)
- TcxComboBox控件说明