Table of Contents

  • Reduce Join
    • 1) Requirement
    • 2) Requirement analysis
    • 3) Implementation
      • 1. Create the Bean class
      • 2. Create the Mapper class
      • 3. Create the Reducer class
      • 4. Create the Driver class
    • 4) View the results
    • 5) Summary
  • Map Join
    • 2) Requirement analysis
    • 3) Implementation
      • 1. Create the Bean class
      • 2. Create the JoinMapper class
      • 3. Create the JoinDriver class
    • 4) View the results

Reduce Join

  • Map-side work: tag each key/value pair so that records coming from different tables (or files) can be told apart, use the join field as the key and the remaining fields plus the new tag as the value, then emit the pair.
  • Reduce-side work: grouping by the join field has already been done by the shuffle, so within each group we only need to separate the records that came from different files (they were tagged in the Map phase) and then merge them. A plain-Java sketch of this idea follows below.
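
To make the tag-and-group idea concrete before the MapReduce code, here is a tiny plain-Java sketch (my own illustration, not part of the original post; the class name and the input lines are hypothetical) that tags records by source table, groups them by the join key, and merges each group:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Tags each record with its source ("order" or "pd"), groups by the join key
// (pid), then attaches the product name to every order in the group.
public class ReduceJoinIdea {
    public static void main(String[] args) {
        List<String[]> tagged = new ArrayList<>();                        // {tag, pid, payload}
        for (String line : new String[]{"1001\t01\t1", "1004\t01\t4"}) {  // hypothetical order lines: id, pid, amount
            String[] f = line.split("\t");
            tagged.add(new String[]{"order", f[1], f[0] + "\t" + f[2]});
        }
        for (String line : new String[]{"01\tXiaomi"}) {                  // hypothetical pd line: pid, pname
            String[] f = line.split("\t");
            tagged.add(new String[]{"pd", f[0], f[1]});
        }

        // "Shuffle": group every tagged record by the join key.
        Map<String, List<String[]>> groups = new HashMap<>();
        for (String[] r : tagged) {
            groups.computeIfAbsent(r[1], k -> new ArrayList<>()).add(r);
        }

        // "Reduce": within each group, copy the pd record's pname onto each order record.
        for (List<String[]> group : groups.values()) {
            String pname = "";
            List<String[]> orders = new ArrayList<>();
            for (String[] r : group) {
                if ("pd".equals(r[0])) pname = r[2]; else orders.add(r);
            }
            for (String[] o : orders) {
                String[] idAndAmount = o[2].split("\t");
                System.out.println(idAndAmount[0] + "\t" + pname + "\t" + idAndAmount[1]);  // id  pname  amount
            }
        }
    }
}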

1) Requirement

Join the order table (order id, product id, quantity) with the product table (product id, product name), and output one record per order containing the order id, product name, and quantity.

2) Requirement analysis

Use the product id (pid) as the map output key and a TableBean carrying a source flag as the value. Each pid group arriving at the reducer then contains the orders and the matching product record, so the product name can be filled into every order before it is written out.

3) Implementation

1. Create the Bean class

package com.yingzi.mapreduce.reduceJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-12:45
 **/
public class TableBean implements Writable {

    private String id;     // order id
    private String pid;    // product id
    private int amount;    // product quantity
    private String pname;  // product name
    private String flag;   // marks whether the record comes from the order table or the pd table

    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}
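
As a quick sanity check on the Writable implementation (my own sketch, not part of the original post; the field values are hypothetical), the bean can be serialized and deserialized by hand the same way the framework does it:

package com.yingzi.mapreduce.reduceJoin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trips a TableBean through write()/readFields() and checks that the
// fields survive; only hadoop-common and the TableBean above are needed.
public class TableBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        TableBean in = new TableBean();
        in.setId("1001");   // hypothetical order id
        in.setPid("01");    // hypothetical product id
        in.setAmount(1);
        in.setPname("");
        in.setFlag("order");

        // Serialize exactly the way the MapReduce framework would.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean and compare the fields.
        TableBean out = new TableBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out.getId().equals(in.getId())
                && out.getPid().equals(in.getPid())
                && out.getAmount() == in.getAmount()
                && out.getFlag().equals(in.getFlag()));   // expect: true
    }
}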

2. Create the Mapper class

package com.yingzi.mapreduce.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-12:54
 **/
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
        // Get the name of the file this split belongs to
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split; // downcast
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();

        // Decide which file the line comes from
        if (filename.contains("order")) {
            // order table
            String[] split = line.split("\t");
            // Fill in outK and outV
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
        } else {
            // product (pd) table
            String[] split = line.split("\t");
            // Fill in outK and outV
            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        // Emit outK and outV
        context.write(outK, outV);
    }
}

3. Create the Reducer class

package com.yingzi.mapreduce.reduceJoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * @author 影子
 * @create 2022-01-15-13:23
 **/
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {

        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean value : values) {
            // Check which table the record came from
            if ("order".equals(value.getFlag())) {
                // order table: copy value into a temporary TableBean
                // (the framework reuses the value object between iterations)
                TableBean tmpOrderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tmpOrderBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tmpOrderBean);
            } else {
                // product (pd) table
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // Replace each orderBean's pid with the product name, then emit
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pdBean.getPname());
            // Emit the updated orderBean
            context.write(orderBean, NullWritable.get());
        }
    }
}
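
A design note on the reducer above: BeanUtils.copyProperties copies fields through reflection, which adds per-record overhead and forces the try/catch blocks, yet some copy is still required because Hadoop reuses the same TableBean instance while iterating over values. A hand-written helper (a sketch of mine, not the article's code) added to TableReducer avoids both:

// Hypothetical helper for TableReducer: copy every TableBean field by hand
// instead of going through BeanUtils reflection.
private static TableBean copyOf(TableBean src) {
    TableBean dst = new TableBean();
    dst.setId(src.getId());
    dst.setPid(src.getPid());
    dst.setAmount(src.getAmount());
    dst.setPname(src.getPname());
    dst.setFlag(src.getFlag());
    return dst;
}

With it, the order branch becomes orderBeans.add(copyOf(value)); the pd branch becomes pdBean = copyOf(value); and the try/catch blocks disappear.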

4. Create the Driver class

package com.yingzi.mapreduce.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-13:45
 **/
public class TableDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Get the job instance and attach the jar, mapper and reducer
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // Final output key/value types
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\11_input\\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output10"));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
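
The input and output paths above are hard-coded Windows paths, which only work on the author's machine. A common variant (a sketch, not the article's code) is to take them from the command line so the same driver runs unchanged in local mode or on a cluster; the same change applies to the MapJoinDriver further below.

// Drop-in replacement for the two hard-coded paths in TableDriver.main,
// assuming the job is launched as, for example:
//   hadoop jar reduceJoin.jar com.yingzi.mapreduce.reduceJoin.TableDriver <inputDir> <outputDir>
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));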

4) View the results

Each output line contains the order id, the product name, and the quantity, separated by tabs, which is exactly the format produced by TableBean.toString().

5) Summary

  • Drawback: with this approach all of the merging happens in the Reduce phase, so the Reduce side is under heavy processing pressure while the Map side does very little work. Resource utilization is low, and the Reduce phase is very prone to data skew.
  • Solution: do the join on the Map side (see Map Join below).

Map Join

(1) Use case: Map Join fits the situation where one table is very small and the other is very large.

(2) Advantage: doing all of the merge work on the Reduce side makes data skew very likely. Instead, cache the small table on the Map side and finish the join logic there; this shifts work onto the Map tasks, relieves the pressure on the Reduce side, and reduces data skew as much as possible.

(3) Concrete method: use DistributedCache.

  a) In the Mapper's setup phase, read the cached file into an in-memory collection.
  b) In the Driver class, load the file into the cache:

// Cache an ordinary file to every task node
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
// When running on a cluster, use an HDFS path instead
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
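
One extra detail, as an aside of mine rather than something from the original post: the URI passed to addCacheFile may carry a fragment, and Hadoop uses that fragment as the name of the symlink created in each task's working directory, which keeps long HDFS paths out of the mapper code.

// Hypothetical example: "#pd" exposes the cached file to every task as a
// local symlink named "pd", regardless of its HDFS path.
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt#pd"));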

2) Requirement analysis

The requirement is the same as in the Reduce Join example, but this time pd.txt is shipped to every MapTask through the DistributedCache, a pid-to-pname HashMap is built in setup(), and the join is completed in map() with no Reduce phase at all.

3) Implementation

1. Create the Bean class

Identical to the Bean class in the previous example (only the package name differs).

package com.yingzi.mapreduce.mapJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-15-12:45
 **/
public class TableBean implements Writable {

    private String id;     // order id
    private String pid;    // product id
    private int amount;    // product quantity
    private String pname;  // product name
    private String flag;   // marks whether the record comes from the order table or the pd table

    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}

2. Create the JoinMapper class

package com.yingzi.mapreduce.mapJoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * @author 影子
 * @create 2022-01-15-14:19
 **/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Get the cached file (pd.txt) and load its contents into pdMap
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        // Get a FileSystem object and open an input stream
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        // Wrap the stream in a reader so it can be read line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        // Read and process one line at a time
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // Split the line: pid \t pname
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }

        // Close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Read one line of the big (order) table
        String[] fields = value.toString().split("\t");

        // Look up pname in pdMap by the line's pid
        String pname = pdMap.get(fields[1]);

        // Replace pid with pname in the output line
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        // Emit
        context.write(outK, NullWritable.get());
    }
}
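
A small robustness note on the loading loop above (my addition, not part of the original code): the while condition stops at the first empty line, and IOUtils.closeStream is skipped if an exception is thrown inside the loop. A try-with-resources sketch of the same loop, reusing the fs, path, and pdMap already defined in setup(), could look like this:

// Alternative read loop for setup(): try-with-resources closes the reader
// even if a line fails to parse, and blank lines are skipped rather than
// terminating the load.
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(path), "UTF-8"))) {
    String line;
    while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) {
            continue;
        }
        String[] fields = line.split("\t"); // pid \t pname
        pdMap.put(fields[0], fields[1]);
    }
}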

3. Create the JoinDriver class

package com.yingzi.mapreduce.mapJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author 影子
 * @create 2022-01-15-14:06
 **/
public class MapJoinDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar location
        job.setJarByClass(MapJoinDriver.class);
        // 3. Attach the mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Load the cached file
        job.addCacheFile(new URI("file:///G:/计算机资料/大数据开发/尚硅谷大数据技术之Hadoop3.x/资料/11_input/tablecache/pd.txt"));
        // Map-side join needs no Reduce phase, so set the number of ReduceTasks to 0
        job.setNumReduceTasks(0);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\11_input\\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output13"));

        // 7. Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

4) View the results

The output contains the same id / pname / amount lines as the Reduce Join version, but they are written directly by the MapTasks, since the job runs with zero ReduceTasks.
