【hadoop权威指南第四版】第五章MR应用【笔记+代码】
5.1 API的配置
- 配置文件
<?xml version="1.0"?>
<configuration>
<property>
<name>color</name>
<value>yellow</value>
<description>Color</description>
</property>
<property>
<name>size</name>
<value>10</value>
<description>Size</description>
</property>
<property>
<name>weight</name>
<value>heavy</value>
<final>true</final>
<description>Weight</description>
</property>
<property>
<name>size-weight</name>
<value>${size},${weight}</value>
<description>Size and weight</description>
</property>
</configuration>
usage
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
assertThat(conf.get("color"), is("yellow"));
assertThat(conf.getInt("size", 0), is(10));
assertThat(conf.get("breadth", "wide"), is("wide"));
5.1.1 合并资源
多个资源定义一个配置的时候,后添加的会覆盖前面的。
<?xml version="1.0"?>
<configuration>
<property>
<name>size</name>
<value>12</value>
</property>
<property>
<name>weight</name>
<value>light</value>
</property>
</configuration>
usage
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
conf.addResource("configuration-2.xml");
5.3 最高气温
- main
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.*;public class MaxTemperatureMapperTest {@Testpublic void processesValidRecord() throws IOException, InterruptedException {Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +// Year ^^^^"99999V0203201N00261220001CN9999999N9-00111+99999999999");// Temperature ^^^^^new MapDriver<LongWritable, Text, Text, IntWritable>().withMapper(new MaxTemperatureMapper()).withInput(new LongWritable(0), value).withOutput(new Text("1950"), new IntWritable(-11)).runTest();}
}
- map
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String year = line.substring(15, 19);int airTemperature = Integer.parseInt(line.substring(87, 92));context.write(new Text(year), new IntWritable(airTemperature));}
}
- reduce
5.4本地运行测试数据
map的数量
map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,正常的map数量的并行规模大致是每一个Node是10~100个,对于CPU消耗较小的作业可以设置Map数量为300个左右,但是由于hadoop的每一个任务在初始化时需要一定的时间,因此比较合理的情况是每个map执行的时间至少超过1分钟。具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理,当然用户还是可以通过参数mapred.min.split.size参数在作业提交客户端进行自定义设置。还有一个重要参数就是mapred.map.tasks,这个参数设置的map数量仅仅是一个提示,只有当InputFormat 决定了map任务的个数比mapred.map.tasks值小时才起作用。同样,Map任务的个数也能通过使用JobConf 的conf.setNumMapTasks(int num)方法来手动地设置。这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。当然为了提高集群的并发效率,可以设置一个默认的map数量,当用户的map数量较小或者比本身自动分割的值还小时可以使用一个相对交大的默认值,从而提高整体hadoop集群的效率。reduece的数量
reduce在运行时往往需要从相关map端复制数据到reduce节点来处理,因此相比于map任务。reduce节点资源是相对比较缺少的,同时相对运行较慢,正确的reduce任务的个数应该是0.95或者1.75 *(节点数 ×mapred.tasktracker.tasks.maximum参数值)。如果任务数是节点个数的0.95倍,那么所有的reduce任务能够在 map任务的输出传输结束后同时开始运行。
如果任务数是节点个数的1.75倍,那么高速的节点会在完成他们第一批reduce任务计算之后开始计算第二批 reduce任务,这样的情况更有利于负载均衡。
同时需要注意增加reduce的数量虽然会增加系统的资源开销,但是可以改善负载匀衡,降低任务失败带来的负面影响。同样,Reduce任务也能够与 map任务一样,通过设定JobConf 的conf.setNumReduceTasks(int num)方法来增加任务个数。
reduce数量为0
有些作业不需要进行归约进行处理,那么就可以设置reduce的数量为0来进行处理,这种情况下用户的作业运行速度相对较高,map的输出会直接写入到 SetOutputPath(path)设置的输出目录,而不是作为中间结果写到本地。同时Hadoop框架在写入文件系统前并不对之进行排序。1.reduce任务的数量并非由输入数据的大小决定,而是特别指定的。可以设定mapred.tasktracker.map.task.maximum和mapred.tasktracker.reduce.task.maximum属性的值来指定map和reduce的数量。
2.reduce最优个数与集群中可用的reduce任务槽相关,总槽数由节点数乘以每个节点的任务槽。
3.本地作业运行器上,只支持0个或者1个reduce任务。
4.在一个tasktracker上能同时运行的任务数取决于一台机器有多少个处理器,还与相关作业的cpu使用情况有关,经验法则是任务数(包括map和reduce)处理器的比值为1到2.
5.如果有多个reduce任务,则每个map输出通过哈希函数分区,每个分区对应一个reduce任务。
6.一个小例子: 一个客户端有8个处理器,计划在处理器上跑2个进程,则可以将mapred.tasktracker.map.task.maximum和mapred.tasktracker.reduce.task.maximum都设置为7(考虑还有datanode和tasktracker进程,所以那两项不能设置为8)。
public class NcdcRecordParser {private static final int MISSING_TEMPERATURE = 9999;private String year;private int airTemperature;private String quality;public void parse(String record) {year = record.substring(15, 19);String airTemperatureString;// Remove leading plus sign as parseInt doesn't like them (pre-Java 7)if (record.charAt(87) == '+') {airTemperatureString = record.substring(88, 92);} else {airTemperatureString = record.substring(87, 92);}airTemperature = Integer.parseInt(airTemperatureString);quality = record.substring(92, 93);}public void parse(Text record) {parse(record.toString());}public boolean isValidTemperature() {return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");}public String getYear() {return year;}public int getAirTemperature() {return airTemperature;}
}
5.5 集群上运行
5.5.1打包
单机上运行的代码不需要修改就能在集群上运行,但是需要吧程序打包成JAR文件,然后发到集群。使用Ant能简化过程。
5.5.5调试作业
添加错误信息输出到map
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {enum Temperature {OVER_100}private NcdcRecordParser parser = new NcdcRecordParser();@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {parser.parse(value);if (parser.isValidTemperature()) {int airTemperature = parser.getAirTemperature();if (airTemperature > 1000) {System.err.println("Temperature over 100 degrees for input: " + value);context.setStatus("Detected possibly corrupt record: see logs.");context.getCounter(Temperature.OVER_100).increment(1);}context.write(new Text(parser.getYear()), new IntWritable(airTemperature));}}
}
遇到了错误信息,将输出一行到标准错误流以代表有问题的行,同时使用reporter的setStatus()方法来更新map中的状态信息,提示我们查看日志。其中还增加了一个计数器,用java的enum类型来表示。上例中的OVER_100。
检测温度前的加号减号
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {enum Temperature {MALFORMED}private NcdcRecordParser parser = new NcdcRecordParser();@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {parser.parse(value);if (parser.isValidTemperature()) {int airTemperature = parser.getAirTemperature();context.write(new Text(parser.getYear()), new IntWritable(airTemperature));} else if (parser.isMalformedTemperature()) {System.err.println("Ignoring possibly corrupt input: " + value);context.getCounter(Temperature.MALFORMED).increment(1);}}
}
5.6作业调优
- 作业调优检查表
【hadoop权威指南第四版】第五章MR应用【笔记+代码】相关推荐
- 【hadoop权威指南第四版】第六章MR的工作原理【笔记+代码】
6.1 运行MR作业 工作原理 四大模块: 客户端,提交MR作业. jobtracker,协调作业的运行.jobtracker 是一个java应用程序,主类是Jobtracker. tasktrack ...
- 【hadoop权威指南第四版】第七章MR的类型与格式【笔记+代码】
7.1MR类型 7.2 输入格式 7.2.1输入分片与记录 InputFormat类的层次结构 每一个map操作只处理一个输入分片,并且一个一个地处理每条记录,也就是一个键值对. 在数据库中,一个输入 ...
- 【hadoop权威指南第四版】第三章hadoop分布式文件系统【笔记+代码】
3.1块 显示块信息 % hdfs fsck / -files -blocks 3.5 Java接口 3.5.1从hadoop URL读取数据 使用java.net.URL 对象来打开一个数据流 In ...
- 《CSS权威指南第三版》第二章的读书笔记
第2章 选择器 CSS的主要优点,就是很容易向所有同类型的元素应用一组样式. 1. 基本规则 修改所有h2: h2{color:gray;} a. 规则结构 每个规则有两个基本部分:选择器 ...
- hadoop权威指南第三版 发布说明
(此文摘自http://hadoopbook.com) hadoop权威指南第三版发行说明: 第三版会在2012年5月发行.你现在可以预定一份电子版,或购买"Early Release&qu ...
- Hadoop权威指南(第3版) 修订版(带目录书签) 中文PDF--高清晰
一.下载地址(永久有效) 百度云盘下载(公开永久):Hadoop权威指南(第3版) 修订版(带目录书签) 中文PDF高清晰 CSDN积分下载:Hadoop权威指南(第3版)+高清晰 二.数据的存储和分 ...
- 《JavaScript权威指南第四版》 电子版 电子书下载
JavaScript权威指南第四版 图书评价:★★★★☆ 图书语言:简体图书 图书大小:19.11MB 图书格式:PDF 图书作者:David Flanagan 更新日期:2006-05-23 下载次 ...
- 翻译:《JavaScript 权威指南(第5版)》第一章(一)
声明:翻译只有一个目的:学习用途.若有版权问题请及时联系本人. 本贴文根据篇幅将第一章的翻译分为两个部分,这是第一部分的内容. Chapter 1. Introduction to JavaScrip ...
- Hadoop权威指南(第二版)pdf中文版
今天终于找到 hadoop权威指南第二版的中文pdf版本了,发给大家共享一下 下载地址:http://dl.dbank.com/c0hh1arjiz ------------------------- ...
最新文章
- 【C 语言】文件操作 ( 读取文件中的结构体数组 | feof 函数使用注意事项 )
- 关于64位WIN7下正确建立JAVA开发环境(转
- android 官方DrawerLayout的介绍和使用
- python笔记全_Python笔记
- 前端技术之_CSS详解第五天
- PHP防止数字太大转化为科学计数法的方法
- Mysql中间件代理 Atlas
- c语言中的内存4区域模型(堆,栈,全局区,代码区)
- 是时候让《武林外传》教你反内卷了
- 华米科技:庇佑之下,黄汪难设温柔乡
- 人工智能是否将拥有人类意识?
- 适合大学生用的网课作业搜题找答案的神器分享~~
- 51单片机驱动P10单元板
- 算法提高课-图论-欧拉回路和欧拉路径-AcWing 1123. 铲雪车:披着欧拉回路外衣的小学数学题
- 思考题:一条信息可通过如图所示的网络线由上(A点)往下向各站点传送,例如信息到达b2点可由经a1的站点送达,也可以由经a2的站点送达,共有两条途径传送,那么信息由A点到达d3的不同途径共有多少条
- 普通人学会Python到底具体能做什么呢?
- python 十进制转二进制
- 3D Max中布尔运算介绍
- Base64(本地存储加密解密)
- Codeforces Round #787 (Div. 3) F. Vlad and Unfinished Business