目录

  • Reduce Join
  • Reduce Join案例
    • 需求
    • 需求分析
    • 运行结果
    • 缺点
    • 源码

Reduce Join

Map 端的主要工作:为来自不同表或文件的 key/value 对,打标签以区别不同来源的记 录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出;

Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要 在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进 行合并就 ok 了。

Reduce Join案例

需求

订单表order:

产品表pd:

将上述两表中的数据根据商品pid合并到订单数据表中,要求呈现出如下图:

需求分析

通过将关联条件作为 Map 输出的 key,将两表满足 Join 条件的数据并携带数据所来源
的文件信息,发往同一个 ReduceTask,在 Reduce 中进行数据的串联。

Reduce端表合并(数据倾斜)

运行结果

本地order.txt

本地pd.txt

输出

缺点

缺点: 这种方式中,合并的操作是在 Reduce 阶段完成,Reduce 端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在 Reduce 阶段极易产生数据倾斜。

解决方案: Map 端实现数据合并==>

源码

tips:
hadoop迭代器中使用了对象重用,即迭代时value始终指向一个内存地址(引用值始终不变),改变的是引用指向的内存地址中的数据。

TableBean类:

package com.xiaobai.mapreduce.reduceJoin;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class TableBean implements Writable {private String id; //订单idprivate String pid; //商品idprivate int amount; //商品数量private String pname; // 商品名称private String flag; //标记表  order pd//空餐构造public TableBean() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}//序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {//反序列化应和序列化顺序一致this.id = in.readUTF();this.pid = in.readUTF();this.amount = in.readInt();this.pname = in.readUTF();this.flag = in.readUTF();}@Overridepublic String toString() {// id pname amountreturn id + "\t" + pname + "\t" + amount;}
}

TableMapper类:

package com.xiaobai.mapreduce.reduceJoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.File;
import java.io.IOException;public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> {private String fileName;private Text outK = new Text();private TableBean outV = new TableBean();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//初始化  order  pd//获取文件名称 一个文件只获取一次FileSplit split = (FileSplit)context.getInputSplit();fileName = split.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1. 获取一行String line = value.toString();//2.判断是哪个文件的if(fileName.contains("order")){ //处理的是订单表orderString[] split = line.split("\t");//封装outK.set(split[1]);outV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2])); //要转换为String类型outV.setPname("");outV.setFlag("order");}else{ //处理的是产品表pdString[] split = line.split("\t");outK.set(split[0]);outV.setId("");outV.setPid(split[0]);outV.setAmount(0);outV.setPname(split[1]);outV.setFlag("pd");}//写出context.write(outK,outV);}
}

TableReducer类:

package com.xiaobai.mapreduce.reduceJoin;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {//  01  1001    1   order//  01  1004    4   order//  01  小米          pd//创建2个集合ArrayList<TableBean> orderBeans = new ArrayList<>();TableBean pdBean = new TableBean();//循环遍历for (TableBean value : values) {if("order".equals(value.getFlag())){   //order表//创建临时TableBean对象tmptableBeanTableBean tmptableBean = new TableBean();try {BeanUtils.copyProperties(tmptableBean,value); //使用工具类BeanUtils将value赋值给tmptableBean} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}orderBeans.add(tmptableBean);}else{ //pd表try {BeanUtils.copyProperties(pdBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}//循环遍历orderBeans,赋值pdnamefor (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());//id相同context.write(orderBean,NullWritable.get());}}
}

TableDriver类:

package com.xiaobai.mapreduce.reduceJoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;import java.io.IOException;public class TableDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Job job = Job.getInstance(new Configuration());job.setJarByClass(TableDriver.class);job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job,new org.apache.hadoop.fs.Path("/Users/jane/Desktop/test/JoinTest"));FileOutputFormat.setOutputPath(job,new Path("/Users/jane/Desktop/hadoop/JoinTestOutput"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

hadoop--Reduce Join相关推荐

  1. Reduce Join介绍及案例

    Reduce Join介绍及案例 Reduce Join介绍 Reduce Join案例 需求 1.需求说明 2.文件 案例分析 1.需求分析 2.输入数据 3.期望输出数据 4.MapTask 5. ...

  2. Hadoop之Join、计数器、数据清洗概述

    Hadoop之Join.计数器.数据清洗概述 目录 Reduce join Map join 计数器应用 数据清洗(ETL) 1. Reduce join 原理 Map端的主要工作:为来自不同表(文件 ...

  3. Map And Reduce Join的使用案例

    文章目录 Reduce Join 1)需求 2)需求分析 3)编程实现 1.创建Bean类 2.创建Mapper类 3.创建Reducer类 4.创建Driver类 4)查看结果 5)总结 Mappe ...

  4. 【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)

    文章目录 一.Reduce Join ① Reduce Join工作原理 ② Reduce Join 案例 ☠ 需求 ☠ 案例分析 ☠ 代码实现 封装Bean对象 Mapper阶段 Reducer阶段 ...

  5. reduce 数据倾斜_Spark(四十)数据倾斜解决方案之将reduce join转换

    一.背景 1.将reduce join转换为map join 2.broadcast出来的普通变量 普通的join,那么肯定是要走shuffle:那么,所以既然是走shuffle,那么普通的join, ...

  6. hadoop data join

    概念: Hadoop有一个叫DataJoin的包为Data Join提供相应的框架.它的Jar包存在于contrib/datajoin/hadoop-*-datajoin. 为区别于其他的data j ...

  7. hadoop中join操作

    前言 在mysql中,经常涉及到2张表或者多张表的关联查询,通常通过中间字段将两个表做关联,在MapReduce中,某些场景下也会遇到类似的需求,比如说,将两个原始的日志文件,通过中间业务字段进行关联 ...

  8. MR实现reduce join和map join及hive的执行计划

    一.涵盖 MapReduce InputFormat RecordReader 切片:block=input split 1.1 File- Text- NLine- DB- Mapper setup ...

  9. Hadoop MapReduce编程 API入门系列之join(二十六)

    天气记录数据库 气象站数据库 气象站和天气记录合并之后的示意图如下所示. 011990-99999 SIHCCAJAVRI 195005150700 0 011990-99999 SIHCCAJAVR ...

  10. Hadoop实例学习(十三) Join应用

    目录 Reduce Join 解决的问题 实例 编写TableBean 类 编写TableMapper类 编写TableDriver类 Map Join 案例 编写MapJoinMapper类 编写M ...

最新文章

  1. 2007.04.26
  2. 小目标检测的一些问题,思路和方案
  3. 原生编辑器_免费开源的GIF制作神器,可录屏幕/摄像头/画板,自带编辑器
  4. c++的引用是什么意思?怎么回事?
  5. 直接进入ORACLE12C插件数据库
  6. Chapter 1 First Sight——30
  7. Java使用独立数据库连接池(DBCP为例)
  8. 大数据行为分析包含哪些功能
  9. R︱并行计算以及提高运算效率的方式(parallel包、clusterExport函数、SupR包简介)
  10. memcached mysql 性能测试_InnoDB memcached插件 vs 原生memcached对比性能测试
  11. 链表:从尾到头打印链表
  12. java混淆器_Java 混淆器
  13. nginx 服务器大文件上传时500错误
  14. Linux系统的权限管理
  15. Qt编写的复杂象棋程序 chessbroad.cpp 错误比较多
  16. Revit二次开发入门相关安装和配置
  17. 怎么把win10设置Linux样式,老司机教你把win10系统界面换成win7样式的方法
  18. 重组人碱性成纤维细胞生长因子(FGF 2)参考文献
  19. 5G智慧校园建设顶层设计方案智慧校园大脑建设方案
  20. html5的多媒体的属性及其简单的属性

热门文章

  1. (王道408考研操作系统)第三章内存管理-第一节7:非连续分配管理方式之基本分段管理方式
  2. Java统计每个大写字母的个数
  3. C#删除字符串倒数第几个字符后的所有字符串
  4. chmod递归授权文件夹(用法)
  5. Python zmq 讲解
  6. 141.Linked List Cycle
  7. OpenCV Using Python——基于SURF特征提取和金字塔LK光流法的单目视觉三维重建 (光流、场景流)...
  8. Django xadmin引入DjangoUeditor
  9. 判断应用程序是否是当前激活程序(获得焦点的程序)
  10. TcxComboBox控件说明