项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
欢迎大家star,留言,一起学习进步

在MR中,类似于join类的操作非常常见。在关系型数据库中,join就是最强大的功能之一。在hive中,jion操作也十分常见。现在,本博主就手把手教会大家怎么在MR中实现join操作。为了方便起见,本文就以left join为视角来实现。

1.数据准备

关于什么是join,什么是left join,本文就先不讨论了。先准备如下数据:

cat employee.txt
jd,david
jd,mike
tb,mike
tb,lucifer
elong,xiaoming
elong,ali
tengxun,xiaoming
tengxun,lilei
xxx,aaa
cat salary.txt
jd,1600
tb,1800
elong,2000
tengxun,2200

然后将两个文件分别put到hdfs上面

hadoop fs -put employee.txt /tmp/wanglei/employee/employee.txt
hadoop fs -put salary.txt /tmp/wanglei/salary/salary.txt

我们想用employee左关联salary。至此,数据准备已经完毕。

2.新建一个maven项目

新建一个maven项目,pom文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>leilei.bit.edu</groupId><artifactId>testjoin</artifactId><version>1.0</version><packaging>jar</packaging><name>testjoin</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><jarfile.name>testjoin</jarfile.name><jar.out.dir>jar</jar.out.dir></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.5.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.5.0</version></dependency><dependency><groupId>org.apache.mrunit</groupId><artifactId>mrunit</artifactId><version>1.0.0</version><classifier>hadoop2</classifier></dependency><dependency><groupId>org.anarres.lzo</groupId><artifactId>lzo-hadoop</artifactId><version>1.0.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.4</version><configuration><finalName>${jarfile.name}</finalName><outputDirectory>${jar.out.dir}</outputDirectory></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.6</source><target>1.6</target><encoding>UTF-8</encoding><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build></project>

然后开始实现left join的逻辑:

package leilei.bit.edu.testjoin;import java.io.IOException;
import java.util.Vector;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class LeftJoin extends Configured implements Tool{public static final String DELIMITER = ",";public static class LeftJoinMapper extendsMapper<LongWritable,Text,Text,Text> {protected void map(LongWritable key, Text value, Context context)throws IOException,InterruptedException {/** 拿到两个不同文件,区分出到底是哪个文件,然后分别输出*/String filepath = ((FileSplit)context.getInputSplit()).getPath().toString();String line = value.toString();if (line == null || line.equals("")) return; if (filepath.indexOf("employee") != -1) {String[] lines = line.split(DELIMITER);if(lines.length < 2) return;String company_id = lines[0];String employee = lines[1];context.write(new Text(company_id),new Text("a:"+employee));}else if(filepath.indexOf("salary") != -1) {String[] lines = line.split(DELIMITER);if(lines.length < 2) return;String company_id = lines[0];String salary = lines[1];context.write(new Text(company_id), new Text("b:" + salary));}}}public static class LeftJoinReduce extends Reducer<Text, Text, Text, Text> {protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException{Vector<String> vecA = new Vector<String>();Vector<String> vecB = new Vector<String>();for(Text each_val:values) {String each = each_val.toString();if(each.startsWith("a:")) {vecA.add(each.substring(2));} else if(each.startsWith("b:")) {vecB.add(each.substring(2));}}for (int i = 0; i < vecA.size(); i++) {/** 如果vecB为空的话,将A里的输出,B的位置补null。*/if (vecB.size() == 0) {context.write(key, new Text(vecA.get(i) + DELIMITER + "null"));} else {for (int j = 0; j < vecB.size(); j++) {context.write(key, new Text(vecA.get(i) + DELIMITER + vecB.get(j)));}}}}}public int run(String[] args) throws Exception {Configuration conf = getConf();GenericOptionsParser optionparser = new GenericOptionsParser(conf, args);conf = optionparser.getConfiguration();Job job = new Job(conf,"leftjoin");job.setJarByClass(LeftJoin.class);FileInputFormat.addInputPaths(job, conf.get("input_dir"));Path out = new Path(conf.get("output_dir"));FileOutputFormat.setOutputPath(job, out);job.setNumReduceTasks(conf.getInt("reduce_num",2));job.setMapperClass(LeftJoinMapper.class);job.setReducerClass(LeftJoinReduce.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);conf.set("mapred.textoutputformat.separator", ",");return (job.waitForCompletion(true) ? 0 : 1);}public static void main(String[] args) throws Exception{int res = ToolRunner.run(new Configuration(),new LeftJoin(),args);System.exit(res);}}

稍微说一下处理逻辑:
1.map阶段,把所有输入拆分为k,v形式。其中k是company_id,即我们要关联的字段。如果输入是employee相关的文件,那么map阶段的value加上标识符"a",表示是employee的输出。对于salary文件,加上标识符"b"。
2.reduce阶段,将每个k下的value列表拆分为分别来自employee和salary的两部分,然后双层循环做笛卡尔积即可。
3.注意的是,因为是left join,所以在reduce阶段,如果employee对应的company_id有,而salary没有,注意要输出此部分数据。

3.打包

将上面的项目用maven打包,并上传到服务器上。

4.使用shell脚本run起来

在服务器上写一个最简单的shell脚本,将代码run起来:

vim run_join.sh#!/bin/bashoutput=/tmp/wanglei/leftjoinif hadoop fs -test -d $output
thenhadoop fs -rm -r $output
fihadoop jar testjoin.jar leilei.bit.edu.testjoin.LeftJoin \-Dinput_dir=/tmp/wanglei/employee,/tmp/wanglei/salary \-Doutput_dir=$output \-Dmapred.textoutputformat.separator=","

执行此shell脚本:

./run_join.sh

5.最终输出

等job跑完以后,查看最终的输出结果:

hadoop fs -cat /tmp/wanglei/leftjoin/*
elong,ali,2000
elong,xiaoming,2000
tengxun,lilei,2200
tengxun,xiaoming,2200
jd,mike,1600
jd,david,1600
tb,lucifer,1800
tb,mike,1800
xxx,aaa,null

最终的输出结果,准确无误!至此,我们用mr完美实现了left join的功能!

hadoop 用MR实现join操作相关推荐

  1. 使用MapReduce实现join操作

    2019独角兽企业重金招聘Python工程师标准>>> 在关系型数据库中,要实现join操作是非常方便的,通过sql定义的join原语就可以实现.在hdfs存储的海量数据中,要实现j ...

  2. Spark+hadoop+mllib及相关概念与操作笔记

    Spark+hadoop+mllib及相关概念与操作笔记 作者: lw 版本: 0.1 时间: 2016-07-18 1.调研相关注意事项 a) 理解调研 调研的意义在于了解当前情况,挖掘潜在的问题, ...

  3. MapReduce实现join操作

    前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到 ...

  4. 5、HIVE DML操作、load数据、update、Delete、Merge、where语句、基于分区的查询、HAVING子句、LIMIT子句、Group By语法、Hive 的Join操作等

    目录: 4.2.1 Load文件数据到表中 4.2.2查询的数据插入到表中 4.2.3将Hive查询的结果存到本地Linux的文件系统目录中 4.2.4通过SQL语句的方式插入数据 4.2.5 UPD ...

  5. MapReduce之join操作

    一  前言 在很多时候,我们可能需要处理的不是一个单独的文件,而是几个有关联的文件,比如账户信息和订单信息=> 账户信息:customerIdname address telephone 订单信 ...

  6. MapReduce之Map join操作

    MapReduce之Map join操作(分布式缓存) 文章目录 MapReduce之Map join操作(分布式缓存) 案例结合 利用MapReduce中的setup方法与DistributedCa ...

  7. hadoop之mr案例

    mr案例 (1)创建maven项目 (2)在po,.xml添加下面代码 <dependencies><dependency><groupId>junit</g ...

  8. Flink学习笔记:Operators之CoGroup及Join操作

    本文为<Flink大数据项目实战>学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz ...

  9. shell中join链接多个域_shell 如何实现两个表的join操作

    shell 如何实现两个表的join操作 今天研究的一个问题是:在Shell 脚本中如何实现两个表的 join 操作,这里说的两个表示的其实是 两个文件,但是文件是列表的形式,有固定的分割符号,即就相 ...

  10. 离线轻量级大数据平台Spark之JavaRDD关联join操作

    对两个RDD进行关联操作,如: 1)文件post_data.txt包含:post_id\title\content 2)文件train.txt包含:dev_id\post_id\praise\time ...

最新文章

  1. 刷爆技术圈的《知识图谱》终于补货了,最后 968 份,低至 2 折,抢完不补!...
  2. mysql top limit_MySQL中如何实现select top n ----Limit
  3. 小分子溶液当硬盘!布朗大学逆天研究:用代谢分子存储照片,准确率达99%
  4. Spring boot请求拦截
  5. 4、JVM垃圾回收机制、新生代的GC、GC(Minor GC、FullGC)、GC日志、JVM参数选项、元空间(笔记)
  6. HDU - 4248 A Famous Stone Collector(dp+组合数学)
  7. TCL:花开刹那还是浴火重生
  8. Intent.ACTION_MAIN
  9. springcloud云服务架构-HongHu commonservice-eureka项目构建过程
  10. 网络基础---IP编址
  11. 学习python时间安排_Python时间和日期学习
  12. 最新Hadoop环境搭建流程
  13. 天邑TY1608_S905L3B_支持RTL8822CS、MT7668_线刷/卡刷_刷机固件包
  14. 关于“Fatal signal 11 (SIGSEGV) at 0x00000004 (code=1), thread 7592 (xample.hellojni)”android NDK错误排查
  15. 单细胞三大R包之Seurat
  16. R可视乎|创建乐高版马赛克图
  17. 报表打印(rdlc)
  18. python调整图片亮度_python 调整图片亮度的示例
  19. 计算机一级必考知识点,计算机一级考试基础知识点汇总.doc
  20. gitlab cicd配置

热门文章

  1. 一篇就让你懂线程池原理
  2. Intel Optane P4800X评测(序):不用缓存和电容保护的SSD?
  3. CentOS 6.6安装配置LAMP服务器(Apache+PHP5+MySQL)
  4. internet与Internet的区别
  5. 菜鸟从零学习数据库(三)——存储过程
  6. Idea中找不到Java Application 的xml配置文件问题研究
  7. hashmap在jdk7和8线程不安全的区别
  8. 正则表达式功能以及应用
  9. 深度学习在图像分类中的应用
  10. Rust 1.34替代Cargo注册表 与crates.io共存