MapReduce 优缺点

MapReduce 分布式运算程序的编程框架优点:使分布式编程变得简单,高容错性,适合PB级以上的数据处理缺点: 不适合实时计算

MapReduce流程

  1. 客户端submit前,获取待处理的信息,根据配置生成任务规划,比如有多少个maptask等。
  2. 客户端提交信息到yarn,提交的信息包括job的配置,jar包等。
  3. 各个节点上启动maptask,调用InputFormat中的RecordReader,读取数据转成key/value对。
  4. 将读取的key/value交给mapper处理,map中调用我们自己写代码。
  5. map中逻辑代码执行完之后会调用context.wirte(key,value),witre会调用NewOutputCollector类中的MapOutputBuffer类,进而调用write方法写到环形缓冲区。 从map开始写入环形缓冲区,到reduce之前,这一部分叫做shuffle。写入缓存区之前先分区,将分区号一并写入环形缓存区。
  6. 当环形缓存区达到80%的时候开始溢写.即从内存写数据到文件。在写文件之前需要根据key进行全排序,排序方式为快排,排序之后是按区,区中按key值有序。
  7. 每一次溢写都会有一个数据文件,还有一个索引文件
  8. 当所有的数据执行完发生了多次溢写,有多个文件,接着合并文件,合并文件用的归并排序。每一个maptask有一个文件,所以最终会有多个文件,到此mapTask执行完毕
  9. ReduceTask从map端拉数据到ReduceTask,ReduceTask首先把从不同的maptask上拉取的文件进行合,因为文件本身就是有序的,所以用归并排序进行合并,合成一个文件。 每一个reduce对应一个分区,所以每一个ReduceTask拉取的数据都是自己分区的那个数据。
  10. 按key分组输入到我们自己写的reduce中,最后调用OutputFormat写出数据到磁盘。

序列化

Hadoop没有用Java的序列化Serializable,因为它是一个重量级的序列化框架,一个对象被序列化后会带有很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输,所以Hadoop开发了一套自己的框架机制writable。 之所以快,因为Writable机制只是传输必要的信息,因为少,所以快. 实现也很简单 实现Writable接口,实现write和readFileds即可。但是如果在MapReduce中作为key的话需要实现的是CompareWritable,因为MapReduce的shuffle中需要排序,所以用到比较。

MapReduce 并行处理

默认情况下map阶段的并行度由客户端在提交Job时的切片数量决定,每一个切片分配一个map task处理,默认情况下切片的大小和block的大小一致。切片时不考虑文件的整体,只考虑单个文件,这句话的意思是一个文件夹下可能有好几个文件,作为输入的时候要按每一个具体文件切片。 切片的大小当然也可以设置,可以设置成不是block的大小,比如说设置成100M,那这样话如果数据有三个块,分别放到不同的DataNode上,block的大小是128M,就会出现网络传输,因为第一个片切分100M还剩28M,这28M会传输给下一个DataNode,下一个DataNode拿到这28M后加上本地的128M继续切分 100M,剩56M传给下一个节点交给下一个节点的map task处理,这样的话就消耗了网络流量,网络传输是比较珍贵的资源,没有和block大小一致有更高的性能。

改变切片的大小

切片的大小虽然默认是128M,但是也是可以改的,但是没有具体的哪个值是指定这个切片大小,我们可以看下FileInputFormat部分源码,因为默认使用的是FileINputFormat中的切片方法,所以我们看这个,当然还有其他的切片方法,下面会分析:

public List getSplits(JobContext job) throws IOException {  StopWatch sw = new StopWatch().start();  //最小值  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  //最大值  long maxSize = getMaxSplitSize(job);  。。。。。。。。。。  //如果允许切片  if (isSplitable(job, path)) {  long blockSize = file.getBlockSize();  //切片大小  long splitSize = computeSplitSize(blockSize, minSize, maxSize);  long bytesRemaining = length;  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);    splits.add(makeSplit(path, length-bytesRemaining, splitSize,                blkLocations[blkIndex].getHosts(),                blkLocations[blkIndex].getCachedHosts()));    bytesRemaining -= splitSize;  }

由上面的切片大小方法computeSplitSize,可以知道由三个值决定,块的大小,最大值与最小值:

protected long computeSplitSize(long blockSize, long minSize,                                long maxSize) {  return Math.max(minSize, Math.min(maxSize, blockSize));}

看上面的计算切片大小的源码,可以知道规则是 在最大值(maxSize)和块大小(blockSize)取最小值,在最小值(minSize)和后面的结果取最大值,其实就是在minSize和maxSize,blockSiz三个值之间取了一个中间值作为切片大小。minSize默认值是多少呢?看源码:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));protected long getFormatMinSplitSize() {  return 1;}public static long getMinSplitSize(JobContext job) {  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);}public static final String SPLIT_MINSIZE =   "mapreduce.input.fileinputformat.split.minsize";

由上面的源码可以知道,由参数mapreduce.input.fileinputformat.split.minsize 设置,默认为1,所以如果不设置的话,默认大小为1那maxSize的大小为多少呢?

public static long getMaxSplitSize(JobContext context) {  return context.getConfiguration().getLong(SPLIT_MAXSIZE,                                             Long.MAX_VALUE);}public static final String SPLIT_MAXSIZE =   "mapreduce.input.fileinputformat.split.maxsize";

maxSize大小由mapreduce.input.fileinputformat.split.maxsize设置,默认为long的最大值。

所以最大值为long的极大值,最小值为1,块的大小为128M,取中间值所以片的大小为128M.块的大小我们不能改,所以我们如果要改切片大小的话,只能改minSize和maxSize,既然是取中间值,如果 我们想要片的大小比128M大,需要改minSize值,如果比128M小,则需要给maxSize的值。根据我的理解,注意这里只是逻辑切片,决定好MapTask的数量等值之后,提交给ResourceManager,ResourceManager分配真正的maptask,到具体的NodeManager运行。

InputFormat

那么我们在写map程序的时候,需要重写maper,maper里面有一个map方法:

protected void map(LongWritable key, Text value, Context context){}

第一个参数是偏移量,第二个是value,默认情况下是文件的一行值,有没有想过这个值是怎么来的么? 是由InputFormat这个类得来,InputFormat有两个方法:

public abstract class InputFormat {public abstract List getSplits(JobContext context) ;public abstract RecordReader createRecordReader(InputSplit split,                                         TaskAttemptContext context) }

两个方法一是计算分片,一个是获取RecordReader,而RecordReader则是用来读取文件。 默认是的InputFormat为TextInputFormat,可以在JobContextImpl中看到“:

public Class extends InputFormat,?>> getInputFormatClass()    throws ClassNotFoundException {  return (Class extends InputFormat,?>>)     conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);}

而TextInputFormat,继承自FileInputFormat,父类中getSplists就是 我们上面说的计算分片,而TextInputFormat返回的RecordReader是LineRecordReader,根据字面意思也知道返回的是行。

@InterfaceAudience.Public@InterfaceStability.Stablepublic class TextInputFormat extends FileInputFormat {  @Override  public RecordReader     createRecordReader(InputSplit split,                       TaskAttemptContext context) {    String delimiter = context.getConfiguration().get(        "textinputformat.record.delimiter");    byte[] recordDelimiterBytes = null;    if (null != delimiter)      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);    return new LineRecordReader(recordDelimiterBytes);  }  @Override  protected boolean isSplitable(JobContext context, Path file) {    final CompressionCodec codec =      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);    if (null == codec) {      return true;    }    return codec instanceof SplittableCompressionCodec;  }}

在理解LineRecordReader 这个类之前,我们应该先了解其父类RecordReader:

public abstract class RecordReader implements Closeable {public abstract void initialize(InputSplit split,TaskAttemptContext context)// 尝试读取下一个Key value 值,如果读到了则返回true,否则falsepublic abstract boolean nextKeyValue() ;//获取keypublic abstract KEYIN getCurrentKey() ;//获取valuepublic abstract VALUEIN getCurrentValue();//进度public abstract float getProgress();  public abstract void close() ;}

上面的nextKeyValue表示尝试读取下一个key/value的值,如果有的话,则调用方法getCurrentKey 和getCurrentValue来拿到key/value的值。这个规则不是我瞎掰的,有代码为证,在父类mapper中:

public void run(Context context) throws IOException, InterruptedException {  setup(context);  try {    while (context.nextKeyValue()) {      map(context.getCurrentKey(), context.getCurrentValue(), context);    }  } finally {    cleanup(context);  }}

明白了RecordReader的规则,我们再看LineRecordReader 这个类,其实只需要看下nextKeyValue这个方法就好了:

public class LineRecordReader extends RecordReader{    public boolean nextKeyValue() throws IOException {  if (key == null) {    key = new LongWritable();  }  key.set(pos);  if (value == null) {    value = new Text();  }  int newSize = 0;  // We always read one extra line, which lies outside the upper  // split limit i.e. (end - 1)  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {    if (pos == 0) {      newSize = skipUtfByteOrderMark();    } else {      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));      pos += newSize;    }。。。。。。。。。}@Overridepublic LongWritable getCurrentKey() {  return key;}@Overridepublic Text getCurrentValue() {  return value;}}

所以看上面的代码就可以知道,读取的时候是按行读取的,in.readLine,而maper的第一个参数LongWritable则是每一行的开头的偏移量

明白了上面的规则,我们也可以自定义自己的RecordReader,如果定义自己的RecordReader,则也需要定义自己的InputFormat。

当然既然分片是在InputFormat中的getSplits进行的,hadoop默认定义了多个InputFormat的实现类用于实现不同的分片和不同的读取数据的机制,比如说KeyValueTextInputFormat,该format集成自FileInputFormat,分片机制和TextInputFormat完全一样,但是读取数据虽然也是按行读取,但是key/value则不同,key默认是每一行的按tab键分割的第一串字符,当然按什么分割可以自己在Configration中定义,还有一个NLineInputFormat,这个分片机制则是按行来分片,具体多少行分片可以自己定义,比如按三行来切片:

 NLineInputFormat.setNumLinesPerSplit(job, 3);

分区

在这里插入图片描述

分区的数量不代表ReduceTask的数量,ReduceTask的数量可以使用job.setNumReduceTasks(2)指定。根据指定ReduceTask的数量之后,默认分区算法是用每一个key的哈希值取模ReduceTask的数量,结果代表此数据的分区号。每一个数据的分区在写入环形缓存区的时候已经计算完毕,一并写入环形缓存区。在溢写的过程中会先排序,排序的规则是个二次排序,意思是先根据分区比较,之后才是key值的比较,这样排序的结果就是先分区有序,接着是分区内数据值的有序。排序的代码是快排,类是QuickSort.java ,在排序的过程中,实际上排序的是环形缓冲区的索引,在MapOutputBuffer.java中,如下代码:

public int compare(final int mi, final int mj) {  final int kvi = offsetFor(mi % maxRec);  final int kvj = offsetFor(mj % maxRec);  final int kvip = kvmeta.get(kvi + PARTITION);  final int kvjp = kvmeta.get(kvj + PARTITION);  // sort by partition  if (kvip != kvjp) {    return kvip - kvjp;  }  // sort by key  return comparator.compare(kvbuffer,      kvmeta.get(kvi + KEYSTART),      kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),      kvbuffer,      kvmeta.get(kvj + KEYSTART),      kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));}

需要注意的是自定义分区一定要小于等于ReduceTask的数量,否则的话会抛异常,也很好理解,如果ReduceTask的数量大于分区的数量,最多ReduceTask会输出一个空文件,但是如果分区的数量大于ReduceTask,就证明该分区没有ReduceTask处理,那么这部分数据就没了意义,所以直接抛异常了。

public synchronized void collect(K key, V value, final int partition){    。。。。。。    if (partition = partitions) {     throw new IOException("Illegal partition for " + key + " (" +      partition + ")");}}

Combiner

上图中有多次用到Combiner:Combiner虽然本质上是一个reduce,但是没有默认的实现,需要自己定义并且在job中设置才可以。Combiner的作用是先在本地的合并,减少网络之间的传输量。但是并不是所有的输出都适合使用Combiner,只有那些不会改变最终结果的才适合使用,重点是不会改变最终的结果!使用它其实很简单,直接继承Reduce即可,所以Combiner的本质也是一个reduce,使用的话在job中设置一下即可:

job.setCombinerClass();

下篇再说说分组

欢迎关注我的微信公众号: 北风中独行的蜗牛

mapreduce value 排序_MapReduce知识点一相关推荐

  1. mapreduce文本排序_MapReduce:通过数据密集型文本处理

    mapreduce文本排序 自上次发布以来已经有一段时间了,因为我一直忙于Coursera提供的一些课程. 有一些非常有趣的产品,值得一看. 前一段时间,我购买了Jimmy Lin和Chris Dye ...

  2. Hadoop大数据——mapreduce的排序机制之total排序

    mapreduce的排序机制之total排序 (1)设置一个reduce task ,全局有序,但是并发度太低,单节点负载太大 (2)设置分区段partitioner,设置相应数量的reduce ta ...

  3. java程序员的大数据之路(10):MapReduce的排序

    排序 排序是MapReduce的核心技术.尽管应用本身可能不需要对数据排序,但仍对能使用MapReduce的排序功能来组织数据. 部分排序 默认情况下,MapReduce根据输入记录的键对数据集排序. ...

  4. c语言 秦九韶算法,高二数学期末复习秦九韶算法与排序的知识点

    高二数学期末复习秦九韶算法与排序的知识点 f(x)=anxn+an-1xn-1+.+a1x+a0求值问题 f(x)=anxn+an-1xn-1+.+a1x+a0=( anxn-1+an-1xn-2+. ...

  5. 大数据Hadoop(二十):MapReduce的排序和序列化

    文章目录 MapReduce的排序和序列化 概述 需求 分析 实现

  6. MapReduce统计排序和HDFS的读写

    实验材料及说明 在Ubuntu系统的/学号(每个人之间的学号)/salesInfo目录下,有买家的购买记录文件Sales,该文件记录了买家的id,购买商品的id以及购买日期,文件为名为Sales.Sa ...

  7. mapreduce python实例_MapReduce程序实例(python)

    问题背景 现在有两份数据,file1是校园新闻版块,每一条新闻点击记录:file2是校园新闻版块使用活跃度高的学生记录.用mr统计出某一天的点击记录里,使用ios/android手机的活跃学生的总的点 ...

  8. Java中的mapreduce没了_MapReduce的过程总结

    MapReduce 分为: 1) MapTask : 1.Read阶段:逻辑切片 128M / Maptask 读数据解析出一个个key/value. 2.Map阶段: 把key/value 写入到m ...

  9. Mapreduce的排序、全排序以及二次排序

    一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可 ...

最新文章

  1. 服务器端开发经验总结 Linux C语言
  2. 你的 mixin 兼容 ECMAScript 5 吗?
  3. 398. Random Pick Index - LeetCode
  4. LeetCode Add Two Numbers
  5. 《一个操作系统的实现》——pmtest1.asm详解
  6. pytorch 三维点分类_三维点云分类与分割-PointNet
  7. php getdefaultvalue,PHP ReflectionParameter getDefaultValueConstantName()用法及代码示例
  8. thinkphp php 5.2,ThinkPHP5.2:时间查询(改进、优化)
  9. docker 配置ssh
  10. 用JavaScript写抖音很火的罗盘时钟,附源码
  11. 165-手表品牌浅看一下
  12. [转载]网络数据流的java处理
  13. linux命令行的杠“-”、杠杠“--”以及无杠
  14. pdf文件如何进行翻译?pdf文件翻译方法分享。
  15. matlab对摄像头实时运动目标检测,基于Matlab实时运动目标跟踪检测系统
  16. 木子-后端-Spring配置时间触发器的方法
  17. 一幅真实的产品世界全景图,产品小白向上突破必看!
  18. 【图像分割】ExG超绿特征灰度化、二值化分割(Matlab及Python代码实现)
  19. HTTP状态码1xx到5xx
  20. gdufe acm 1086 互质数

热门文章

  1. java utf-8文件处理bom头
  2. telegram 内联模式 介绍
  3. linux shell 检查端口开放
  4. linux socket 阻塞与非阻塞,同步与异步
  5. HTTP/2 h2 协议简介
  6. Linux7 下重新安装YUM
  7. golang指针与c指针的异同
  8. 切换Python2和Python3的方法
  9. linux清屏命令clear和reset
  10. Android--相机预览及拍照临时文件/SurfaceView