前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地。今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到算法实现还有很多路要走。

数据准备

首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。
准备好下面两张表:
(1)m_ys_lab_jointest_a(以下简称表A)
建表语句为:
[sql] view plaincopyprint?
  1. create table if not exists m_ys_lab_jointest_a (
  2. id bigint,
  3. name string
  4. )
  5. row format delimited
  6. fields terminated by '9'
  7. lines terminated by '10'
  8. stored as textfile;
数据:
id     name
1     北京
2     天津
3     河北
4     山西
5     内蒙古
6     辽宁
7     吉林
8     黑龙江
(2)m_ys_lab_jointest_b(以下简称表B)
建表语句为:

[sql] view plaincopyprint?
  1. create table if not exists m_ys_lab_jointest_b (
  2. id bigint,
  3. statyear bigint,
  4. num bigint
  5. )
  6. row format delimited
  7. fields terminated by '9'
  8. lines terminated by '10'
  9. stored as textfile;

数据:

id     statyear     num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347

我们的目的是,以id为key做join操作,得到以下表:

m_ys_lab_jointest_ab
id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

计算模型

整个计算过程是:
(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
如下图所示:

代码

代码如下:

[java] view plaincopyprint?
  1. import java.io.IOException;
  2. import java.util.HashMap;
  3. import java.util.Iterator;
  4. import java.util.Vector;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.mapred.FileSplit;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapred.MapReduceBase;
  11. import org.apache.hadoop.mapred.Mapper;
  12. import org.apache.hadoop.mapred.OutputCollector;
  13. import org.apache.hadoop.mapred.RecordWriter;
  14. import org.apache.hadoop.mapred.Reducer;
  15. import org.apache.hadoop.mapred.Reporter;
  16. /**
  17. * MapReduce实现Join操作
  18. */
  19. public class MapRedJoin {
  20. public static final String DELIMITER = "\u0009"; // 字段分隔符
  21. // map过程
  22. public static class MapClass extends MapReduceBase implements
  23. Mapper<LongWritable, Text, Text, Text> {
  24. public void configure(JobConf job) {
  25. super.configure(job);
  26. }
  27. public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
  28. Reporter reporter) throws IOException, ClassCastException {
  29. // 获取输入文件的全路径和名称
  30. String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
  31. // 获取记录字符串
  32. String line = value.toString();
  33. // 抛弃空记录
  34. if (line == null || line.equals("")) return;
  35. // 处理来自表A的记录
  36. if (filePath.contains("m_ys_lab_jointest_a")) {
  37. String[] values = line.split(DELIMITER); // 按分隔符分割出字段
  38. if (values.length < 2) return;
  39. String id = values[0]; // id
  40. String name = values[1]; // name
  41. output.collect(new Text(id), new Text("a#"+name));
  42. }
  43. // 处理来自表B的记录
  44. else if (filePath.contains("m_ys_lab_jointest_b")) {
  45. String[] values = line.split(DELIMITER); // 按分隔符分割出字段
  46. if (values.length < 3) return;
  47. String id = values[0]; // id
  48. String statyear = values[1]; // statyear
  49. String num = values[2]; //num
  50. output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
  51. }
  52. }
  53. }
  54. // reduce过程
  55. public static class Reduce extends MapReduceBase
  56. implements Reducer<Text, Text, Text, Text> {
  57. public void reduce(Text key, Iterator<Text> values,
  58. OutputCollector<Text, Text> output, Reporter reporter)
  59. throws IOException {
  60. Vector<String> vecA = new Vector<String>(); // 存放来自表A的值
  61. Vector<String> vecB = new Vector<String>(); // 存放来自表B的值
  62. while (values.hasNext()) {
  63. String value = values.next().toString();
  64. if (value.startsWith("a#")) {
  65. vecA.add(value.substring(2));
  66. } else if (value.startsWith("b#")) {
  67. vecB.add(value.substring(2));
  68. }
  69. }
  70. int sizeA = vecA.size();
  71. int sizeB = vecB.size();
  72. // 遍历两个向量
  73. int i, j;
  74. for (i = 0; i < sizeA; i ++) {
  75. for (j = 0; j < sizeB; j ++) {
  76. output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
  77. }
  78. }
  79. }
  80. }
  81. protected void configJob(JobConf conf) {
  82. conf.setMapOutputKeyClass(Text.class);
  83. conf.setMapOutputValueClass(Text.class);
  84. conf.setOutputKeyClass(Text.class);
  85. conf.setOutputValueClass(Text.class);
  86. conf.setOutputFormat(ReportOutFormat.class);
  87. }
  88. }

技术细节

下面说一下其中的若干技术细节:
(1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();

(2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。
(3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。

MapReduce实现join操作相关推荐

  1. 使用MapReduce实现join操作

    2019独角兽企业重金招聘Python工程师标准>>> 在关系型数据库中,要实现join操作是非常方便的,通过sql定义的join原语就可以实现.在hdfs存储的海量数据中,要实现j ...

  2. MapReduce之join操作

    一  前言 在很多时候,我们可能需要处理的不是一个单独的文件,而是几个有关联的文件,比如账户信息和订单信息=> 账户信息:customerIdname address telephone 订单信 ...

  3. [MapReduce_add_4] MapReduce 的 join 操作

    0. 说明 Map 端 join && Reduce 端 join 1. Map 端 join Map 端 join:大表+小表 => 将小表加入到内存,迭代大表每一行,与之进行 ...

  4. MapReduce之Map join操作

    MapReduce之Map join操作(分布式缓存) 文章目录 MapReduce之Map join操作(分布式缓存) 案例结合 利用MapReduce中的setup方法与DistributedCa ...

  5. 5、HIVE DML操作、load数据、update、Delete、Merge、where语句、基于分区的查询、HAVING子句、LIMIT子句、Group By语法、Hive 的Join操作等

    目录: 4.2.1 Load文件数据到表中 4.2.2查询的数据插入到表中 4.2.3将Hive查询的结果存到本地Linux的文件系统目录中 4.2.4通过SQL语句的方式插入数据 4.2.5 UPD ...

  6. Hive是如何让MapReduce实现SQL操作的?

    learn from 从0开始学大数据(极客时间) 1. MapReduce 实现 SQL 的原理 SELECT pageid, age, count(1) FROM pv_users GROUP B ...

  7. Flink学习笔记:Operators之CoGroup及Join操作

    本文为<Flink大数据项目实战>学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz ...

  8. shell中join链接多个域_shell 如何实现两个表的join操作

    shell 如何实现两个表的join操作 今天研究的一个问题是:在Shell 脚本中如何实现两个表的 join 操作,这里说的两个表示的其实是 两个文件,但是文件是列表的形式,有固定的分割符号,即就相 ...

  9. 离线轻量级大数据平台Spark之JavaRDD关联join操作

    对两个RDD进行关联操作,如: 1)文件post_data.txt包含:post_id\title\content 2)文件train.txt包含:dev_id\post_id\praise\time ...

最新文章

  1. 高效管理 GitHub Star,用这几个插件就能做到!
  2. 在Eclipse中运行hadoop程序
  3. linux redis安装报错,Linux安装Redis实现过程及报错解决方案
  4. 验证gpu版pytorch是否可用
  5. C#中实现空间的登录加密密码脚本里的方法
  6. Linux下Mail命令收集
  7. django-xadmin出现Models aren't loaded yet错误
  8. 设计模式是什么鬼(原型)
  9. LINUX如何让内存FREE变大,(转)Linux中显示空闲内存空间的free命令的基本用法
  10. 2011年度IT博客大赛 “博”乐大行动(已结束)
  11. qq拼音输入法下载|qq拼音输入法纯净版下载
  12. MTK平台Camera驱动流程分析
  13. 平面坐标转大地坐标(经纬度)
  14. python 3des加密_python3使用3des加密
  15. 九章算术 五:《商功》
  16. 【Python】python中[-1]、[:-1]、[::-1]、[n::-1]使用方法;random.choice()
  17. (原创)巧用通道作颜色网
  18. 数组的下标为什么从0开始而不是从1开始
  19. 【前端应该掌握的一些知识点】
  20. 用Cocos Creator 模拟书本翻页效果

热门文章

  1. box-sizing详解
  2. java.lang.ClassNotFoundException: org.springframework.web.servlet.DispatcherServlet
  3. 学计算机对显卡要求大吗,【5人回答】学AE的电脑配置要求高吗?需要什么样的电脑配置-3D溜溜网...
  4. 一元流量显示服务器繁忙,流量充不进去老退款
  5. mysql b-a全局索引_MySQL中B+树索引的使用
  6. andriod 多个Activity之间共享数据
  7. 深度学习和目标检测系列教程 16-300:通过全球小麦数据集训练第一个yolov5模型
  8. 7.04 -2018-长沙机场笔试总结
  9. 黑马Go语言与区块链学习笔记
  10. 正视长尾挑战!颜水成、冯佳时团队发布首篇《深度长尾学习》综述