Map and Reduce Join Use Cases
Table of Contents
- Reduce Join
  - 1) Requirements
  - 2) Requirement Analysis
  - 3) Implementation
    - 1. Create the Bean class
    - 2. Create the Mapper class
    - 3. Create the Reducer class
    - 4. Create the Driver class
  - 4) View the Results
  - 5) Summary
- Map Join
  - 2) Requirement Analysis
  - 3) Implementation
    - 1. Create the Bean class
    - 2. Create the JoinMapper class
    - 3. Create the JoinDriver class
  - 4) View the Results
Reduce Join
- Map side: tag each key/value pair so that records from different tables (or files) can be told apart, use the join field as the key and the remaining fields plus the tag as the value, then emit the pair.
- Reduce side: by the time reduce() runs, grouping on the join key is already done; within each group, separate the records by their source (the tag attached in the Map phase) and then merge them.
1) Requirements
Join the order table with the product (pd) table: replace the pid in each order record with the matching pname, and output id, pname, and amount.
2) Requirement Analysis
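The Mapper reads both files from the same input directory, tells them apart by file name, tags each record with its source ("order" or "pd"), and keys everything by pid; the Reducer then joins each pid group. Below is a hypothetical pair of input files matching the tab-separated layout the code expects (the values are illustrative, not taken from the original post):

order.txt (id, pid, amount):
1001	01	1
1002	02	2
1003	03	3

pd.txt (pid, pname):
01	小米
02	华为
03	格力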
3) Implementation
1. Create the Bean class
package com.yingzi.mapreduce.reduceJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-12:45
 **/
public class TableBean implements Writable {

    private String id;     // order id
    private String pid;    // product id
    private int amount;    // product quantity
    private String pname;  // product name
    private String flag;   // tag marking whether the record comes from the order table or the pd table

    public TableBean() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    // Serialization: field order here must match readFields() exactly
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}
2. Create the Mapper class
package com.yingzi.mapreduce.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-12:54
 **/
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the name of the file this split comes from
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split; // downcast
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();

        // Decide which file the line comes from
        if (filename.contains("order")) {
            // order table
            String[] split = line.split("\t");
            // populate outK and outV
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
        } else {
            // product (pd) table
            String[] split = line.split("\t");
            // populate outK and outV
            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        // Emit the pair
        context.write(outK, outV);
    }
}
3. Create the Reducer class
package com.yingzi.mapreduce.reduceJoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * @author 影子
 * @create 2022-01-15-13:23
 **/
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {
            // Decide which table the record comes from
            if ("order".equals(value.getFlag())) {
                // order table: Hadoop reuses the same value object across iterations,
                // so copy it into a fresh TableBean before storing it
                TableBean tmpOrderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tmpOrderBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tmpOrderBean);
            } else {
                // pd table
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // Fill each orderBean's pname from the pd record, then emit
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pdBean.getPname());
            context.write(orderBean, NullWritable.get());
        }
    }
}
4. Create the Driver class
package com.yingzi.mapreduce.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-13:45
 **/
public class TableDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\11_input\\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output10"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
4) View the Results
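With the hypothetical input sketched in the analysis above, each order record picks up its pname, and TableBean.toString() prints id, pname, and amount, so the output would be:

1001	小米	1
1002	华为	2
1003	格力	3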
5) Summary
- Drawback: with this approach the join itself happens in the Reduce phase, so the Reduce side carries a heavy processing load while the Map nodes do very little; resource utilization is poor, and data skew arises very easily in the Reduce phase.
- Solution: perform the join on the Map side.
Map Join
(1) Use case: Map Join suits scenarios where one table is very small and the other is very large.
(2) Advantage: joining many tables on the Reduce side makes data skew very likely. Caching the small tables on the Map side and handling the join logic there shifts work into the Map phase, relieves pressure on the Reduce side, and keeps data skew to a minimum.
(3) Approach: use DistributedCache.
a) In the Mapper's setup() phase, read the cached file into an in-memory collection.
b) In the Driver class, load the file into the cache:
// Cache an ordinary file on the task node
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
// When running on a cluster, use an HDFS path instead
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
2) Requirement Analysis
3) Implementation
1. Create the Bean class:
Identical to the Bean class in the previous example.
package com.yingzi.mapreduce.mapJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-12:45
 **/
public class TableBean implements Writable {

    private String id;     // order id
    private String pid;    // product id
    private int amount;    // product quantity
    private String pname;  // product name
    private String flag;   // tag marking whether the record comes from the order table or the pd table

    public TableBean() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}
2. Create the JoinMapper class
package com.yingzi.mapreduce.mapJoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

/**
 * @author 影子
 * @create 2022-01-15-14:19
 **/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the cached file (pd.txt) and load its contents into pdMap
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        // Get a FileSystem object and open an input stream
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        // Wrap the stream in a reader so we can read line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        // Read and process one line at a time
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // split: pid \t pname
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }

        // Close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read a line of the large (order) table
        String[] fields = value.toString().split("\t");

        // Look up pname in pdMap by this line's pid
        String pname = pdMap.get(fields[1]);

        // Replace the pid in the order line with pname
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        // Emit
        context.write(outK, NullWritable.get());
    }
}
3. Create the JoinDriver class
package com.yingzi.mapreduce.mapJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author 影子
 * @create 2022-01-15-14:06
 **/
public class MapJoinDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar path
        job.setJarByClass(MapJoinDriver.class);
        // 3. Attach the mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Load the cached file
        job.addCacheFile(new URI("file:///G:/计算机资料/大数据开发/尚硅谷大数据技术之Hadoop3.x/资料/11_input/tablecache/pd.txt"));
        // A Map-side join needs no Reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\11_input\\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output13"));
        // 7. Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
4) View the Results
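Under the same hypothetical input (assuming the input directory holds only the order file, since this Mapper parses every input line as an order record), the Map Join yields the same joined lines; with zero reduce tasks there is no shuffle or sort, so rows appear in the order the order records are read:

1001	小米	1
1002	华为	2
1003	格力	3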