2019独角兽企业重金招聘Python工程师标准>>>

hadoop datajoin 之reduce side join

hadoop提供了一个叫datajoin的jar包,用于解决两张表关联的问题。jar位于/hadoop/contrib/datajoin将jar包引入进行开发。

涉及的几个概念:

1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)

      Customers                  Orders
      1,Stephanie Leung,555-555-5555      3,A,12.95,02-Jun-2008
      2,Edward Kim,123-456-7890         1,B,88.25,20-May-2008
      3,Jose Madriz,281-330-8004         2,C,32.00,30-Nov-2007
      4,David Stork,408-555-0000          3,D,25.02,22-Jan-2009

2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。

3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。

使用以下样例数据

customers-20140716

1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

orders-20140716

3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
请看流程图

第一部分,自定义的数据类型。数据类型主要包含两部分tag和data,tag是给数据打上的标签用来表示数据来源于哪个文件。data是数据记录。

上代码:

public class TaggedWritable extends TaggedMapOutput {private Text data;public TaggedWritable() {this.tag = new Text("");this.data = new Text("");}public TaggedWritable(Text data) {this.data = data;}@Overridepublic void readFields(DataInput in) throws IOException {this.tag.readFields(in);this.data.readFields(in);}@Overridepublic void write(DataOutput out) throws IOException {this.tag.write(out);this.data.write(out);}@Overridepublic Text getData() {return data;}
}

第二部分,map函数

public class Mapclass extends DataJoinMapperBase{@Overrideprotected Text generateGroupKey(TaggedMapOutput aRecord) {String line = aRecord.getData().toString();String[] tokens = line.split(",");String groupKey = tokens[0];return new Text(groupKey);}@Overrideprotected Text generateInputTag(String inputFile) {String datasource = inputFile.split("-")[0];return new Text(datasource);}@Overrideprotected TaggedWritable generateTaggedMapOutput(Object value) {TaggedWritable retv = new TaggedWritable(new Text(value.toString()));retv.setTag(this.inputTag);return retv;}
}

map函数中要特别注意protected TaggedWritable generateTaggedMapOutput(Object value) 该方法的返回类型为你第一步定义的类型。

第三部分,reduce

public class Reduce extends DataJoinReducerBase{@Overrideprotected TaggedMapOutput combine(Object[] tags, Object[] values) {if (tags.length < 2) {return null;    }String joinedStr = "";   for (int i=0; i<values.length; i++) {  if (i > 0){joinedStr += ","; }TaggedWritable tw = (TaggedWritable) values[i];  String line = tw.getData().toString();  System.out.println("line=========:"+line);String[] tokens = line.split(",", 2);  joinedStr += tokens[1];  }  TaggedWritable retv = new TaggedWritable(new Text(joinedStr));  retv.setTag((Text) tags[0]);   return retv;  }
}

reduce过程会将主键与数据组合在一起输出,你拼接的字符串中无需写入主键。

public class Datajoin extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {Configuration conf = this.getConf();JobConf job = new JobConf(conf, Datajoin.class);job.setJarByClass(Datajoin.class);Path in = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/");Path out = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/out");FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job, out);job.setJobName("DataJoin");job.setMapperClass(Mapclass.class);job.setReducerClass(Reduce.class);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TaggedWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.set("mapred.textoutputformat.separator", ",");JobClient.runJob(job);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new Datajoin(), args);System.exit(res);}
}

运行mapreduce任务后的输出为:

1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009

可以在reduce的combin函数中控制函数的输出形式,左联,或者右联。

转载于:https://my.oschina.net/hanjiafu/blog/291614

hadoop datajoin相关推荐

  1. hadoop学习;datajoin;chain签名;combine()

    hadoop有种简化机制来管理job和control的非线性作业之间的依赖.job对象时mapreduce的表现形式.job对象的实例化可通过传递一个jobconf对象到作业的构造函数中来实现. x. ...

  2. MapReduce DataJoin 链接多数据源

    主要介绍用DataJoin类来链接多数据源,先看一下例子,假设二个数据源customs和orders customer ID       Name      PhomeNumber 1         ...

  3. hadoop data join

    概念: Hadoop有一个叫DataJoin的包为Data Join提供相应的框架.它的Jar包存在于contrib/datajoin/hadoop-*-datajoin. 为区别于其他的data j ...

  4. Hadoop in action 第45678章

    第四五章     MapReduce基础         实例             使用专利局的数据             开发最好基于一个模板             单个类完整定义每个Map ...

  5. hadoop找出QQ共同好友算法实现

    背景 A:B,C,D,E,F 表示A有bcdef好友 B:C,D,H,Y 以上可知道AB的共同好友为CD 思路: 1:我们先找出一个人被哪几个人共同拥有 测试数据: 2:第一阶段mr程序: packa ...

  6. hadoop 添加删除机器以及设置免密登录

    添加hadoop机器 先在slaves中添加机器 然后启动datanode $: ./usr/hadoop-0.20.2-cdh3u4/bin/hadoop-daemon.sh start datan ...

  7. linux环境下快速配置hadoop集群免密登录

    背景 在hadoop的日常使用过程中经常需要登录某些机器,如何更好的免密登录呢?这将为我们节省大量的时间 操作 假设你需要在A机器上免密登录B机器,那么你首先要确定B机器下是有秘钥文件的.如何确定是否 ...

  8. hadoop问题小结

    20220322 https://blog.csdn.net/lt5227/article/details/119459827 hadoop控制台设置密码 访问验证 20220314 进入hive 高 ...

  9. hadoop,spark,scala,flink 大数据分布式系统汇总

    20220314 https://shimo.im/docs/YcPW8YY3T6dT86dV/read 尚硅谷大数据文档资料 iceberg相当于对hive的读写,starrocks相当于对mysq ...

最新文章

  1. Matlab与数据结构 -- 求向量或矩阵的最大值
  2. 关于System.TypeInitializationException异常
  3. MySQL(五)MySQL事务
  4. Git详细安装教程,翻译
  5. [开源第一波]SmartScript
  6. Android 工具:TraceView
  7. texlive2021
  8. 数学建模十大算法01-蒙特卡洛算法(Monte Carlo)
  9. 有梦想,就去追,不犹豫,不后悔
  10. x轴z轴代表的方向图片_x y z三个轴的方向 x轴、y轴和z轴分别代表的是什么?
  11. 将Unity虚拟相机视角画面显示在一个平面上
  12. MM41/MM42/MM43零售物料主数据BAPI创建示例(WRF_MATERIAL_MAINTAINDATA_RT)
  13. 下载亚马逊Amazon页面产品视频办法(亚马逊视频下载解决方案)
  14. Cydia崩溃错误修复
  15. c# 编程入门第六课常量,枚举,枚举转换成其他类型变量(枚举和int,转 string字符串转枚举),结构体,数组,冒泡排序,实际使用排序,方法(函数)
  16. docker实现elasticsearch集群实战
  17. mongo按季度统计_27省份公布前三季度quot;成绩单quot;
  18. Efficient Cohesive Subgraphs Detection in Parallel
  19. 实现传智播客部分首页代码
  20. 基因相关性心律失常_哪些心律失常可以做基因检测?

热门文章

  1. api权限管理系统与前后端分离实践
  2. SVM: 实际中使用SVM的一些问题
  3. oracle 11gR2 RAC 安装
  4. 为ASP.NET MVC扩展异步Action功能(下)
  5. 2015年个人年度目标总结-产品狗版
  6. 【双十二】电商们的文案大战,猫狗快被玩坏了!
  7. CUDA学习(三)之使用GPU进行两个数组相加
  8. 一个案例说出python的十余个语法知识点
  9. x和伪全面屏(18:9)关于图片切片有白条
  10. linux与windows互传文件、用户与用户组管理、密码配置文件