Reduce Join介绍及案例

  • Reduce Join介绍
  • Reduce Join案例
    • 需求
      • 1.需求说明
      • 2.文件
    • 案例分析
      • 1.需求分析
      • 2.输入数据
      • 3.期望输出数据
      • 4.MapTask
      • 5. ReduceTask
    • 代码实现
      • 1.创建商品和订单合并后的Bean类
      • 2.编写TableMapper类
      • 3.编写TableReducer类
      • 4. 编写TableDriver类
    • 结果截图

Reduce Join介绍

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

Reduce Join案例

需求

1.需求说明

将商品信息表中的数据根据商品pid合并到订单数据表。

2.文件


案例分析

1.需求分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。

2.输入数据


3.期望输出数据

4.MapTask

(1)在Map中处理的事情

  • 获取输入文本类型
  • 获取输入数据
  • 不同文件分别处理
  • 封装bean对象输出

(2)默认对产品id排序

5. ReduceTask

Reduce方法缓存订单数据集合和产品表,然后进行合并

代码实现

1.创建商品和订单合并后的Bean类

package com.atguigu.mr.table;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class TableBean implements Writable  {private String id;//订单idprivate String pid; // 产品idprivate int amount; //数量private String pname; // 产品名称private String flag; //标记是订单表还是产品表public TableBean() {super();}public TableBean(String id, String pid, int amount, String name, String flag) {super();this.id = id;this.pid = pid;this.amount = amount;this.pname = name;this.flag = flag;}//序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {id = in.readUTF();pid = in.readUTF();amount = in.readInt();pname = in.readUTF();flag = in.readUTF();}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic String toString() {return  id + "\t" + amount + "\t" + pname;}
}

2.编写TableMapper类

package com.atguigu.mr.table;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {String name;@Overrideprotected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context)throws IOException, InterruptedException {//获取文件名称FileSplit inputSplit = (FileSplit)context.getInputSplit();name = inputSplit.getPath().getName();}TableBean tableBean = new TableBean();Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context)throws IOException, InterruptedException {//获取一行String line = value.toString();if (name.startsWith("order")) {//订单表String[] fields = line.split("\t");//封装key和valuetableBean.setId(fields[0]);tableBean.setPid(fields[1]);tableBean.setAmount(Integer.parseInt(fields[2]));tableBean.setPname("");tableBean.setFlag("order");k.set(fields[1]);}else {//产品表String[] fields = line.split("\t");//封装key和valuetableBean.setId("");tableBean.setPid(fields[0]);tableBean.setAmount(0);tableBean.setPname(fields[1]);tableBean.setFlag("pd");k.set(fields[0]);}//写出context.write(k, tableBean);}}

3.编写TableReducer类

package com.atguigu.mr.table;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo.Bean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values,Context context) throws IOException, InterruptedException {//存放所有订单集合ArrayList<TableBean> orderBeans = new ArrayList<>();//存放产品信息TableBean pdBean = new TableBean();for (TableBean tableBean : values) {if ("order".equals(tableBean.getFlag())) { //订单表TableBean tmpBean = new TableBean();try {BeanUtils.copyProperties(tmpBean, tableBean);orderBeans.add(tmpBean);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}else {try {BeanUtils.copyProperties(pdBean, tableBean);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}for (TableBean tableBean : orderBeans) {tableBean.setPname(pdBean.getPname());context.write(tableBean, NullWritable.get());}}}

4. 编写TableDriver类

package com.atguigu.mr.table;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableDriver {public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {// 0 根据自己电脑路径重新配置args = new String[] { "S:\\centos学习笔记\\input\\tableinput", "S:\\centos学习笔记\\output\\tableoutput" };// 1 获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 指定本程序的jar包所在的本地路径job.setJarByClass(TableDriver.class);// 3 指定本业务job要使用的Mapper/Reducer业务类job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);// 4 指定Mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);// 5 指定最终输出的数据的kv类型job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);// 6 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}

结果截图

Reduce Join介绍及案例相关推荐

  1. Map Join介绍及案例

    Map Join介绍及案例 Map Join介绍 1. 使用场景 2. 优点 3. 实现方法 Map Join案例 1. 需求 (1)需求说明 (2)文件 2.案例分析 (1)需求分析 (2)输入数据 ...

  2. Map And Reduce Join的使用案例

    文章目录 Reduce Join 1)需求 2)需求分析 3)编程实现 1.创建Bean类 2.创建Mapper类 3.创建Reducer类 4.创建Driver类 4)查看结果 5)总结 Mappe ...

  3. 【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)

    文章目录 一.Reduce Join ① Reduce Join工作原理 ② Reduce Join 案例 ☠ 需求 ☠ 案例分析 ☠ 代码实现 封装Bean对象 Mapper阶段 Reducer阶段 ...

  4. wow mysql dbc_DBC中悲观锁介绍附案例详解

    DBC中悲观锁介绍附案例详解 了解下DBC中悲观锁: 代码如下: BDUtils 工具类: package JDBC; import java.sql.*; public class BDUtils ...

  5. python高阶函数、map reduce 自己如何去定义_「python」高阶函数map、reduce的介绍

    Python map 先看官方介绍: map(function, iterable, ...)Return an iterator that applies function to every ite ...

  6. 资源放送丨《MGR原理介绍与案例分享》PPT视频

    点击上方"蓝字" 关注我们,享更多干货! 前段时间,墨天轮邀请数据库资深专家 黄江平 老师分享了<MGR原理介绍与案例分享>(<MySQL Group Rrpli ...

  7. 本周四直播预告(内含福利)丨 经典知识库:MGR原理介绍与案例分享

    经典知识库:MGR原理介绍与案例分享-9月2日20:00 MySQL推出MGR之前,传统复制分为两种:异步复制.半同步复制 :随着应用对数据库的高可用要求越来越高,MySQL MGR应运而生. MyS ...

  8. reduce 数据倾斜_Spark(四十)数据倾斜解决方案之将reduce join转换

    一.背景 1.将reduce join转换为map join 2.broadcast出来的普通变量 普通的join,那么肯定是要走shuffle:那么,所以既然是走shuffle,那么普通的join, ...

  9. puppet成长日记二 Package资源详细介绍及案例分析

    puppet成长日记二 Package资源详细介绍及案例分析 一.系统环境 1.puppet服务端 Release:RHEL6.4 HOSTNAME: puppetserver.rsyslog.org ...

最新文章

  1. ZABBIX API简介及使用
  2. Unity有哪些让做项目事半功倍的插件值得推荐?
  3. Java语言程序设计(序)
  4. chstr php,PHPWind与Discuz截取字符函数substrs与cutstr性能比较
  5. [转]wxParse-微信小程序富文本解析组件
  6. Cocos2d-x 坐标系及其坐标转换
  7. 数据库表多维度数据的计算和汇总
  8. Docker Redis 安装
  9. ApiPost、Postman及并发测试工具Jmeter、PostJson接口测试工具,设置参数传递。
  10. mermaid流程图语法教程
  11. matlab微积分如何计算器,如何用matlab对这个函数进行积分。 请问这个公式是如何算出来的?使用微积分吗?...
  12. SQL Server阻塞与锁
  13. Dubbo学习笔记:No provider available for the service ...异常问题的解决
  14. 四阶魔方邻角互换公式
  15. RK3568平台开发系列讲解(驱动篇)驱动开发之GPIO使用
  16. 【Office Outlook】发送具有数字签名的电子邮件
  17. Linux内核内存检测工具KASAN
  18. NPDP学员王杰备考心得:多看书,多记,多刷题
  19. VIEWGOOD(远古)P2P流媒体直播系统的设计与实现
  20. 智能眼镜、语音识别等8种输入设备即将取代传统键盘

热门文章

  1. Go借助PProf的一次性能优化
  2. Golang 入门系列(九) 如何读取YAML,JSON,INI等配置文件...
  3. day1 作业二:多级菜单操作
  4. 五条强化 SSH 安全的建议
  5. /proc/mtd 各参数的含义 -- linux内核
  6. C#零基础入门04:打老鼠初级之枚举、重构、事件处理器
  7. MFC:怎么将程序窗口最小化到系统托盘
  8. sqlserver创建对于job失败_创建维护计划失败创建 对于 JobStep“子计划”失败
  9. 博途v15做上位画面_realme真我V15评测:当科技遇上国潮 越级还能这么玩
  10. 建空列表list,数组array,矩阵matrix