hadoop join之semi join
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。此实例中,还是采用第一个实例中的数据,假如我们只过滤sex为1的user,并将key存于user_id文件中(注意:每行的数据一定要带上双引号啊),如下:
"ID"
"1"
"2"
"3"
"5"
"6"
"8"
"9"
完整代码如下,此实例中我们采用新的API来写:
public class SemiJoin extends Configured implements Tool
{public static class MapClass extends Mapper<LongWritable, Text, Text, Text>{// 用于缓存user_id文件中的数据private Set<String> userIds = new HashSet<String>();private Text key = new Text();private Text value = new Text();private String[] keyValue;// 此方法会在map方法执行之前执行@Overrideprotected void setup(Context context) throws IOException, InterruptedException{BufferedReader in = null;try{// 从当前作业中获取要缓存的文件Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());String userId = null;for (Path path : paths){if (path.toString().contains("user_id")){in = new BufferedReader(new FileReader(path.toString()));while (null != (userId = in.readLine())){userIds.add(userId);}}}}catch (IOException e){e.printStackTrace();}finally{try{if(in != null){in.close(); }}catch (IOException e){e.printStackTrace();}}}public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{// 在map阶段过滤掉不需要的数据this.keyValue = value.toString().split(",");if(userIds.contains(keyValue[0])){this.key.set(keyValue[0]);this.value.set(keyValue[1]);context.write(this.key, this.value);}}}public static class Reduce extends Reducer<Text, Text, Text, Text>{private Text value = new Text();private StringBuilder sb;public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{sb = new StringBuilder();for(Text val : values){sb.append(val.toString());sb.append(",");}this.value.set(sb.deleteCharAt(sb.length()-1).toString());context.write(key, this.value);}}public int run(String[] args) throws Exception{Job job = new Job(getConf(), "SemiJoin");job.setJobName("SemiJoin");job.setJarByClass(SemiJoin.class);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();// 我们把第一个参数的地址作为要缓存的文件路径DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());FileInputFormat.addInputPath(job, new Path(otherArgs[1]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception{int res = ToolRunner.run(new Configuration(), new SemiJoin(), args);System.exit(res);}}
转发:https://blog.csdn.net/huashetianzu/article/details/7823326
hadoop join之semi join相关推荐
- HiveSqlSparkSql —— 使用left semi join做in、exists类型子查询优化
LEFT SEMI JOIN(左半连接)介绍 SEMI JOIN (即等价于LEFT SEMI JOIN)最主要的使用场景就是解决EXISTS IN.LEFT SEMI JOIN(左半连接)是 IN/ ...
- Hive中的map join、left semi join和sort merge bucket join
map join map join是将join双方比较小的表直接分发到各个 map进程的内存中,在map进程中进行join操作,这样就不用进行reduce步骤,从而提高了速度. 如果不指定mapjoi ...
- Hive中的in、exists和left semi join
在hive sql开发的过程中,对于当前数据在另一个数据集合中,是否存在的判断有三种方式,一种是in ,一种是exists,另一种可以是left semi join,但是由于hive不支持in|not ...
- Spark源码阅读(五) --- Spark的支持的join方式以及join策略
版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...
- 【Hive】left semi join(exists、in)和 left join 区别
left semi join(exists.in)和 left join 区别 left semi join 基本认识 对比 执行计划 小结 left semi join 基本认识 LEFT SEMI ...
- mysql semi join_MySQL 通过semi join 优化子查询
半连接是MySQL 5.6.5引入的,多在子查询exists中使用,对外部row source的每个键值,查找到内部row source匹配的第一个键值后就返回,如果找到就不用再查找内部row sou ...
- 转载:left join和left semi join的联系和区别
1.联系 他们都是 hive join 方式的一种,join on 属于 common join(shuffle join/reduce join),而 left semi join 则属于 map ...
- join left semi_HIVE--left semi join
实验: hive> select * from b1; OK 1 003 20170511 1 004 20170512 1 005 ...
- oracle+semijoin,Semi join 与anti join
1.semi join Oracle在处理exists或in的时候,会使用semi join的连接方式: sys@EBANK>select object_name,object_type fro ...
最新文章
- MVC开发Markdown编辑器(1)
- VS2017常用、好用的快捷键
- MySQL中使用LIMIT进行分页的方法
- 进程、后台进程以及信号
- 错误: 在类中找不到 main 方法, 请将 main 方法定义为:public static void main(String[] args)否则 JavaFX 应用程序类必须扩展javafx.ap
- 一直使用管理员权限打开PowerShell
- linux默认安装数据库密码是多少,Linux服务器上安装MySql数据库(默认安装,密码为空),首次使用需要修改密码(示例代码)...
- MTK 驱动(58)---MTK G-sensor
- 计算机二级-C语言-程序设计题-190118记录-通过数组和指针两种方式对字符串进行处理。...
- mysql中 怎么插入反斜杠_MySQL中如何插入反斜杠,反斜杠被吃掉,反斜杠转义(转)...
- ie11下下载文件,文件名乱码的解决方法
- android 壁纸制作教程,[教程]怎样制作Android手机壁纸/桌面
- mysql动态表单设计与实现_动态表单的数据库结构设计
- PS4 不支持USB存储设备的文件系统 如何解决?
- 这特么是啥系列之----HSF求求你别秀了
- java获取b站动态列表地址,java获取B站弹幕文件的两种方案
- php计算日期差天数
- idea String报错问题
- Resid------set
- 第三讲 Matlab/Simulink入门——离散系统仿真实例
热门文章
- java学习(99):车站卖票问题
- php什么设置前端代码,代码编辑器与PHPSTUDY的安装与配置过程(前端第一课)
- [周末阅读]认知和规划,以及推荐几个入门教程Github
- mysql5.7.20中文,ubuntu16.04 mysql5.7.20表中插入中文顯示???的解決方法
- mysql临时关闭索引功能_MYSQL中常用的强制性操作(例如强制索引)
- 罗斯蒙特电磁流量计8723说明书_罗斯蒙特8732E电磁流量计故障原因及解决办法!...
- 奥特曼传奇英雄存档丢了怎么找回_热血传奇:道士最帅武器—玄天
- 辨析 const指针 和 指向常量的指针
- python对excel的操作
- [Luogu] P1939 【模板】矩阵加速(数列)