在本文中,我们继续执行一系列实现算法的系列,该算法在使用MapReduce进行数据密集型文本处理中找到,这一次讨论数据联接。 虽然我们将讨论在Hadoop中联接数据的技术并提供示例代码,但在大多数情况下,您可能不会自己编写代码来执行联接。 取而代之的是,使用可以在更高抽象级别工作的工具(例如Hive或Pig)可以更好地完成连接数据。 如果有可以帮助您处理数据的工具,为什么还要花时间学习如何联接数据呢? 可以说,联接数据是Hadoop的最大用途之一。 全面了解Hadoop如何执行联接对于确定使用哪个联接以及在出现问题时进行调试至关重要。 此外,一旦您完全了解了Hadoop中如何执行不同的联接,就可以更好地利用Hive和Pig等工具。 最后,在一种情况下,一种工具可能无法满足您的需求,因此您必须袖手旁观并自行编写代码。

加入的需要

在处理大型数据集时,如果不是必需的话,通过公用密钥连接数据的需求可能会非常有用。 通过加入数据,您可以进一步获得洞察力,例如加入时间戳以将事件与一天中的时间关联起来。 连接数据的需求多种多样。 我们将在3个单独的帖子中介绍3种类型的联接:Reduce-Side联接,Map-Side联接和Memory-Backed联接。 在这一期中,我们将考虑使用Reduce-Side联接。

减少侧面连接

在我们将要讨论的联接模式中,减少端联接是最容易实现的。 简化方联接的直接原因是Hadoop将相同的密钥发送到相同的reducer,因此默认情况下,数据是为我们组织的。 要执行联接,我们只需要缓存一个密钥并将其与传入密钥进行比较。 只要键匹配,我们就可以结合来自相应键的值。 由于所有数据在整个网络上都经过混洗,因此使用减少侧连接进行权衡是性能。 在减少侧连接中,我们将考虑两种不同的方案:一对一和一对多。 我们还将探索不需要跟踪传入密钥的选项; 给定键的所有值都将在简化器中分组在一起。

一对一加入

一对一联接的情况是数据集“ X”中的值与数据集“ Y”中的值共享一个公共密钥。 由于Hadoop保证将相等的键发送到同一reducer,因此在两个数据集上进行映射将为我们处理联接。 由于仅对键进行排序,因此值的顺序未知。 我们可以使用辅助排序轻松解决这种情况。 我们二级排序的实现方式是用“ 1”或“ 2”标记键,以确定值的顺序。 我们需要采取一些额外的步骤来实施我们的标记策略。

实现一个WritableComparable

首先,我们需要编写一个实现WritableComparable接口的类,该接口将用于包装密钥。

public class TaggedKey implements Writable, WritableComparable<TaggedKey> {private Text joinKey = new Text();private IntWritable tag = new IntWritable();@Overridepublic int compareTo(TaggedKey taggedKey) {int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());if(compareValue == 0 ){compareValue = this.tag.compareTo(taggedKey.getTag());}return compareValue;}//Details left out for clarity}

当我们对TaggedKey类进行排序时,具有相同joinKey值的键将在tag字段的值上进行次要排序,以确保我们想要的顺序。

编写自定义分区程序

接下来,我们需要编写一个自定义分区程序,该分区程序仅在确定复合键和数据发送到哪个减速器时才考虑连接键:

public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {@Overridepublic int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {return taggedKey.getJoinKey().hashCode() % numPartitions;}
}

至此,我们拥有了连接数据并确保值顺序的条件。 但是,当键进入reduce()方法时,我们不想跟踪它们。 我们希望将所有价值观归为一体。 为此,我们将使用Comparator ,该Comparator在决定如何对值进行分组时仅考虑联接键。

编写组比较器

用于分组的比较器如下所示:

public class TaggedJoiningGroupingComparator extends WritableComparator {public TaggedJoiningGroupingComparator() {super(TaggedKey.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {TaggedKey taggedKey1 = (TaggedKey)a;TaggedKey taggedKey2 = (TaggedKey)b;return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());}
}

数据结构

现在,我们需要确定将用于密钥的哪些数据。 对于我们的样本数据,我们将使用从Fakenames Generator生成的CSV文件。 第一列是GUID,它将用作我们的联接键。 我们的样本数据包含诸如姓名,地址,电子邮件,工作信息,信用卡和拥有的汽车之类的信息。 为了演示的目的,我们将使用GUID,名称和地址字段,并将它们放置在一个结构如下的文件中:

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY

然后,我们将使用GUID,电子邮件地址,用户名,密码和信用卡号字段,然后将其放置在另一个文件中,该文件应类似于:

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,524

现在,我们需要有一个Mapper,它将知道如何处理我们的数据以提取正确的联接键并设置正确的标签。

创建映射器

这是我们的Mapper代码:

public class JoiningMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private TaggedKey taggedKey = new TaggedKey();private Text data = new Text();private int joinOrder;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));String separator = context.getConfiguration().get("separator");splitter = Splitter.on(separator).trimResults();joiner = Joiner.on(separator);FileSplit fileSplit = (FileSplit)context.getInputSplit();joinOrder = Integer.parseInt(context.getConfiguration().get(fileSplit.getPath().getName()));}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {List<String> values = Lists.newArrayList(splitter.split(value.toString()));String joinKey = values.remove(keyIndex);String valuesWithOutKey = joiner.join(values);taggedKey.set(joinKey, joinOrder);data.set(valuesWithOutKey);context.write(taggedKey, data);}}

让我们回顾一下setup()方法中发生的事情。

  1. 首先,从启动作业时在“配置”中设置的值获取连接键的索引和文本中使用的分隔符。
  2. 然后,我们创建一个Guava拆分器,用于拆分从对context.getConfiguration().get("separator")的调用中检索到的分隔符上的数据。 我们还创建了一个Guava Joiner,用于在提取密钥后将数据重新放在一起。
  3. 接下来,我们获取此映射器将要处理的文件的名称。 我们使用文件名来提取此配置中存储的文件的连接顺序。

我们还应该讨论map()方法中发生的事情:

  1. 分散数据并创建值列表
  2. 从列表中删除联接密钥
  3. 重新将数据重新合并为单个字符串
  4. 设置连接密钥,连接顺序和剩余数据
  5. 写出数据

因此,我们已经读入数据,提取了密钥,设置了连接顺序,然后将数据写回了。 让我们看一下如何结合数据。

联接数据

现在让我们看一下数据如何在化简器中联接:

public class JoiningReducer extends Reduce<TaggedKey, Text, NullWritable, Text> {private Text joinedText = new Text();private StringBuilder builder = new StringBuilder();private NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(TaggedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {builder.append(key.getJoinKey()).append(",");for (Text value : values) {builder.append(value.toString()).append(",");}builder.setLength(builder.length()-1);joinedText.set(builder.toString());context.write(nullKey, joinedText);builder.setLength(0);}
}

因为带有“ 1”标签的密钥首先到达了还原器,所以我们知道名称和地址数据是第一个值,而电子邮件,用户名,密码和信用卡数据是第二个值。 因此,我们不需要跟踪任何键。 我们只需遍历值并将它们连接在一起。

一对一加入结果

这是运行我们的一对一MapReduce作业的结果:

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,
5243379373546690

正如我们可以看到的,以上示例数据中的两条记录已合并为一条记录。 我们已经成功地将GUID,名称,地址,电子邮件地址,用户名,密码和信用卡字段加入到一个文件中。

指定加入顺序

此时,我们可能会问如何为多个文件指定连接顺序? 答案就在我们的ReduceSideJoinDriver类中,该类充当MapReduce程序的驱动程序。

public class ReduceSideJoinDriver {public static void main(String[] args) throws Exception {Splitter splitter = Splitter.on('/');StringBuilder filePaths = new StringBuilder();Configuration config = new Configuration();config.set("keyIndex", "0");config.set("separator", ",");for(int i = 0; i< args.length - 1; i++) {String fileName = Iterables.getLast(splitter.split(args[i]));config.set(fileName, Integer.toString(i+1));filePaths.append(args[i]).append(",");}filePaths.setLength(filePaths.length() - 1);Job job = Job.getInstance(config, "ReduceSideJoin");job.setJarByClass(ReduceSideJoinDriver.class);FileInputFormat.addInputPaths(job, filePaths.toString());FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));job.setMapperClass(JoiningMapper.class);job.setReducerClass(JoiningReducer.class);job.setPartitionerClass(TaggedJoiningPartitioner.class);job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);job.setOutputKeyClass(TaggedKey.class);job.setOutputValueClass(Text.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
  1. 首先,我们在第5行上创建一个番石榴分割器,该分割器将用“ /”分割字符串。
  2. 然后在第8-10行上,设置连接键的索引和文件中使用的分隔符。
  3. 在第12-17行中,我们为要连接的输入文件设置标签。 命令行上文件名的顺序决定了它们在联接中的位置。 当我们从命令行循环遍历文件名时,我们将拆分整个文件名并通过Guava Iterables.getLast()方法检索最后一个值(基本文件名)。 然后,我们使用文件名作为键调用config.set() ,并使用i + 1作为值,这将设置标签或连接顺序。 args数组中的最后一个值在循环中被跳过,因为它用于第23行的MapReduce作业的输出路径。在循环的最后一行,我们将每个文件路径附加到StringBuilder中,稍后使用( 22)设置作业的输入路径。
  4. 我们只需要对所有文件使用一个映射器,即JoiningMapper,该映射器在第25行设置。
  5. 第27和28行分别设置了我们的自定义分区程序和组比较器,以确保键和值到达化简器的顺序,并使用正确的键正确地对值进行分组。

通过使用分区程序和分组比较器,我们知道第一个值属于第一个键,并且可以用于将Iterable包含的所有其他值连接到给定键的reduce()方法中。 现在是时候考虑一​​对多联接了。

一对多加入

好消息是到目前为止,我们已经完成了所有工作,实际上我们可以使用代码执行一对多连接。 对于一对多联接,我们可以考虑两种方法:1)一个包含单个记录的小文件,另一个包含具有相同键的多个记录的文件,以及2)同样具有单个记录的小文件,但是N每个文件包含与第一个文件匹配的记录的文件数。 主要区别在于,采用第一种方法时,除了前两个键的联接之外,值的顺序将是未知的。 但是,使用第二种方法,我们将“标记”每个联接文件,以便我们可以控制所有联接值的顺序。 对于我们的示例,第一个文件将保留为我们的GUID名称-地址文件,并且我们将拥有3个其他文件,其中将包含汽车,雇主和工作描述记录。 这可能不是最现实的情况,但将用于演示。 以下是在进行联接之前数据外观的示例:

//The single person records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY
//Automobile records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,2003 Holden Cruze
81a43486-07e1-4b92-b92b-03d0caa87b5f,2012 Volkswagen T5
aef52cf1-f565-4124-bf18-47acdac47a0e,2009 Renault Trafic
//Employer records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Creative Wealth
81a43486-07e1-4b92-b92b-03d0caa87b5f,Susie's Casuals
aef52cf1-f565-4124-bf18-47acdac47a0e,Super Saver Foods
//Job Description records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Gas and water service dispatcher

一对多加入结果

现在,让我们看一下一对多联接结果的示例(使用上面的相同值来辅助比较):

cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,2003 Holden Cruze,Creative Wealth,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,2012 Volkswagen T5,Susie's Casuals,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,2009 Renault Trafic,Super Saver Foods,Gas and water service dispatcher

结果表明,我们已经能够成功地以指定顺序连接多个值。

结论

我们已经成功演示了如何在MapReduce中执行约简边连接。 即使该方法并不太复杂,我们也可以看到在Hadoop中执行联接可能涉及编写大量代码。 虽然学习联接的工作方式是一项有用的练习,但是在大多数情况下,使用Hive或Pig这样的工具联接数据要好得多。 谢谢你的时间。

资源资源

  • Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
  • Hadoop: Tom White 的权威指南
  • 来自博客的源代码和测试
  • 爱德华·卡普里奥洛(Edward Capriolo),迪恩·沃普勒(Dean Wampler)和杰森·卢瑟格伦(Jason Rutherglen)的编程蜂巢
  • 通过Alan Gates对Pig进行编程
  • Hadoop API
  • MRUnit用于单元测试Apache Hadoop映射减少工作
参考: MapReduce算法–了解数据 ,是我们的JCG合作伙伴 Bill Bejeck在“ 随机思考编码”博客上的第1部分 。

翻译自: https://www.javacodegeeks.com/2013/07/mapreduce-algorithms-understanding-data-joins-part-1.html

MapReduce算法–了解数据联接第1部分相关推荐

  1. mapreduce 算法_MapReduce算法–了解数据联接第二部分

    mapreduce 算法 自从我上一次发布以来已经有一段时间了,就像我上一次大休息一样,我正在Coursera上一些课程. 这次是Scala中的函数式编程 原理和React式编程原理 . 我发现它们都 ...

  2. mapreduce 算法_MapReduce算法–了解数据联接第1部分

    mapreduce 算法 在本文中,我们继续执行一系列实现算法的系列,该算法在使用MapReduce进行数据密集型文本处理中找到,这一次讨论数据联接. 虽然我们将讨论在Hadoop中联接数据的技术并提 ...

  3. MapReduce算法–了解数据连接第二部分

    自从我上一次发布以来已经有一段时间了,就像我上一次大休息一样,我正在Coursera上一些课程. 这次是Scala中的函数式编程 原理和反应式编程原理 . 我发现它们都是不错的课程,如果有时间的话,建 ...

  4. mapreduce 算法_MapReduce算法–顺序反转

    mapreduce 算法 这篇文章是介绍MapReduce算法的系列文章的另一部分,该书在使用MapReduce进行数据密集型文本处理中找到. 先前的文章是Local Aggregation , Lo ...

  5. MapReduce算法–顺序反转

    这篇文章是介绍MapReduce算法的系列文章的另一部分,该书在使用MapReduce进行数据密集型文本处理中找到. 先前的文章是Local Aggregation , Local Aggregati ...

  6. MapReduce算法–二级排序

    我们将继续进行有关实现MapReduce算法的系列文章,该系列可在使用MapReduce进行数据密集型文本处理中找到. 本系列的其他文章: 使用MapReduce进行数据密集型文本处理 使用MapRe ...

  7. MapReduce:处理数据密集型文本处理–局部聚合第二部分

    这篇文章继续进行有关使用MapReduce进行数据密集型处理的书中实现算法的系列文章. 第一部分可以在这里找到. 在上一篇文章中,我们讨论了使用本地聚合技术来减少通过网络进行混洗和传输的数据量的方法. ...

  8. MapReduce:通过数据密集型文本处理

    自上次发布以来已经有一段时间了,因为我一直在忙于Coursera提供的一些课程. 有一些非常有趣的产品,值得一看. 前一段时间,我购买了Jimmy Lin和Chris Dyer的MapReduce数据 ...

  9. 简单解释 MapReduce 算法

    一个有趣的例子 你想数出一摞牌中有多少张黑桃.直观方式是一张一张检查并且数出有多少张是黑桃? MapReduce方法则是: 给在座的所有玩家中分配这摞牌 让每个玩家数自己手中的牌有几张是黑桃,然后把这 ...

最新文章

  1. c 汇编语言用标准函数代替,C与汇编的接口技术
  2. 数据结构-栈在括号匹配中的应用
  3. ubuntu解压和压缩文件
  4. TensorFlow下载与安装
  5. 个人技术博客--团队Git规范(参考西瓜学长)
  6. 测试操作数据库mysql数据库吗_软件测试-MySQL(六:数据库函数)
  7. python开源聊天框架_转载:15个最受欢迎的Python开源框架-阿里云开发者社区
  8. yb3防爆电机型号含义_温州出租大型发电机定做-智慧动力机械设备租赁
  9. 拼音模糊查询+java,Java将汉语转换成拼音,用于字母的模糊查询
  10. 超尴尬婆婆对儿媳的新婚之夜的指导
  11. openssl版本升级
  12. 阿里云ECS服务器修复漏洞
  13. 电脑jpg图片怎么批量转换成png
  14. “双一流”霸气官宣:博士生,涨薪!
  15. 百度网盘加速无限试用_百度网盘临时加速正式上线,最低 2 元
  16. Android打造带透明圆弧的ImageView
  17. cppcheck使用指南
  18. 下列关于虚电路网络的叙述中,错误的是( )
  19. 计算机管理事件id10016,事件 ID 10016 已记录Windows - Windows Client | Microsoft Docs
  20. 射频连接器的分类和主要指标

热门文章

  1. arm linux串口控制led,通信程序设计 - Linux下ARM和单片机的串口通信设计
  2. C语言的运算符的优先级与结合性+ASCII表
  3. cli parser_Java命令行界面(第27部分):cli-parser
  4. java-ee-api_刷新器-Java EE 7概览
  5. java8 函数式编程_使用Javaslang进行Java 8中的函数式编程
  6. openshift_OpenShift上具有NetBeans的Java EE
  7. 内存泄露严重吗_内存泄漏–测量频率和严重性
  8. java jsoup解析_3使用Jsoup解析Java中HTML文件的示例
  9. gradle docker_带有Gradle的Docker容器分为4个步骤
  10. java 从未导入_Java 8的10个您从未听说过的功能