这篇文章继续进行有关使用MapReduce进行数据密集型处理的书中实现算法的系列文章。 第一部分可以在这里找到。 在上一篇文章中,我们讨论了使用本地聚合技术来减少通过网络进行混洗和传输的数据量的方法。 减少传输的数据量是提高MapReduce作业效率的主要方法之一。 单词计数MapReduce作业用于演示本地聚合。 由于结果只需要总数,因此我们可以为合并器重新使用相同的化简器,因为更改加数的顺序或分组不会影响总和。

但是,如果您想要平均水平呢? 然后,由于计算平均值的平均值不等于原始数字集的平均值,因此相同的方法将行不通。 尽管有了一点见识,我们仍然可以使用本地聚合。 对于这些示例,我们将使用Hadoop最终指南书中使用的NCDC天气数据集的示例。 我们将计算1901年每个月的平均温度。可以在MapReduce的数据密集型处理的第3.1.3章中找到组合器和映射器内组合选项的平均值算法。

一种尺寸并不适合所有人

上一次,我们介绍了两种用于在MapReduce作业中减少数据的方法:Hadoop组合器和映射器内组合方法。 Hadoop框架将组合器视为一种优化,并且无法保证调用组合器的次数(如果有的话)。 结果,映射器必须以减速器期望的形式发出数据,因此,如果不涉及组合器,则最终结果不会更改。 要针对计算平均值进行调整,我们需要返回到映射器并更改其输出。

映射器更改

在单词计数示例中,未优化的映射器仅发出单词和1的计数。合并器和映射器内组合映射器通过将每个单词保留为哈希映射中的键(总计数为n)来优化此输出。值。 每次看到一个单词,计数都将增加1。在这种设置下,如果未调用组合器,则缩减器将接收到该单词作为键,并将一长串的1?s加在一起,从而得到相同的输出(当然,使用映射器内组合映射器可以避免此问题,因为可以保证合并结果是映射器代码的一部分)。 为了计算平均值,我们将使基本映射器发出一个字符串键(将天气观测的年和月连接在一起)和一个自定义可写对象,称为TemperatureAveragingPair。 TemperatureAveragingPair对象将包含两个数字(IntWritables),获取的温度和一个计数。 我们将从Hadoop:权威指南中获取MaximumTemperatureMapper,并以此为灵感来创建AverageTemperatureMapper:

public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private Text outText = new Text();private TemperatureAveragingPair pair = new TemperatureAveragingPair();private static final int MISSING = 9999;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {outText.set(yearMonth);pair.set(temp, 1);context.write(outText, pair);}}
}

通过使映射器输出键和TemperatureAveragingPair对象,无论调用组合器如何,我们的MapReduce程序都可以保证具有正确的结果。

合路器

我们需要减少发送的数据量,因此我们将对温度求和,对计数求和并分别存储。 这样,我们将减少发送的数据,但保留计算正确平均值所需的格式。 如果/在调用组合器时,它将采用所有传入的TemperatureAveragingPair对象,并为同一键发出单个TemperatureAveragingPair对象,其中包含温度和计数值的总和。 这是合并器的代码:

public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {private TemperatureAveragingPair pair = new TemperatureAveragingPair();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair value : values) {temp += value.getTemp().get();count += value.getCount().get();}pair.set(temp,count);context.write(key,pair);}
}

但是我们非常有兴趣确保我们减少了发送到reducer的数据量,因此我们将看看下一步如何实现。

在Mapper合并平均值中

类似于单词计数示例,为了计算平均值,映射器内组合映射器将使用哈希图,将连接的年+月作为键,将TemperatureAveragingPair作为值。 每次获得相同的年+月组合时,我们都会将对对象从地图中取出,添加温度并将计数增加一个。 调用cleanup方法后,我们将发出所有对及其各自的键:

public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private static final int MISSING = 9999;private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {TemperatureAveragingPair pair = pairMap.get(yearMonth);if(pair == null){pair = new TemperatureAveragingPair();pairMap.put(yearMonth,pair);}int temps = pair.getTemp().get() + temp;int count = pair.getCount().get() + 1;pair.set(temps,count);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {Set<String> keys = pairMap.keySet();Text keyText = new Text();for (String key : keys) {keyText.set(key);context.write(keyText,pairMap.get(key));}}
}

通过遵循在映射调用之间跟踪数据的相同模式,我们可以通过实现映射器内合并策略来实现可靠的数据减少。 同样的注意事项适用于在对映射器的所有调用中保持状态,但是考虑使用这种方法可以提高处理效率,这是值得考虑的。

减速器

在这一点上,编写我们的reducer很容易,为每个键列出一对配对,将所有温度和计数求和,然后将温度的总和除以计数的总和。

public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {private IntWritable average = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair pair : values) {temp += pair.getTemp().get();count += pair.getCount().get();}average.set(temp / count);context.write(key, average);}
}


结果

使用合并器和映射器内合并映射器选项可以预测结果,从而显着减少数据输出。
未优化的映射器选项:

12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:05:28 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce shuffle bytes=111594
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:05:28 INFO mapred.JobClient:     Spilled Records=13128
12/10/10 23:05:28 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:05:28 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:05:28 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:05:28 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input records=6564

组合器选项:

12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:07:19 INFO mapred.JobClient:     Combine output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:07:19 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:07:19 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:07:19 INFO mapred.JobClient:     Combine input records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input records=12

映射器内合并选项:

12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:09:09 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:09:09 INFO mapred.JobClient:     Map output bytes=180
12/10/10 23:09:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:09:09 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input records=12

计算结果:
(注意:示例文件中的温度以摄氏度* 10为单位)

未优化 合路器 映射器内合并器映射器
190101 -25
190102 -91
190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77
190101 -25
190102 -91
190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77
190101 -25
190102 -91
190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77


结论

对于简单的情况(可以将reducer重用为组合器)和更复杂的情况(对于如何构造数据同时仍能从本地聚集数据以提高处理效率)有所了解,我们已经介绍了本地聚集。

进一步阅读

  • Jimmy Lin和Chris Dyer 使用MapReduce进行的数据密集型处理
  • Hadoop: Tom White 的权威指南
  • 来自博客的源代码
  • Hadoop API
  • MRUnit用于单元测试Apache Hadoop映射减少工作
  • Gutenberg项目提供了大量纯文本格式的书籍,非常适合在本地测试Hadoop作业。

参考: 使用MapReduce进行数据密集型文本处理-本地聚合第二部分,来自我们的JCG合作伙伴 Bill Bejeck,来自“ 随机编码思考”博客。

翻译自: https://www.javacodegeeks.com/2012/10/mapreduce-working-through-data-2.html

MapReduce:处理数据密集型文本处理–局部聚合第二部分相关推荐

  1. mapreduce 聚合_MapReduce:处理数据密集型文本处理–局部聚合第二部分

    mapreduce 聚合 这篇文章继续进行有关使用MapReduce进行数据密集型处理的书中实现算法的系列文章. 第一部分可以在这里找到. 在上一篇文章中,我们讨论了使用本地聚合技术来减少通过网络进行 ...

  2. mapreduce文本排序_MapReduce:通过数据密集型文本处理

    mapreduce文本排序 自上次发布以来已经有一段时间了,因为我一直忙于Coursera提供的一些课程. 有一些非常有趣的产品,值得一看. 前一段时间,我购买了Jimmy Lin和Chris Dye ...

  3. MapReduce:通过数据密集型文本处理

    自上次发布以来已经有一段时间了,因为我一直在忙于Coursera提供的一些课程. 有一些非常有趣的产品,值得一看. 前一段时间,我购买了Jimmy Lin和Chris Dyer的MapReduce数据 ...

  4. 《数据密集型计算和模型》第二章大数据时代的计算机体系结构复习

    <数据密集型计算和模型>第二章的有关内容.主要复习内容为:计算部件.存储部件.网络部件.软件定义部件.虚拟资源管理系统等. 文章目录 大数据时代的计算机体系结构 一.计算部件 1. 多核和 ...

  5. spark 数据倾斜之两阶段聚合(局部聚合+全局聚合)

    两阶段聚合(局部聚合+全局聚合) 方案适用场景: 对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案. 方案 ...

  6. 地区的json数据_数据密集型系统基础:数据模型与查询语言

    (点击上方公众号,可快速关注) 转自:知了小巷-知了笔记 [导读]:大多数应用程序是通过一层一层叠加数据模型来构建的.每一层都面临的关键问题是:「如何将其用下一层来表示?」复杂的应用程序可能会有更多的 ...

  7. 设计数据密集型应用 第二章:数据模型与查询语言

    第二章:数据模型与查询语言 语言的边界就是思想的边界. -- 路德维奇·维特根斯坦,<逻辑哲学>(1922) 文章目录 第二章:数据模型与查询语言 关系模型与文档模型 NoSQL的诞生 对 ...

  8. Appboy 基于 MongoDB 的数据密集型实践

    摘要:Appboy 正在过手机等新兴渠道尝试一种新的方法,让机构可以与顾客建立更好的关系,可以说是市场自动化产业的一个前沿探索者.在移动端探索上,该公司已经取得了一定的成功,知名产品有 iHeartM ...

  9. 《大数据》2015年第3期“网络大数据专题”——网络大数据的文本内容分析

    网络大数据的文本内容分析 程学旗,兰艳艳 (中国科学院计算技术研究所 北京 100019) 摘要:文本内容分析是实现大数据的理解与价值发现的有效手段.尝试从短文本主题建模.单词表达学习和网页排序学习3 ...

最新文章

  1. 样本不均衡SMOTE算法代码实列
  2. 剑指offer(49)把字符串转换成整数。
  3. php中json_encode中文编码问题分析
  4. Ubuntu解决gedit warning问题的方法
  5. 用#ifndef、#define、#endif避免头文件的重定义
  6. EasyUI中Dialog对话框的简单使用
  7. Nginx隐藏PHP入口文件index.php
  8. 重构——解决过长参数列表(long parameter list)
  9. python数据科学库_Python数据科学库
  10. C#复习笔记(2)--C#1所搭建的核心基础
  11. FastDFS 原理介绍
  12. mybatis 的大于号 小于号 大于等于 小于等于
  13. tplink服务器无响应dns,tplink路由器dns异常
  14. js 实现 图片刷新 验证码 看不清 换一张
  15. python爬取整个网页,教师节不知道给老师送什么?
  16. NYOJ-110-剑客决斗
  17. 【听】《斯坦福极简经济学》,国家调控
  18. 数字IC设计的基本流程和主流EDA工具
  19. Java继承(extends )
  20. 从燃油车布局新能源,汽车服务商们谋破局

热门文章

  1. 自动化测试框架 2019_2019年用于自动化的5个最佳Java测试框架
  2. envoy api 网关_在边缘,作为网关或在网格中构建控制平面以管理Envoy代理的指南...
  3. react 线程_React式服务中的线程本地状态可用性
  4. java parse_Java命令行界面(第9部分):parse-cmd
  5. 动态代码生成 静态代码生成_将速度提升到自己的个人代码生成器中
  6. perl大骆驼和小骆驼_快速的骆驼和云消息传递
  7. 每个Java学习者都会犯的10大常见错误1
  8. Apache Camel 2.23发布
  9. .net2.0 orm_Hibernate 4.3 ORM工具
  10. arrays.sort(._Arrays.sort与Arrays.parallelSort