需求:

利用MapReduce程序,实现SQL语句中的join关联查询。

订单数据表order:

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

1003

20150710

P0003

4

商品信息表product:

pid

pname

category_id

price

P0001

小米6

1000

2499

P0002

锤子T3

1001

2500

P0003

三星S8

1002

6999

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

分析:

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联。

实现:

首先,我们将表中的数据转换成我们需要的格式:

order.txt:

1001,20150710,P0001,2

1002,20150710,P0001,3

1002,20150710,P0002,3

1003,20150710,P0003,4

product.txt:

P0001,小米6,1000,2499P0002,锤子T3,1001,2500P0003,三星S8,1002,6999

并且导入到HDFS的/join/srcdata目录下面。

因为我们有两种格式的文件,所以在map阶段需要根据文件名进行一下判断,不同的文案进行不同的处理。同理,在reduce阶段我们也要针对同一key(pid)的不同种类数据进行判断,是通过判断id是否为空字符串进行判断的。

InfoBean.java:

packagecom.darrenchan.mr.bean;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importorg.apache.hadoop.io.Writable;/*** id date pid amount pname category_id price

*

*@authorchenchi

**/

public class InfoBean implementsWritable {private String id;//订单id

privateString date;private String pid;//产品id

privateString amount;privateString pname;privateString category_id;privateString price;publicInfoBean() {

}publicInfoBean(String id, String date, String pid, String amount, String pname, String category_id, String price) {super();this.id =id;this.date =date;this.pid =pid;this.amount =amount;this.pname =pname;this.category_id =category_id;this.price =price;

}publicString getId() {returnid;

}public voidsetId(String id) {this.id =id;

}publicString getDate() {returndate;

}public voidsetDate(String date) {this.date =date;

}publicString getPid() {returnpid;

}public voidsetPid(String pid) {this.pid =pid;

}publicString getAmount() {returnamount;

}public voidsetAmount(String amount) {this.amount =amount;

}publicString getPname() {returnpname;

}public voidsetPname(String pname) {this.pname =pname;

}publicString getCategory_id() {returncategory_id;

}public voidsetCategory_id(String category_id) {this.category_id =category_id;

}publicString getPrice() {returnprice;

}public voidsetPrice(String price) {this.price =price;

}

@OverridepublicString toString() {return "InfoBean [id=" + id + ", date=" + date + ", pid=" + pid + ", amount=" + amount + ", pname=" +pname+ ", category_id=" + category_id + ", price=" + price + "]";

}/*** id date pid amount pname category_id price*/@Overridepublic void readFields(DataInput in) throwsIOException {

id=in.readUTF();

date=in.readUTF();

pid=in.readUTF();

amount=in.readUTF();

pname=in.readUTF();

category_id=in.readUTF();

price=in.readUTF();

}

@Overridepublic void write(DataOutput out) throwsIOException {

out.writeUTF(id);

out.writeUTF(date);

out.writeUTF(pid);

out.writeUTF(amount);

out.writeUTF(pname);

out.writeUTF(category_id);

out.writeUTF(price);

}

}

Join.java:

packagecom.darrenchan.mr.join;importjava.io.IOException;importjava.lang.reflect.InvocationTargetException;importjava.util.ArrayList;importjava.util.List;importorg.apache.commons.beanutils.BeanUtils;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.input.FileSplit;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importcom.darrenchan.mr.bean.InfoBean;public classJoin {/*** Mapper类

*@authorchenchi

**/

public static class JoinMapper extends Mapper{//提前在这里new一个对象,剩下的就是改变它的值,不至于在map方法中创建出大量的InfoBean对象

InfoBean infoBean = newInfoBean();

Text text= new Text();//理由同上

@Overrideprotected voidmap(LongWritable key, Text value, Context context)throwsIOException, InterruptedException {//首先,要判断文件名称,读的是订单数据还是商品数据

FileSplit inputSplit =(FileSplit) context.getInputSplit();

String name= inputSplit.getPath().getName();//文件名称

if(name.startsWith("order")){//来自订单数据

String line =value.toString();

String[] fields= line.split(",");

String id= fields[0];

String date= fields[1];

String pid= fields[2];

String amount= fields[3];

infoBean.setId(id);

infoBean.setDate(date);

infoBean.setPid(pid);

infoBean.setAmount(amount);//对于订单数据来说,后面三个属性都置为""//之所以不置为null,是因为其要进行序列化和反序列化

infoBean.setPname("");

infoBean.setCategory_id("");

infoBean.setPrice("");

text.set(pid);

context.write(text, infoBean);

}else{//来自商品数据

String line =value.toString();

String[] fields= line.split(",");

String pid= fields[0];

String pname= fields[1];

String category_id= fields[2];

String price= fields[3];

infoBean.setPname(pname);

infoBean.setCategory_id(category_id);

infoBean.setPrice(price);

infoBean.setPid(pid);//对于订单数据来说,后面三个属性都置为""//之所以不置为null,是因为其要进行序列化和反序列化

infoBean.setId("");

infoBean.setDate("");

infoBean.setAmount("");

text.set(pid);

context.write(text, infoBean);

}

}

}public static class JoinReducer extends Reducer{//订单数据中一个pid会有多条数据//商品数据中一个pid只有一条

@Overrideprotected void reduce(Text key, Iterable values, Context context) throwsIOException, InterruptedException {

List list = new ArrayList();//存储订单数据中的多条

InfoBean info = new InfoBean();//存储商品数据中的一条

for(InfoBean infoBean : values) {if(!"".equals(infoBean.getId())){//来自订单数据

InfoBean infoBean2 = newInfoBean();try{

BeanUtils.copyProperties(infoBean2, infoBean);

}catch(Exception e) {

e.printStackTrace();

}

list.add(infoBean2);

}else{//来自商品数据

try{

BeanUtils.copyProperties(info, infoBean);

}catch (IllegalAccessException |InvocationTargetException e) {

e.printStackTrace();

}

}

}for(InfoBean infoBean : list) {

infoBean.setPname(info.getPname());

infoBean.setCategory_id(info.getCategory_id());

infoBean.setPrice(info.getPrice());

context.write(infoBean, NullWritable.get());

}

}

}public static void main(String[] args) throwsException {

Configuration conf= newConfiguration();

Job job=Job.getInstance(conf);

job.setJarByClass(Join.class);

job.setMapperClass(JoinMapper.class);

job.setReducerClass(JoinReducer.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(InfoBean.class);

job.setOutputKeyClass(InfoBean.class);

job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

注:这里有一个地方需要注意,就是reduce方法的Iterable values,一定要new 新对象,不能直接赋值,因为迭代器的内容在不断变化。

执行指令:hadoop jar mywc.jar cn.darrenchan.hadoop.mr.wordcount.WCRunner /wc/src /wc/output

运行效果:

但是呢?这种方式是有缺陷的,什么缺陷呢?

这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。什么叫数据倾斜呢?比如在中国买小米6的人特别多,三星S8的人特别少,汇总的时候,当汇总小米6的pid的时候就运算压力特别大,而S8的pid的时候运算压力就特别小,显然负载不均衡。

那么我们应该用什么方法进行解决呢?就是map端join实现方式了。

我们将业务操作移到了map端,reduce甚至可以不用了,因为商品表一般内容不多,所以我们可以提前加载到内存中,运行map方法的时候直接查找即可,利用了MapReduce的分布式缓存。

代码如下:

packagecom.darrenchan.mr.mapedjoin;importjava.io.BufferedReader;importjava.io.File;importjava.io.FileInputStream;importjava.io.IOException;importjava.io.InputStreamReader;importjava.net.URI;importjava.net.URISyntaxException;importjava.util.HashMap;importjava.util.Map;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FSDataInputStream;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.input.FileSplit;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importcom.darrenchan.mr.bean.InfoBean;public classMapedJoin {public static class MapedJoinMapper extends Mapper{//用一个map来存储商品信息表

private Map map = new HashMap<>();//提前在这里new一个对象,剩下的就是改变它的值,不至于在map方法中创建出大量的InfoBean对象

InfoBean infoBean = newInfoBean();

@Overrideprotected void setup(Context context) throwsIOException, InterruptedException {//因为已经加载到本地目录了,所以可以本地读取

FileInputStream inputStream = new FileInputStream(new File("product.txt"));

InputStreamReader isr= newInputStreamReader(inputStream);

BufferedReader br= newBufferedReader(isr);

String line= null;while ((line = br.readLine()) != null) {

String[] fields= line.split(",");

map.put(fields[0], line);

}

br.close();

}

@Overrideprotected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {//判断文件类型,就不用读取商品数据了

FileSplit inputSplit =(FileSplit) context.getInputSplit();

String name=inputSplit.getPath().getName();if (name.startsWith("order")) {

String line=value.toString();

String[] fields= line.split(",");

String id= fields[0];

String date= fields[1];

String pid= fields[2];

String amount= fields[3];

infoBean.setId(id);

infoBean.setDate(date);

infoBean.setPid(pid);

infoBean.setAmount(amount);

String product=map.get(pid);

String[] splits= product.split(",");

String pname= splits[1];

String category_id= splits[2];

String price= splits[3];

infoBean.setPname(pname);

infoBean.setCategory_id(category_id);

infoBean.setPrice(price);

context.write(infoBean, NullWritable.get());

}

}

}public static void main(String[] args) throwsException {

Configuration conf= newConfiguration();

Job job=Job.getInstance(conf);

job.setJarByClass(MapedJoin.class);

job.setMapperClass(MapedJoinMapper.class);

job.setMapOutputKeyClass(InfoBean.class);

job.setMapOutputValueClass(NullWritable.class);//map端join的逻辑不需要reduce阶段,设置reducetask数量为0//因为即便不写reduce,它也默认启动一个reduce

job.setNumReduceTasks(0);//指定需要缓存一个文件到所有的maptask运行节点工作目录

/*job.addArchiveToClassPath(archive);*///缓存jar包到task运行节点的classpath中

/*job.addFileToClassPath(file);*///缓存普通文件到task运行节点的classpath中

/*job.addCacheArchive(uri);*///缓存压缩包文件到task运行节点的工作目录

/*job.addCacheFile(uri)*///缓存普通文件到task运行节点的工作目录//将产品表文件缓存到task工作节点的工作目录中去//就可以直接本地读取了

job.addCacheFile(new URI("/join/srcdata/product.txt"));

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));boolean b = job.waitForCompletion(true);

System.exit(b? 0 : 1);

}

}

结果同上。

java关联查询实战_MapReduce实战(五)实现关联查询相关推荐

  1. mssql 将查询结果作为表名参数_MySQL·查询(一)

    最近把时间都放在了回顾SQL语言上,因为能够按照需求在数据库中提取数据是非常重要的,太久没有去碰SQL,所以就花了一段时间去重新总结关于MySQL中的查询,希望下面简单的总结可以帮助大家理解SQL的查 ...

  2. Java 日志从入门到实战

    日志和异常处理结合得当的话,会给项目维护带来非常大的价值. **日志:**就是介绍一个过程和经历的详细记录. **项目日志:**就是项目开发过程的详细记录,一般由项目经理记录. **代码里的日志:** ...

  3. SAS数据挖掘实战篇【五】

    SAS数据挖掘实战篇[五] SAS--预测模型 6.1 测模型介绍 预测型(Prediction)是指由历史的和当前的数据产生的并能推测未来数据趋势的知识.这类知识可以被认为是以时 间为关键属性的关联 ...

  4. [菜鸟SpringCloud实战入门]第五章:熔断器Hystrix的使用 + 可视化监控Hystrix Dashboard和Turbine

    前言 欢迎来到菜鸟SpringCloud实战入门系列(SpringCloudForNoob),该系列通过层层递进的实战视角,来一步步学习和理解SpringCloud. 本系列适合有一定Java以及Sp ...

  5. 基于 abp vNext 和 .NET Core 开发博客项目 - Blazor 实战系列(五)

    基于 abp vNext 和 .NET Core 开发博客项目 - Blazor 实战系列(五) 转载于:https://github.com/Meowv/Blog 上一篇完成了分类标签友链的列表查询 ...

  6. 基于 abp vNext 和 .NET Core 开发博客项目 - 博客接口实战篇(五)

    基于 abp vNext 和 .NET Core 开发博客项目 - 博客接口实战篇(五) 转载于:https://github.com/Meowv/Blog 上篇文章完成了文章详情页数据查询和清除缓存 ...

  7. java窗体添加背景图片_Java项目实战之实战之天天酷跑(四):游戏主界面

    接上文,本文将实现游戏主界面,功能如下: 移动的背景图片.动态的玩家.玩家的移动功能.五种障碍物持续出现.玩家和障碍物的碰撞.暂停.继续功能. 首先,看一下整体效果: 动图实在太大,几秒钟的 Gif ...

  8. 面试突击 004 | 如何排查 Redis 中的慢查询?视频实战篇

    这是我的第 34 篇原创文章 作者 | 老王(javacn666) 1 面试题 如何排查 Redis 中的慢查询? 2 涉及相关问题 Redis 中有没有慢查询排查工具或者相关排查手段? 慢查询日志都 ...

  9. Kotlin实战案例:实现RecyclerView分页查询功能(仿照主流电商APP,可切换列表)

    n实战案例:带你实现RecyclerView分页查询功能(仿照主流电商APP,可切换列表和网格效果) 随着Kotlin的推广,一些国内公司的安卓项目开发,已经从Java完全切成Kotlin了.虽然Ko ...

最新文章

  1. java自学入门心得体会 0.1
  2. 大脑是怎样和身体交流的?
  3. 毕业5年,我是怎么成为年薪30W的运维工程师
  4. Ubantu16.4下fabric环境搭建
  5. 大规模markpoint特效
  6. 云图说|Git云上仓库哪家好?一张图了解华为云代码托管服务
  7. vmware安装找不到虚拟网卡解决方案
  8. MongoDB 的分片技术
  9. Windows7安装VC2015-2019_redist.x64提示“设置失败0xc8000222-未指定的错误”
  10. 主成分与因子分析异同_主成分分析和因子分析有什么区别?
  11. RNN学习:利用LSTM,GRU层解决航空公司评论数据预测问题
  12. 给MK802(USB大小的Android4.0小PC)引出串口信号,变成ARM开发版
  13. 零基础转行到IT,怎么选择适合的职业?
  14. 如何内置AdobeFlashPlayer.apk
  15. 大数据是普惠金融的未来!
  16. Linux进阶 | Docker部署nginx的web服务,VOLUME的使用详解,实现数据持久化!
  17. android 水波纹扩散动画,[Android]多层波纹扩散动画——自定义View绘制
  18. 英语词根、词根、前缀、后缀大全
  19. 端口号分类及其常用端口号
  20. 服务器虚拟成多人同时使用,同一台云服务器多人同时登录吗

热门文章

  1. nyoj 202红黑树 (搜索)
  2. 数据库的两种引擎Innodb和MyIASM
  3. Android开发中高效的数据结构用SparseArray代替HashMap
  4. select case语句 is和to用法
  5. SQL Server T-SQL高级查询
  6. C++ string清空并释放内存空间的两种方法(shrink_to_fit()、swap())
  7. 傅里叶变换函数FFT的使用方法
  8. Evaluation of Deep Learning Toolkits
  9. [云炬创业基础笔记] 第三章测试1
  10. [云炬python3玩转机器学习笔记] 3-2 Jupter Notebook魔法命令