揭秘InputFormat:掌控Map Reduce任务执行的利器
随着越来越多的公司采用Hadoop,它所处理的问题类型也变得愈发多元化。随着Hadoop适用场景数量的不断膨胀,控制好怎样执行以及何处执行map任务显得至关重要。实现这种控制的方法之一就是自定义InputFormat实现。
InputFormat 类是Hadoop Map Reduce框架中的基础类之一。该类主要用来定义两件事情:
- 数据分割(Data splits)
- 记录读取器(Record reader)
数据分割 是Hadoop Map Reduce框架中的基础概念之一,它定义了单个Map任务的大小及其可能的执行服务器信息。记录读取器 主要负责从输入文件实际读取数据并将它们(以键值对的形式)提交给mapper。尽管有不少文章介绍过怎样实现自定义的记录读取器(例如,参考文章[1]),但是关于如何进行分割(split)的介绍却相当粗略。这里我们将会解释什么是分割,并介绍怎样实现自定义分割来完成特定任务。
剖析分割
任何分割操作的实现都继承自Apache抽象基类——InputSplit,它定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而分割位置 是分割所在的机器结点名称组成的列表,其中待分割的数据都会于本地存在。分割位置可以方便调度器决定在哪个机器上执行此次分割。简化后的[1]作业跟踪器(job tracker)工作流程如下:
- 接受来自某个任务跟踪器(task tracker)的心跳通信,得到该位置map的可用情况。
- 为队列等候中的分割任务找到可用的“本地”结点。
- 向任务跟踪器提交分割请求以待执行。
数据局部性(Locality)的相关程度因存储机制和整体的执行策略的不同而不同。例如,在Hadoop分布式文件系统(HDFS)中,分割通常对应一个物理数据块大小以及该数据块物理定位所在的一系列机器(其中机器总数由复制因子定义)的位置。这就是FileInputFormat 计算分割的过程。
而HBase的实现则采用了另外一套方法。在HBase中,分割 对应于一系列属于某个表区域(table region)的表键(table keys),而位置则为正在运行区域服务器的机器。
计算密集型应用
Map Reduce应用中有一类特殊的应用叫做计算密集型应用(Compute-Intensive application)。这类应用的特点在于Mapper.map()函数执行的时间要远远长于数据访问的时间,且至少要差一个数量级。从技术角度来说,虽然这类应用仍然可以使用“标准”输入格式的实现,但是它会带来数据存放结点过少而集群内剩余结点没能充分利用的问题(见图1)。
(点击图片进行放大)
图1:数据局部性情况下的结点使用图
图1中显示了针对计算密集型应用,使用“标准”数据局部性导致的结点使用率上的巨大差异——有些结点(红色标注)被过度使用,而其他结点(黄色和浅绿色标注)则使用不足。由此可见,在针对计算密集型应用时,需要重新思考对“局部性”概念的认识。在这种情况下,“局部性”意味着所有可用结点之间map任务的均匀分布——即最大化地使用集群机器的计算能力。
使用自定义InputFormat改变“局部性”
假定源数据以文件序列的形式存在,那么一个简单的ComputeIntensiveSequenceFileInputFormat 类(见清单1)便可以实现将分割生成的结果均匀地分布在集群中的所有服务器上。
package com.navteq.fr.mapReduce.InputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
public class ComputeIntensiveSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
private static final double SPLIT_SLOP = 1.1; // 10% slop
static final String NUM_INPUT_FILES = "mapreduce.input.num.files";
/**
* Generate the list of files and make them into FileSplits.
*/
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// get servers in the cluster
String[] servers = getActiveServersList(job);
if(servers == null)
return null;
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus>files = listStatus(job);
int currentServer = 0;
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
new String[] {servers[currentServer]}));
currentServer = getNextServer(currentServer, servers.length);
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
new String[] {servers[currentServer]}));
currentServer = getNextServer(currentServer, servers.length);
}
} else if (length != 0) {
splits.add(new FileSplit(path, 0, length,
new String[] {servers[currentServer]}));
currentServer = getNextServer(currentServer, servers.length);
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
return splits;
}
private String[] getActiveServersList(JobContext context){
String [] servers = null;
try {
JobClient jc = new JobClient((JobConf)context.getConfiguration());
ClusterStatus status = jc.getClusterStatus(true);
Collection<String> atc = status.getActiveTrackerNames();
servers = new String[atc.size()];
int s = 0;
for(String serverInfo : atc){
StringTokenizer st = new StringTokenizer(serverInfo, ":");
String trackerName = st.nextToken();
StringTokenizer st1 = new StringTokenizer(trackerName, "_");
st1.nextToken();
servers[s++] = st1.nextToken();
}
}catch (IOException e) {
e.printStackTrace();
}
return servers;
}
private static int getNextServer(int current, int max){
current++;
if(current >= max)
current = 0;
return current;
}
}
清单1:ComputeIntensiveSequenceFileInputFormat类
该类继承自 SequenceFileInputFormat 并重写了getSplits()方法,虽然计算分割的过程与FileInputFormat完全一样,但是它为分割指定了“局部性”,以便从集群中找出可用的服务器。getSplits()利用到了以下两个方法:
- getActiveServersList() 方法,返回集群中当前可用的服务器(名称)组成的数组。
- getNextServer() 方法,返回服务器数组中下一个服务器索引,且当服务器数组中元素全部用尽时,返回数组头部重新开始。
虽然上述实现(见清单1)将map任务的执行均匀地分布在了集群中的所有服务器上,但是它却完全忽略了数据的实际位置。稍微好点的getSplits方法实现(见清单2)可以试图将两种策略结合在一起:既尽量多地放置针对数据的本地作业,且保持剩余作业在集群上的良好平衡。[2]
public List<InputSplit> getSplits(JobContext job) throws IOException {
// get splits
List<InputSplit> originalSplits = super.getSplits(job);
// Get active servers
String[] servers = getActiveServersList(job);
if(servers == null)
return null;
// reassign splits to active servers
List<InputSplit> splits = new ArrayList<InputSplit>(originalSplits.size());
int numSplits = originalSplits.size();
int currentServer = 0;
for(int i = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer,
servers.length)){
String server = servers[currentServer]; // Current server
boolean replaced = false;
// For every remaining split
for(InputSplit split : originalSplits){
FileSplit fs = (FileSplit)split;
// For every split location
for(String l : fs.getLocations()){
// If this split is local to the server
if(l.equals(server)){
// Fix split location
splits.add(new FileSplit(fs.getPath(), fs.getStart(),
fs.getLength(), new String[] {server}));
originalSplits.remove(split);
replaced = true;
break;
}
}
if(replaced)
break;
}
// If no local splits are found for this server
if(!replaced){
// Assign first available split to it
FileSplit fs = (FileSplit)splits.get(0);
splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(),
new String[] {server}));
originalSplits.remove(0);
}
}
return splits;
}
清单2:优化过的getSplits方法
在此实现中,我们首先使用父类(FileInputSplit)来得到包含位置计算在内的分割以确保数据局部性。然后我们计算出可用的服务器列表,并为每一个存在的服务器尝试分配与其同处本地的分割。
延迟公平调度
虽然清单1和清单2中的代码都正确地计算出了分割位置,但当我们试图在Hadoop集群上运行代码时,就会发现结果与服务器之间产生均匀分布相去甚远。参考文章[2]中很好的描述了我们观察到的这个问题,并为该问题描述了一种解决方案——即延迟公平调度。
假设已经设置好了公平调度程序,那么下面的程序段应当加入到mapred-site.xml文件中以激活某个延迟调度程序[3]:
<property><name>mapred.fairscheduler.locality.delay</name><value>360000000</value> <property>
适当借助延迟公平调度程序,作业执行将可以利用整个集群(见图2)。此外,根据我们的实验,这种情况下的执行时间相比“数据局部性”的做法要节省约30%。
(点击图片进行放大)
图2:执行局部性情况下的结点使用图
其他注意事项
用于测试的计算作业共使用了96个split和mapper任务。测试集群拥有19个数据结点,其中每个数据结点拥有8个mapper槽,因此该集群共有152个可用槽。当该作业运行时,它并没有充分利用集群中的所有槽。
Ganglia的两份截图都展示了我们所使用的测试集群,其中前三个结点为控制结点,而第四个结点为边缘结点,主要用来启动作业。图中展示了中央处理器/机器的负载情况。在图1中,有一些结点被过度使用(红色显示),而集群中的其他结点则未得到充分利用。在图2中,虽然我们得到了更加平衡的分布,然而集群仍然未被充分利用。用于测试的作业也可以运行多线程,这么做会增加中央处理器的负载,但同时也会降低在每次Mapper.map()迭代上的整体时间花费。正如图3所示,通过增加线程数量,我们可以更好地利用集群资源,并进一步减少完成作业所花费的时间。通过改变作业区域性,我们可以在不牺牲性能的情况下更好地利用群集处理远程作业数据。
(点击图片进行放大)
图3:使用多线程Map作业的执行区域性情况下的结点使用图
即使机器中央处理器处于高负荷状态,它仍然可以允许其他磁盘I/O密集型作业运行在开放槽中,要注意的是,这么做会带来些许的性能下降。
自定义分割
本文中提到的方法对大文件非常适用,但是对于小文件而言,并没有足够的分割来让其使用集群中的多台机器。一种可行的方法是使用更小的分割块,但是这么做会给集群命名结点带来更多的负担(内存需求方面)。一种更好的做法是修改清单1中的代码,以使用自定义的块大小(而不是文件块大小)。这种方法可以计算出所需的分割块数量,而不用理会实际的文件大小。
总结
在这篇文章中,我们已经展示了如何利用自定义的InputFormats来更紧密地控制Map Reduce中的map任务在可用服务器间的分布。这种控制对于一类特殊应用——计算密集型应用非常重要,控制过程将Hadoop Map Reduce做为通用的并行执行框架使用。
关于作者
Boris Lublinsky是NAVTEQ公司的首席架构师,在这家公司中他的工作是为大型数据管理、处理以及SOA定义架构愿景,并且实施各种NAVTEQ的项目。他还是InfoQ的SOA编辑,OASIS的SOA RA工作组的参与者。Boris是一位作者并经常发表演讲,他最新的一本书叫做《Applied SOA》。
Michael Segel在过去二十多年里一直不断与客户合作,帮助他们发现并解决业务上的问题。Michael做过许多不同类型的工作,也在不同的行业圈摸打滚爬过。他是一位独立顾问,并且总是期望能够解决所有具有挑战性的问题。Michael拥有俄亥俄州立大学的软件工程学位。
参考
1. Boris Lublinsky, Mike Segel. Custom XML Records Reader.
2. Matei Zaharia, Dhruba Borthakur, Joydeep Sen Sarma, Khaled Elmeleegy, Scott Shenker, Ion Stoica. Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling.
[1] 这是一个简化后的解释。真实的调度算法要复杂得多;考虑更多的参数而不仅仅是分割的位置。
[2] 虽然我们将其列作一个选项,但是如果花费在Mapper.map()方法上的时间高与远程访问数据时间的一个或多个数量级时,清单1中的代码将不会有任何性能上的提升。但尽管如此,它也许会带来网络利用率上的略微提高。
[3] 请注意这里的延迟以毫秒为单位,在改变该值之后需要重新启动作业跟踪器。
查看英文原文:Uncovering mysteries of InputFormat: Providing better control for your Map Reduce execution.
转自:http://www.infoq.com/cn/articles/HadoopInputFormat-map-reduce
转载于:https://www.cnblogs.com/linguoqian/archive/2012/08/29/2662110.html
揭秘InputFormat:掌控Map Reduce任务执行的利器相关推荐
- [ZZ]Map/Reduce hadoop 细节
转自:Venus神庙原文:http://www.cnblogs.com/duguguiyu/archive/2009/02/28/1400278.html 分布式计算(Map/Reduce) 分布式计 ...
- Hadoop Map/Reduce教程
Hadoop Map/Reduce教程 目的 先决条件 概述 输入与输出 例子:WordCount v1.0 源代码 用法 ...
- .NET Core开发实战(第21课:中间件:掌控请求处理过程的关键)--学习笔记(下)...
21 | 中间件:掌控请求处理过程的关键 如果在 Map 的时候逻辑复杂一点,不仅仅判断它的 URL 地址,而且要做特殊的判断的话,可以这么做把判断逻辑变成一个委托 我们要判断当我们的请求地址包含 a ...
- .NET Core开发实战(第21课:中间件:掌控请求处理过程的关键)--学习笔记(上)...
21 | 中间件:掌控请求处理过程的关键 这一节讲解一下如何通过中间件来管理请求处理过程 中间件工作原理 next 表示后面有一个委托,每一层每一层套下去可以在任意的中间件来决定在后面的中间件之前执行 ...
- 一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)
Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解M ...
- DophinScheduler server部分 核心代码详细解析——掌控任务和进程的呼吸与脉搏:log、monitor与registry
2021SC@SDUSC 文章目录 一.整体结构 二.具体分析 1.log 1.LoggerRequestProcessor 2.LoggerServer 3.MasterLogFilter 2.mo ...
- 多巴胺如何驱使我们克服复杂情况、逆境、情绪, 让我们掌控周遭的环境的
来源:本文摘自<贪婪的多巴胺> 仅仅是"想要"很少能让你得到任何东西.你必须弄清楚如何获得它,以及它是否值得拥有.事实上,如果我们做事时不考虑怎么做和下一步做什么,失败 ...
- 开发日记-20190824 关键词 读书笔记《掌控习惯》DAY 3
<掌控习惯> 第三章 培养良好习惯的四步法 习惯是重复了足够多的次数后而变得自动化的行为.习惯形成的过程始于反复尝试.每当你在生活中遇到新的情况,你的大脑就要做出决定.对此我该如何回应?你 ...
- 开发日记-20190822 关键词 读书笔记《Unix环境高级编程(第二版)》《掌控习惯》DAY 2
Preface 话说,昨天开始尝试着去改变自己,从基础的习惯开始,11:30准时睡觉,平时差不多12:30才睡觉.按理说,比平时早了一个小时睡觉吧,然后我就把闹钟提前了45分钟,想着还能比平常多睡15 ...
- TED:如何掌控你的自由时间以及让自己变得更好,这样就能看到爱情应有的样子...
TED:如何掌控你的自由时间以及让自己变得更好,这样就能看到爱情应有的样子 一.<如何掌控你的自由时间> (1)时间管理的传统思维:守时和节省零散的时间.演讲者认为这个观点已经彻底落后. ...
最新文章
- Kaggle心得(一)
- ThinkPHP模板之二
- 网游运营基础知识与专业术语
- epoll(eventpoll)是干嘛的?IO多路转接技术(相较select、poll的优点)
- Docker容器化部署python
- 《大数据管理概论》一2.5 知识融合技术
- iptables(3)
- ef 子表和父表不同时保存_canon粉不懂镜头参数?我只能嘲笑你
- 4.3 createjs
- insert into 多条数据_最全总结 | 聊聊 Python 数据处理全家桶(MongoDB 篇)
- 天线座和连接器SMA、U.FL、IPX、IPEX
- 各个省市mysql表附带行政id(一)
- android 多媒体音频占用情况监听
- GTX1060 Windows10 旧版显卡驱动下载链接
- Re10:读论文 Are we really making much progress? Revisiting, benchmarking, and refining heterogeneous gr
- Mac pro 安装ubuntu系统
- 运动蓝牙耳机挑选要注意什么?蓝牙耳机知识科普
- 数显之家快讯:【SHIO世硕心语】2021,对你的老板好一点!
- 被称为偏执的企业家,他成功跻身中国民企500强
- 单片机上电后没有运转,需要从这些方面考虑
热门文章
- app.honeycomb.Shell$HomeActivity failed to start
- 让MySQL不区分大小写
- MongoDB 将Json数据直接写入MongoDB的方法
- OGRE: OgreOverlaySystem.h: No such file or directory
- 全面综合的管理平台,让所有网络都有管理员
- 编译与运行ORB-SLAM的问题:1、unistd.h 2、virtual memory exhausted 3、internal compiler error 4、共享文件夹设置
- 4月23 nuTonomy的语义层(人行横道,人行道,交通信号灯,停车线,车道等)的扩展包
- 3月22 坐标系转换,旋转矩阵,仿射变换,例子,相机与世界,欧拉角与轴角公式,一个坐标系下面的轨迹
- 交换局域网(链路层+以太网+交换机)
- 数据结构:二叉查找树