Reduce Join介绍及案例
Reduce Join介绍及案例
- Reduce Join介绍
- Reduce Join案例
- 需求
- 1.需求说明
- 2.文件
- 案例分析
- 1.需求分析
- 2.输入数据
- 3.期望输出数据
- 4.MapTask
- 5. ReduceTask
- 代码实现
- 1.创建商品和订单合并后的Bean类
- 2.编写TableMapper类
- 3.编写TableReducer类
- 4. 编写TableDriver类
- 结果截图
Reduce Join介绍
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
Reduce Join案例
需求
1.需求说明
将商品信息表中的数据根据商品pid合并到订单数据表。
2.文件
案例分析
1.需求分析
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。
2.输入数据
3.期望输出数据
4.MapTask
(1)在Map中处理的事情
- 获取输入文本类型
- 获取输入数据
- 不同文件分别处理
- 封装bean对象输出
(2)默认对产品id排序
5. ReduceTask
Reduce方法缓存订单数据集合和产品表,然后进行合并
代码实现
1.创建商品和订单合并后的Bean类
package com.atguigu.mr.table;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class TableBean implements Writable {private String id;//订单idprivate String pid; // 产品idprivate int amount; //数量private String pname; // 产品名称private String flag; //标记是订单表还是产品表public TableBean() {super();}public TableBean(String id, String pid, int amount, String name, String flag) {super();this.id = id;this.pid = pid;this.amount = amount;this.pname = name;this.flag = flag;}//序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {id = in.readUTF();pid = in.readUTF();amount = in.readInt();pname = in.readUTF();flag = in.readUTF();}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic String toString() {return id + "\t" + amount + "\t" + pname;}
}
2.编写TableMapper类
package com.atguigu.mr.table;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {String name;@Overrideprotected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context)throws IOException, InterruptedException {//获取文件名称FileSplit inputSplit = (FileSplit)context.getInputSplit();name = inputSplit.getPath().getName();}TableBean tableBean = new TableBean();Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context)throws IOException, InterruptedException {//获取一行String line = value.toString();if (name.startsWith("order")) {//订单表String[] fields = line.split("\t");//封装key和valuetableBean.setId(fields[0]);tableBean.setPid(fields[1]);tableBean.setAmount(Integer.parseInt(fields[2]));tableBean.setPname("");tableBean.setFlag("order");k.set(fields[1]);}else {//产品表String[] fields = line.split("\t");//封装key和valuetableBean.setId("");tableBean.setPid(fields[0]);tableBean.setAmount(0);tableBean.setPname(fields[1]);tableBean.setFlag("pd");k.set(fields[0]);}//写出context.write(k, tableBean);}}
3.编写TableReducer类
package com.atguigu.mr.table;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo.Bean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values,Context context) throws IOException, InterruptedException {//存放所有订单集合ArrayList<TableBean> orderBeans = new ArrayList<>();//存放产品信息TableBean pdBean = new TableBean();for (TableBean tableBean : values) {if ("order".equals(tableBean.getFlag())) { //订单表TableBean tmpBean = new TableBean();try {BeanUtils.copyProperties(tmpBean, tableBean);orderBeans.add(tmpBean);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}else {try {BeanUtils.copyProperties(pdBean, tableBean);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}for (TableBean tableBean : orderBeans) {tableBean.setPname(pdBean.getPname());context.write(tableBean, NullWritable.get());}}}
4. 编写TableDriver类
package com.atguigu.mr.table;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableDriver {public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {// 0 根据自己电脑路径重新配置args = new String[] { "S:\\centos学习笔记\\input\\tableinput", "S:\\centos学习笔记\\output\\tableoutput" };// 1 获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 指定本程序的jar包所在的本地路径job.setJarByClass(TableDriver.class);// 3 指定本业务job要使用的Mapper/Reducer业务类job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);// 4 指定Mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);// 5 指定最终输出的数据的kv类型job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);// 6 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}
结果截图
Reduce Join介绍及案例相关推荐
- Map Join介绍及案例
Map Join介绍及案例 Map Join介绍 1. 使用场景 2. 优点 3. 实现方法 Map Join案例 1. 需求 (1)需求说明 (2)文件 2.案例分析 (1)需求分析 (2)输入数据 ...
- Map And Reduce Join的使用案例
文章目录 Reduce Join 1)需求 2)需求分析 3)编程实现 1.创建Bean类 2.创建Mapper类 3.创建Reducer类 4.创建Driver类 4)查看结果 5)总结 Mappe ...
- 【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)
文章目录 一.Reduce Join ① Reduce Join工作原理 ② Reduce Join 案例 ☠ 需求 ☠ 案例分析 ☠ 代码实现 封装Bean对象 Mapper阶段 Reducer阶段 ...
- wow mysql dbc_DBC中悲观锁介绍附案例详解
DBC中悲观锁介绍附案例详解 了解下DBC中悲观锁: 代码如下: BDUtils 工具类: package JDBC; import java.sql.*; public class BDUtils ...
- python高阶函数、map reduce 自己如何去定义_「python」高阶函数map、reduce的介绍
Python map 先看官方介绍: map(function, iterable, ...)Return an iterator that applies function to every ite ...
- 资源放送丨《MGR原理介绍与案例分享》PPT视频
点击上方"蓝字" 关注我们,享更多干货! 前段时间,墨天轮邀请数据库资深专家 黄江平 老师分享了<MGR原理介绍与案例分享>(<MySQL Group Rrpli ...
- 本周四直播预告(内含福利)丨 经典知识库:MGR原理介绍与案例分享
经典知识库:MGR原理介绍与案例分享-9月2日20:00 MySQL推出MGR之前,传统复制分为两种:异步复制.半同步复制 :随着应用对数据库的高可用要求越来越高,MySQL MGR应运而生. MyS ...
- reduce 数据倾斜_Spark(四十)数据倾斜解决方案之将reduce join转换
一.背景 1.将reduce join转换为map join 2.broadcast出来的普通变量 普通的join,那么肯定是要走shuffle:那么,所以既然是走shuffle,那么普通的join, ...
- puppet成长日记二 Package资源详细介绍及案例分析
puppet成长日记二 Package资源详细介绍及案例分析 一.系统环境 1.puppet服务端 Release:RHEL6.4 HOSTNAME: puppetserver.rsyslog.org ...
最新文章
- ZABBIX API简介及使用
- Unity有哪些让做项目事半功倍的插件值得推荐?
- Java语言程序设计(序)
- chstr php,PHPWind与Discuz截取字符函数substrs与cutstr性能比较
- [转]wxParse-微信小程序富文本解析组件
- Cocos2d-x 坐标系及其坐标转换
- 数据库表多维度数据的计算和汇总
- Docker Redis 安装
- ApiPost、Postman及并发测试工具Jmeter、PostJson接口测试工具,设置参数传递。
- mermaid流程图语法教程
- matlab微积分如何计算器,如何用matlab对这个函数进行积分。 请问这个公式是如何算出来的?使用微积分吗?...
- SQL Server阻塞与锁
- Dubbo学习笔记:No provider available for the service ...异常问题的解决
- 四阶魔方邻角互换公式
- RK3568平台开发系列讲解(驱动篇)驱动开发之GPIO使用
- 【Office Outlook】发送具有数字签名的电子邮件
- Linux内核内存检测工具KASAN
- NPDP学员王杰备考心得:多看书,多记,多刷题
- VIEWGOOD(远古)P2P流媒体直播系统的设计与实现
- 智能眼镜、语音识别等8种输入设备即将取代传统键盘
热门文章
- Go借助PProf的一次性能优化
- Golang 入门系列(九) 如何读取YAML,JSON,INI等配置文件...
- day1 作业二:多级菜单操作
- 五条强化 SSH 安全的建议
- /proc/mtd 各参数的含义 -- linux内核
- C#零基础入门04:打老鼠初级之枚举、重构、事件处理器
- MFC:怎么将程序窗口最小化到系统托盘
- sqlserver创建对于job失败_创建维护计划失败创建 对于 JobStep“子计划”失败
- 博途v15做上位画面_realme真我V15评测:当科技遇上国潮 越级还能这么玩
- 建空列表list,数组array,矩阵matrix