文章目录

  • 一、Reduce Join
    • ① Reduce Join工作原理
    • ② Reduce Join 案例
      • ☠ 需求
      • ☠ 案例分析
      • ☠ 代码实现
        • 封装Bean对象
        • Mapper阶段
        • Reducer阶段
        • Driver阶段
      • ☠ 总结

一、Reduce Join

① Reduce Join工作原理

  • Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段(公共字段)作为key,其余部分和新加的标志作为 value,最后进行输出。
  • Reduce端的主要工作:在 Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了

返回顶部


② Reduce Join 案例

☠ 需求


返回顶部


☠ 案例分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。

返回顶部


☠ 代码实现

封装Bean对象

package 第三章_MR框架原理.多种join应用;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class TableBean implements Writable{// id pid amountprivate String id; // 订单idprivate String pid;// 产品idprivate int amount;// 产品数量// pid pnameprivate String pname;// 产品名称private String flag; // 定义一个标记,标记是订单表还是产品表/*** 空参构造*/public TableBean() {}/***  有参构造* @param id* @param pid* @param amount* @param pname* @param flag*/public TableBean(String id, String pid, int amount, String pname, String flag) {this.id = id;this.pid = pid;this.amount = amount;this.pname = pname;this.flag = flag;}/*** 序列化* @param dataOutput* @throws IOException*/@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(id);dataOutput.writeUTF(pid);dataOutput.writeInt(amount);dataOutput.writeUTF(pname);dataOutput.writeUTF(flag);}/*** 反序列化* @param dataInput* @throws IOException*/@Overridepublic void readFields(DataInput dataInput) throws IOException {id = dataInput.readUTF();pid = dataInput.readUTF();amount = dataInput.readInt();pname = dataInput.readUTF();flag = dataInput.readUTF();}/*** 重写toString* @return*/@Overridepublic String toString() {return  id + '\t' + pname + '\t' + amount + '\'';}/*** set、get* @return*/public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}
}

返回顶部


Mapper阶段

与之前有所不同

  • 需要重写setUp()方法通过切片信息获取文件名称,判断读取的是哪个文件
  • map()方法中要对读取的文件判断处理
package 第三章_MR框架原理.多种join应用;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class TableMap extends Mapper<LongWritable, Text, Text, TableBean> {String name;Text k = new Text();TableBean v = new TableBean();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {// 获取文件的名称 --- 通过切片信息获取FileSplit inputSplit = (FileSplit) context.getInputSplit();name = inputSplit.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// id pid amount// 1001   01  1// pid pname// 01  小米// 1.获取一行数据String line = value.toString();// 2.判断数据来源if (name.startsWith("order")) {  // 订单表数据// 3.封装输出的key、valueString[] fields = line.split("\t");String id = fields[0];String pid = fields[1];int amount = Integer.parseInt(fields[2]);v.setId(id);v.setPid(pid);v.setAmount(amount);v.setPname("");      // 没有传入空字符串,但不能不写,因为序列化的时候有v.setFlag("order");  // 设定标记 --- order表k.set(pid);// 4.写出context.write(k,v);} else {  // 产品表数据// 3.封装输出的key、valueString[] fields = line.split("\t");String pid = fields[0];String pname = fields[1];v.setId("");       // 没有传入空字符串,但不能不写,因为序列化的时候有v.setPid(pid);     // 没有传入默认值0v.setAmount(0);v.setPname(pname);v.setFlag("opd");  // 设定标记 --- order表k.set(pid);// 4.写出context.write(k,v);}}
}

返回顶部


Reducer阶段

package 第三章_MR框架原理.多种join应用;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;public class TableReduce extends Reducer<Text,TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {// 1.存储所有订单的集合 --- orderArrayList<TableBean> orderBeans = new ArrayList<>();// 2.存储产品信息 --- pdTableBean pdBean = new TableBean();// 3.遍历存储for (TableBean value:values){               // 循环TableBean对象if ("order".equals(value.getFlag())){   // 读取标记,判定为订单表// 创建临时TableBean对象TableBean temp = new TableBean();try {// 将当前循环到的对象内容拷贝到临时TableBean对象BeanUtils.copyProperties(temp,value);// 将其添加到集合中orderBeans.add(temp);} catch (Exception e){e.printStackTrace();}}else{                                  // 读取标记,判定为产品表try {// 将当前循环到的对象内容拷贝到BeanUtils.copyProperties(pdBean,value);} catch (Exception e){e.printStackTrace();}}}// 4.将设置对应的产品名称,写出for (TableBean tableBean:orderBeans){tableBean.setPname(pdBean.getPname());context.write(tableBean,NullWritable.get());}}
}

这里说明一下,为什么一个是集合,一个是对象:

  • 相同的key可以同时进入到同一个reduce()中,在MapTask的时候对key进行了默认排序
  • 如下图所示,一个reduce中只能获取01部分(或02或03)的数据集
  • 而对应的每个部分中的order表数据不止一条,但是pd表数据只有一条,所以分别用集合、对象来存储。

返回顶部


Driver阶段

package 第三章_MR框架原理.多种join应用;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableDriver {public static void main(String[] args) {Job job = null;Configuration conf = new Configuration();try{// 1.获取jobjob = Job.getInstance(conf);// 2.配置job.setMapperClass(TableMap.class);job.setReducerClass(TableReduce.class);job.setJarByClass(TableDriver.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);// 设置输入输出路径FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\data"));FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\output"));// 提交jobboolean result = job.waitForCompletion(true);System.exit(result? 0:1);} catch (Exception e){e.printStackTrace();}}
}


返回顶部


☠ 总结

返回顶部


【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)相关推荐

  1. Python数据分析【第9天】| DataFrame的属性编码、数据合并和连接(get_dummies,merge,join,concat)

    系列文章目录 第1天:读入数据 第2天:read().readline()与readlines() 第3天:进度条(tqdm模块) 第4天:命令行传参(argparse模块) 第5天:读.写json文 ...

  2. Mysql基础篇(1)—— 基础概念、DML基本语法和表连接

    前言 Mysql基础篇相关的内容是看了康师傅的视频做的笔记吧 数据库相关概念 DB: 数据库(Database) ​ 存储数据的仓库,本质是一个文件系统.它保存了一系列有组织的数据. DBMS:数据库 ...

  3. R语言数据合并与连接技巧

    最近仍然在陆陆续续自学,真.生命不息学习不止,这次和大家分享一些实用的数据处理技巧,干货满满! 一.数据合并 涉及函数cbind(),rbind(),bind_rows(),bind_cols(). ...

  4. delete表1条件是另一个表中的数据,多表连接删除(转)

    DELETE删除多表数据,怎样才能同时删除多个关联表的数据呢?这里做了深入的解释: 1. delete from t1 where 条件 2.delete t1 from t1 where 条件 3. ...

  5. delete表1条件是另一个表中的数据,多表连接删除

    2019独角兽企业重金招聘Python工程师标准>>> 数据库中有两张表. DELETE cdb_posts,cdb_threads FROM cdb_posts ,cdb_thre ...

  6. Reduce Join介绍及案例

    Reduce Join介绍及案例 Reduce Join介绍 Reduce Join案例 需求 1.需求说明 2.文件 案例分析 1.需求分析 2.输入数据 3.期望输出数据 4.MapTask 5. ...

  7. Pandas简明教程:九、表的合并、连接、拼接(数据聚合基础)

    真实场景中常会遇到多种信息放在不同的表里的情况,此时我们就需要将这些表格的信息整合到一起.这种操作可以极大地减轻我们手动粘数据的工作,从而达到事半功倍的效果. 由于本篇要举的例子较多,因此直接采用官网 ...

  8. 数据合并之concat、append、merge和join

    Pandas 是一套用于 Python 的快速.高效的数据分析工具.它可以用于数据挖掘和数据分析,同时也提供数据清洗功能.本文将详细讲解数据合并与连接,目录如下: ① concat 一.定义 conc ...

  9. Pandas 中DataFrame 数据合并 Contract | Merge

    最近在工作中,遇到了数据合并.连接的问题,故整理如下,供需要者参考~ 参考自:象在舞:https://blog.csdn.net/gdkyxy2013/article/details/80785361 ...

最新文章

  1. 读后感和机翻《他们在看哪里,为什么看?在复杂的任务中共同推断人类的注意力和意图》
  2. 计算机房 门,标准机房门的规格
  3. Java-Java5.0泛型解读
  4. python开发mbus程序_Python pywmbus包_程序模块 - PyPI - Python中文网
  5. ​马卡龙配色你好夏天PPT模板​
  6. java应用程序的执行起点是什么方法_Java应用程序的执行起点是____________方法。(3.0分)_学小易找答案...
  7. Eclipse如何打出war包
  8. 20.从0开始的微服务架构
  9. KALI安装软件链接不上源,数字签名软件保护,Kali Linux 更新源 数字签名无效处理
  10. Android WiFi only配置
  11. 计算机辅助项目管理实验论文,计算机辅助项目管理课程设计--毕设论文.doc
  12. 全栈开发-Python的介绍
  13. cath数据库fasta备注_sam's note
  14. iOS小技巧11-Xcode中相对路径和绝对路径的使用
  15. RT-Thread Studio 红外Infrared使用笔记
  16. (二)最新版Django项目数据库迁移;读取数据库增添删改;以及显示在html或vue前端(Django+Vue+Mysql,数据库管理数据分析网站)
  17. 评论360和腾讯--程苓峰--《战争的本质》
  18. 专科生学习云计算的就业前景如何?
  19. 安装问题:bokeh安装报错
  20. 大数据产业链和生态图谱

热门文章

  1. HelloWorld案例中显示找不到helloworld.java文件.
  2. 【密码学基础】RSA加密算法
  3. 友情链接交换标准注意事项
  4. word 插入 代码 优雅插入
  5. kafka数据丢失总结
  6. 第二章 网站开发基础之HTML教程 - 三、HTML及网页相关的知识点:img图片标签的宽度属性(width)和高度属性(height)
  7. CentOS7创建应用程序的桌面快捷方式
  8. C#WMI 操作 【转载】
  9. 天涯明月刀服务器维护了,《天涯明月刀》3月9日服务器例行维护公告
  10. 搜狗输入法显示过大/过小