【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)
文章目录
- 一、Reduce Join
- ① Reduce Join工作原理
- ② Reduce Join 案例
- ☠ 需求
- ☠ 案例分析
- ☠ 代码实现
- 封装Bean对象
- Mapper阶段
- Reducer阶段
- Driver阶段
- ☠ 总结
一、Reduce Join
① Reduce Join工作原理
- Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段(公共字段)作为key,其余部分和新加的标志作为 value,最后进行输出。
- Reduce端的主要工作:在 Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
返回顶部
② Reduce Join 案例
☠ 需求
返回顶部
☠ 案例分析
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。
返回顶部
☠ 代码实现
封装Bean对象
package 第三章_MR框架原理.多种join应用;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class TableBean implements Writable{// id pid amountprivate String id; // 订单idprivate String pid;// 产品idprivate int amount;// 产品数量// pid pnameprivate String pname;// 产品名称private String flag; // 定义一个标记,标记是订单表还是产品表/*** 空参构造*/public TableBean() {}/*** 有参构造* @param id* @param pid* @param amount* @param pname* @param flag*/public TableBean(String id, String pid, int amount, String pname, String flag) {this.id = id;this.pid = pid;this.amount = amount;this.pname = pname;this.flag = flag;}/*** 序列化* @param dataOutput* @throws IOException*/@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(id);dataOutput.writeUTF(pid);dataOutput.writeInt(amount);dataOutput.writeUTF(pname);dataOutput.writeUTF(flag);}/*** 反序列化* @param dataInput* @throws IOException*/@Overridepublic void readFields(DataInput dataInput) throws IOException {id = dataInput.readUTF();pid = dataInput.readUTF();amount = dataInput.readInt();pname = dataInput.readUTF();flag = dataInput.readUTF();}/*** 重写toString* @return*/@Overridepublic String toString() {return id + '\t' + pname + '\t' + amount + '\'';}/*** set、get* @return*/public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}
}
返回顶部
Mapper阶段
与之前有所不同:
- 需要
重写setUp()方法
,通过切片信息获取文件名称
,判断读取的是哪个文件 map()方法中要对读取的文件判断处理
package 第三章_MR框架原理.多种join应用;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class TableMap extends Mapper<LongWritable, Text, Text, TableBean> {String name;Text k = new Text();TableBean v = new TableBean();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {// 获取文件的名称 --- 通过切片信息获取FileSplit inputSplit = (FileSplit) context.getInputSplit();name = inputSplit.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// id pid amount// 1001 01 1// pid pname// 01 小米// 1.获取一行数据String line = value.toString();// 2.判断数据来源if (name.startsWith("order")) { // 订单表数据// 3.封装输出的key、valueString[] fields = line.split("\t");String id = fields[0];String pid = fields[1];int amount = Integer.parseInt(fields[2]);v.setId(id);v.setPid(pid);v.setAmount(amount);v.setPname(""); // 没有传入空字符串,但不能不写,因为序列化的时候有v.setFlag("order"); // 设定标记 --- order表k.set(pid);// 4.写出context.write(k,v);} else { // 产品表数据// 3.封装输出的key、valueString[] fields = line.split("\t");String pid = fields[0];String pname = fields[1];v.setId(""); // 没有传入空字符串,但不能不写,因为序列化的时候有v.setPid(pid); // 没有传入默认值0v.setAmount(0);v.setPname(pname);v.setFlag("opd"); // 设定标记 --- order表k.set(pid);// 4.写出context.write(k,v);}}
}
返回顶部
Reducer阶段
package 第三章_MR框架原理.多种join应用;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;public class TableReduce extends Reducer<Text,TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {// 1.存储所有订单的集合 --- orderArrayList<TableBean> orderBeans = new ArrayList<>();// 2.存储产品信息 --- pdTableBean pdBean = new TableBean();// 3.遍历存储for (TableBean value:values){ // 循环TableBean对象if ("order".equals(value.getFlag())){ // 读取标记,判定为订单表// 创建临时TableBean对象TableBean temp = new TableBean();try {// 将当前循环到的对象内容拷贝到临时TableBean对象BeanUtils.copyProperties(temp,value);// 将其添加到集合中orderBeans.add(temp);} catch (Exception e){e.printStackTrace();}}else{ // 读取标记,判定为产品表try {// 将当前循环到的对象内容拷贝到BeanUtils.copyProperties(pdBean,value);} catch (Exception e){e.printStackTrace();}}}// 4.将设置对应的产品名称,写出for (TableBean tableBean:orderBeans){tableBean.setPname(pdBean.getPname());context.write(tableBean,NullWritable.get());}}
}
这里说明一下,为什么一个是集合,一个是对象:
- 相同的key可以同时进入到同一个reduce()中,在MapTask的时候对key进行了默认排序
- 如下图所示,一个reduce中只能获取01部分(或02或03)的数据集
- 而对应的每个部分中的order表数据不止一条,但是pd表数据只有一条,所以分别用集合、对象来存储。
返回顶部
Driver阶段
package 第三章_MR框架原理.多种join应用;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableDriver {public static void main(String[] args) {Job job = null;Configuration conf = new Configuration();try{// 1.获取jobjob = Job.getInstance(conf);// 2.配置job.setMapperClass(TableMap.class);job.setReducerClass(TableReduce.class);job.setJarByClass(TableDriver.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);// 设置输入输出路径FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\data"));FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\多种join应用\\output"));// 提交jobboolean result = job.waitForCompletion(true);System.exit(result? 0:1);} catch (Exception e){e.printStackTrace();}}
}
返回顶部
☠ 总结
返回顶部
【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)相关推荐
- Python数据分析【第9天】| DataFrame的属性编码、数据合并和连接(get_dummies,merge,join,concat)
系列文章目录 第1天:读入数据 第2天:read().readline()与readlines() 第3天:进度条(tqdm模块) 第4天:命令行传参(argparse模块) 第5天:读.写json文 ...
- Mysql基础篇(1)—— 基础概念、DML基本语法和表连接
前言 Mysql基础篇相关的内容是看了康师傅的视频做的笔记吧 数据库相关概念 DB: 数据库(Database) 存储数据的仓库,本质是一个文件系统.它保存了一系列有组织的数据. DBMS:数据库 ...
- R语言数据合并与连接技巧
最近仍然在陆陆续续自学,真.生命不息学习不止,这次和大家分享一些实用的数据处理技巧,干货满满! 一.数据合并 涉及函数cbind(),rbind(),bind_rows(),bind_cols(). ...
- delete表1条件是另一个表中的数据,多表连接删除(转)
DELETE删除多表数据,怎样才能同时删除多个关联表的数据呢?这里做了深入的解释: 1. delete from t1 where 条件 2.delete t1 from t1 where 条件 3. ...
- delete表1条件是另一个表中的数据,多表连接删除
2019独角兽企业重金招聘Python工程师标准>>> 数据库中有两张表. DELETE cdb_posts,cdb_threads FROM cdb_posts ,cdb_thre ...
- Reduce Join介绍及案例
Reduce Join介绍及案例 Reduce Join介绍 Reduce Join案例 需求 1.需求说明 2.文件 案例分析 1.需求分析 2.输入数据 3.期望输出数据 4.MapTask 5. ...
- Pandas简明教程:九、表的合并、连接、拼接(数据聚合基础)
真实场景中常会遇到多种信息放在不同的表里的情况,此时我们就需要将这些表格的信息整合到一起.这种操作可以极大地减轻我们手动粘数据的工作,从而达到事半功倍的效果. 由于本篇要举的例子较多,因此直接采用官网 ...
- 数据合并之concat、append、merge和join
Pandas 是一套用于 Python 的快速.高效的数据分析工具.它可以用于数据挖掘和数据分析,同时也提供数据清洗功能.本文将详细讲解数据合并与连接,目录如下: ① concat 一.定义 conc ...
- Pandas 中DataFrame 数据合并 Contract | Merge
最近在工作中,遇到了数据合并.连接的问题,故整理如下,供需要者参考~ 参考自:象在舞:https://blog.csdn.net/gdkyxy2013/article/details/80785361 ...
最新文章
- 读后感和机翻《他们在看哪里,为什么看?在复杂的任务中共同推断人类的注意力和意图》
- 计算机房 门,标准机房门的规格
- Java-Java5.0泛型解读
- python开发mbus程序_Python pywmbus包_程序模块 - PyPI - Python中文网
- ​马卡龙配色你好夏天PPT模板​
- java应用程序的执行起点是什么方法_Java应用程序的执行起点是____________方法。(3.0分)_学小易找答案...
- Eclipse如何打出war包
- 20.从0开始的微服务架构
- KALI安装软件链接不上源,数字签名软件保护,Kali Linux 更新源 数字签名无效处理
- Android WiFi only配置
- 计算机辅助项目管理实验论文,计算机辅助项目管理课程设计--毕设论文.doc
- 全栈开发-Python的介绍
- cath数据库fasta备注_sam's note
- iOS小技巧11-Xcode中相对路径和绝对路径的使用
- RT-Thread Studio 红外Infrared使用笔记
- (二)最新版Django项目数据库迁移;读取数据库增添删改;以及显示在html或vue前端(Django+Vue+Mysql,数据库管理数据分析网站)
- 评论360和腾讯--程苓峰--《战争的本质》
- 专科生学习云计算的就业前景如何?
- 安装问题:bokeh安装报错
- 大数据产业链和生态图谱