hadoop datajoin
2019独角兽企业重金招聘Python工程师标准>>>
hadoop datajoin 之reduce side join
hadoop提供了一个叫datajoin的jar包,用于解决两张表关联的问题。jar位于/hadoop/contrib/datajoin将jar包引入进行开发。
涉及的几个概念:
1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)
Customers Orders
1,Stephanie Leung,555-555-5555 3,A,12.95,02-Jun-2008
2,Edward Kim,123-456-7890 1,B,88.25,20-May-2008
3,Jose Madriz,281-330-8004 2,C,32.00,30-Nov-2007
4,David Stork,408-555-0000 3,D,25.02,22-Jan-2009
2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。
3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。
使用以下样例数据
customers-20140716
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
orders-20140716
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
请看流程图
第一部分,自定义的数据类型。数据类型主要包含两部分tag和data,tag是给数据打上的标签用来表示数据来源于哪个文件。data是数据记录。
上代码:
public class TaggedWritable extends TaggedMapOutput {private Text data;public TaggedWritable() {this.tag = new Text("");this.data = new Text("");}public TaggedWritable(Text data) {this.data = data;}@Overridepublic void readFields(DataInput in) throws IOException {this.tag.readFields(in);this.data.readFields(in);}@Overridepublic void write(DataOutput out) throws IOException {this.tag.write(out);this.data.write(out);}@Overridepublic Text getData() {return data;}
}
第二部分,map函数
public class Mapclass extends DataJoinMapperBase{@Overrideprotected Text generateGroupKey(TaggedMapOutput aRecord) {String line = aRecord.getData().toString();String[] tokens = line.split(",");String groupKey = tokens[0];return new Text(groupKey);}@Overrideprotected Text generateInputTag(String inputFile) {String datasource = inputFile.split("-")[0];return new Text(datasource);}@Overrideprotected TaggedWritable generateTaggedMapOutput(Object value) {TaggedWritable retv = new TaggedWritable(new Text(value.toString()));retv.setTag(this.inputTag);return retv;}
}
map函数中要特别注意protected TaggedWritable generateTaggedMapOutput(Object value) 该方法的返回类型为你第一步定义的类型。
第三部分,reduce
public class Reduce extends DataJoinReducerBase{@Overrideprotected TaggedMapOutput combine(Object[] tags, Object[] values) {if (tags.length < 2) {return null; }String joinedStr = ""; for (int i=0; i<values.length; i++) { if (i > 0){joinedStr += ","; }TaggedWritable tw = (TaggedWritable) values[i]; String line = tw.getData().toString(); System.out.println("line=========:"+line);String[] tokens = line.split(",", 2); joinedStr += tokens[1]; } TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; }
}
reduce过程会将主键与数据组合在一起输出,你拼接的字符串中无需写入主键。
public class Datajoin extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {Configuration conf = this.getConf();JobConf job = new JobConf(conf, Datajoin.class);job.setJarByClass(Datajoin.class);Path in = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/");Path out = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/out");FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job, out);job.setJobName("DataJoin");job.setMapperClass(Mapclass.class);job.setReducerClass(Reduce.class);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TaggedWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.set("mapred.textoutputformat.separator", ",");JobClient.runJob(job);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new Datajoin(), args);System.exit(res);}
}
运行mapreduce任务后的输出为:
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
可以在reduce的combin函数中控制函数的输出形式,左联,或者右联。
转载于:https://my.oschina.net/hanjiafu/blog/291614
hadoop datajoin相关推荐
- hadoop学习;datajoin;chain签名;combine()
hadoop有种简化机制来管理job和control的非线性作业之间的依赖.job对象时mapreduce的表现形式.job对象的实例化可通过传递一个jobconf对象到作业的构造函数中来实现. x. ...
- MapReduce DataJoin 链接多数据源
主要介绍用DataJoin类来链接多数据源,先看一下例子,假设二个数据源customs和orders customer ID Name PhomeNumber 1 ...
- hadoop data join
概念: Hadoop有一个叫DataJoin的包为Data Join提供相应的框架.它的Jar包存在于contrib/datajoin/hadoop-*-datajoin. 为区别于其他的data j ...
- Hadoop in action 第45678章
第四五章 MapReduce基础 实例 使用专利局的数据 开发最好基于一个模板 单个类完整定义每个Map ...
- hadoop找出QQ共同好友算法实现
背景 A:B,C,D,E,F 表示A有bcdef好友 B:C,D,H,Y 以上可知道AB的共同好友为CD 思路: 1:我们先找出一个人被哪几个人共同拥有 测试数据: 2:第一阶段mr程序: packa ...
- hadoop 添加删除机器以及设置免密登录
添加hadoop机器 先在slaves中添加机器 然后启动datanode $: ./usr/hadoop-0.20.2-cdh3u4/bin/hadoop-daemon.sh start datan ...
- linux环境下快速配置hadoop集群免密登录
背景 在hadoop的日常使用过程中经常需要登录某些机器,如何更好的免密登录呢?这将为我们节省大量的时间 操作 假设你需要在A机器上免密登录B机器,那么你首先要确定B机器下是有秘钥文件的.如何确定是否 ...
- hadoop问题小结
20220322 https://blog.csdn.net/lt5227/article/details/119459827 hadoop控制台设置密码 访问验证 20220314 进入hive 高 ...
- hadoop,spark,scala,flink 大数据分布式系统汇总
20220314 https://shimo.im/docs/YcPW8YY3T6dT86dV/read 尚硅谷大数据文档资料 iceberg相当于对hive的读写,starrocks相当于对mysq ...
最新文章
- Matlab与数据结构 -- 求向量或矩阵的最大值
- 关于System.TypeInitializationException异常
- MySQL(五)MySQL事务
- Git详细安装教程,翻译
- [开源第一波]SmartScript
- Android 工具:TraceView
- texlive2021
- 数学建模十大算法01-蒙特卡洛算法(Monte Carlo)
- 有梦想,就去追,不犹豫,不后悔
- x轴z轴代表的方向图片_x y z三个轴的方向 x轴、y轴和z轴分别代表的是什么?
- 将Unity虚拟相机视角画面显示在一个平面上
- MM41/MM42/MM43零售物料主数据BAPI创建示例(WRF_MATERIAL_MAINTAINDATA_RT)
- 下载亚马逊Amazon页面产品视频办法(亚马逊视频下载解决方案)
- Cydia崩溃错误修复
- c# 编程入门第六课常量,枚举,枚举转换成其他类型变量(枚举和int,转 string字符串转枚举),结构体,数组,冒泡排序,实际使用排序,方法(函数)
- docker实现elasticsearch集群实战
- mongo按季度统计_27省份公布前三季度quot;成绩单quot;
- Efficient Cohesive Subgraphs Detection in Parallel
- 实现传智播客部分首页代码
- 基因相关性心律失常_哪些心律失常可以做基因检测?