MapReduce - 读取数据

通过InputFormat决定读取的数据的类型,然后拆分成一个个InputSplit,每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map

InputFormat

决定读取数据的格式,可以是文件或数据库等

功能

  1. 验证作业输入的正确性,如格式等

  2. 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的Map任务

  3. 提供RecordReader实现,读取InputSplit中的"K-V对"供Mapper使用

方法

List getSplits(): 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题

RecordReader createRecordReader(): 创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题

类结构

TextInputFormat: 输入文件中的每一行就是一个记录,Key是这一行的byte offset,而value是这一行的内容

KeyValueTextInputFormat: 输入文件中每一行就是一个记录,第一个分隔符字符切分每行。在分隔符字符之前的内容为Key,在之后的为Value。分隔符变量通过key.value.separator.in.input.line变量设置,默认为(\t)字符。

NLineInputFormat: 与TextInputFormat一样,但每个数据块必须保证有且只有N行,mapred.line.input.format.linespermap属性,默认为1

SequenceFileInputFormat: 一个用来读取字符流数据的InputFormat,为用户自定义的。字符流数据是Hadoop自定义的压缩的二进制数据格式。它用来优化从一个MapReduce任务的输出到另一个MapReduce任务的输入之间的数据传输过程。

InputSplit

代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法

Split内有Location信息,利于数据局部化

一个InputSplit给一个单独的Map处理

public abstract class InputSplit {      /**       * 获取Split的大小,支持根据size对InputSplit排序.       */      public abstract long getLength() throws IOException, InterruptedException;      /**       * 获取存储该分片的数据所在的节点位置.       */      public abstract String[] getLocations() throws IOException, InterruptedException;}

RecordReader

将InputSplit拆分成一个个对给Map处理,也是实际的文件读取分隔对象

问题

大量小文件如何处理

CombineFileInputFormat可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目,大量的Mapper Task创建销毁开销将是巨大的)

怎么计算split的

通常一个split就是一个block(FileInputFormat仅仅拆分比block大的文件),这样做的好处是使得Map可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度

通过mapred.min.split.size, mapred.max.split.size, block.size来控制拆分的大小

如果mapred.min.split.size大于block size,则会将两个block合成到一个split,这样有部分block数据需要通过网络读取

如果mapred.max.split.size小于block size,则会将一个block拆成多个split,增加了Map任务数(Map对split进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源)

先获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默认splitSize 就等于blockSize的默认值(64m)

public ListgetSplits(JobContext job) throws IOException {    // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));    long maxSize = getMaxSplitSize(job);    // generate splits    List splits = new ArrayList();    List files = listStatus(job);    for (FileStatus file: files) {        Path path = file.getPath();        long length = file.getLen();        if (length != 0) {              FileSystem fs = path.getFileSystem(job.getConfiguration());            // 获取该文件所有的block信息列表[hostname, offset, length]              BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);            // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割              if (isSplitable(job, path)) {                long blockSize = file.getBlockSize();                // 计算分片大小                // 即 Math.max(minSize, Math.min(maxSize, blockSize));                long splitSize = computeSplitSize(blockSize, minSize, maxSize);                long bytesRemaining = length;                // 循环分片。                // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {                      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);                      splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));                      bytesRemaining -= splitSize;                }                // 处理余下的数据                if (bytesRemaining != 0) {                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));                }            } else {                // 不可split,整块返回                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));            }        } else {            // 对于长度为0的文件,创建空Hosts列表,返回            splits.add(makeSplit(path, 0, length, new String[0]));        }    }    // 设置输入文件数量    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());    LOG.debug("Total # of splits: " + splits.size());    return splits;}

分片间的数据如何处理

split是根据文件大小分割的,而一般处理是根据分隔符进行分割的,这样势必存在一条记录横跨两个split

解决办法是只要不是第一个split,都会远程读取一条记录。不是第一个split的都忽略到第一条记录

public class LineRecordReader extends RecordReader<LongWritable, Text> {    private CompressionCodecFactory compressionCodecs = null;    private long start;    private long pos;    private long end;    private LineReader in;    private int maxLineLength;    private LongWritable key = null;    private Text value = null;    // initialize函数即对LineRecordReader的一个初始化    // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {        FileSplit split = (FileSplit) genericSplit;        Configuration job = context.getConfiguration();        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);        start = split.getStart();        end = start + split.getLength();        final Path file = split.getPath();        compressionCodecs = new CompressionCodecFactory(job);        final CompressionCodec codec = compressionCodecs.getCodec(file);        // 打开文件,并定位到分片读取的起始位置        FileSystem fs = file.getFileSystem(job);        FSDataInputStream fileIn = fs.open(split.getPath());        boolean skipFirstLine = false;        if (codec != null) {            // 文件是压缩文件的话,直接打开文件            in = new LineReader(codec.createInputStream(fileIn), job);            end = Long.MAX_VALUE;        } else {            // 只要不是第一个split,则忽略本split的第一行数据            if (start != 0) {                skipFirstLine = true;                --start;                // 定位到偏移位置,下次读取就会从偏移位置开始                fileIn.seek(start);            }            in = new LineReader(fileIn, job);        }        if (skipFirstLine) {            // 忽略第一行数据,重新定位start            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));        }        this.pos = start;    }    public boolean nextKeyValue() throws IOException {        if (key == null) {            key = new LongWritable();        }        key.set(pos);// key即为偏移量        if (value == null) {            value = new Text();        }        int newSize = 0;        while (pos < end) {            newSize = in.readLine(value, maxLineLength,    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));            // 读取的数据长度为0,则说明已读完            if (newSize == 0) {                break;            }            pos += newSize;            // 读取的数据长度小于最大行长度,也说明已读取完毕            if (newSize < maxLineLength) {                break;            }            // 执行到此处,说明该行数据没读完,继续读入        }        if (newSize == 0) {            key = null;            value = null;            return false;        } else {            return true;        }    }}

internetreadfile读取数据长度为0_Hadoop 读取数据相关推荐

  1. 微型计算机可以处理的二进制数据长度,可以处理二进制数据长度的是

    大家好,我是时间财富网智能客服时间君,上述问题将由我为大家进行解答. 字长是计算机信息处理中能同时处理的二进制数据的长度.二进制是计算技术中广泛采用的一种数制.二进制数据是用0和1两个数码来表示的数. ...

  2. C++ 读取wav文件中的PCM数据

    前言 wav文件通常会使用PCM格式数据存储音频,这种格式的数据读取出来直接就可以播放,要在wav文件中读取数据,我们首先要获取头部信息,wav的文件结构里面分为多个chunk,我们要做的就是识别这些 ...

  3. Excel导出表格时,下拉框数据长度超过255出现的问题及解决办法

    文章目录 1.直接添加下拉框,数据量过多会有问题 2.使用隐藏sheet的方式实现 3.多选下拉框 4.参考: 1.直接添加下拉框,数据量过多会有问题 /*** 创建下拉列表选项(单元格下拉框数据小于 ...

  4. internetreadfile读取数据长度为0_Go发起HTTP2.0请求流程分析(后篇)——标头压缩

    阅读建议 这是HTTP2.0系列的最后一篇,笔者推荐阅读顺序如下: Go中的HTTP请求之--HTTP1.1请求流程分析 Go发起HTTP2.0请求流程分析(前篇) Go发起HTTP2.0请求流程分析 ...

  5. internetreadfile读取数据长度为0_【完结】TensorFlow2.0 快速上手手册

    大家好,这是专栏<TensorFlow2.0>的第五篇文章,我们对专栏<TensorFlow2.0>进行一个总结. 我们知道全新的TensorFlow2.0 Alpha已经于2 ...

  6. read函数 读取指定长度的数据

    read方法: 1. 不定长参数,表示读取文件中的所有数据 2. 指定数据长度,读取指定长度的数据   2.1  如果文件的操作模式是r模式,read(5) 表示这一次最多读取5个字符串长度的数据   ...

  7. php基础系列之 数据的存储和读取

    ·文件处理 ·写入一个文件 1,打开这个文件.如果这个文件不存在,需要先创建它 2,将数据写入这个文件 3,关闭这个文件 ·从一个文件读出数据 1,打开这个文件.如果这个文件不能打开(例如,文件不存在 ...

  8. python读取成功_Python如何从文件读取数据()

    Python编写一个文件读写程序(命令行程序) def readfromfile(filename): with open(filename, 'rt') as handle: return hand ...

  9. 数据导入(excel读取,存储进数据库【多表】)

    QQ:1187362408 欢迎技术交流和学习 数据导入(excel读取,存储进数据库[多表]),业务需求 TODO: 1,选择导入地区 2,数据校验: (角色类别[1]:一个用户具有多角色      ...

最新文章

  1. R语言学生化的极差分布函数Studentized Range Distribution(ptukey qtukey )实战
  2. Codeforces 1314 题解
  3. 天津大学仁爱学院c语言期末考试题,天津大学《C语言程序设计》2016年7月考试期末大作业...
  4. css定位position
  5. linux文件系统及bash基础特性
  6. 剑指offer--二维数组的查找
  7. SpringBoot————JPA快速使用
  8. pytorch源码解析2——数据处理torch.utils.data
  9. STM32——HAL库函数版——AD7656驱动程序
  10. hash表的一些基本知识
  11. 关于浮点数据类型和布尔数据类型以及最后的总结
  12. RealFlow在线教程翻译(5)——Shattered Glass (破碎的玻璃杯)
  13. 服务器被攻击显示,怎么查看服务器被攻击
  14. 新一代手机声音传音器THA-2开始发售,大家快来体验吧!
  15. 电池BMS软件架构设计和电池安全标准及测试
  16. The NTVDM CPU has encountered an illegal instruction. CS:0006 IP:130a ....
  17. 网站域名服务器加密,网站实现全站https加密可以防止DNS劫持吗?
  18. 【mysql的设计与优化专题(5)】慢查询详解
  19. Life with qmail -- 中文版(英文版本16 Aug 2003)
  20. C语言基础教程 之 系统关键字

热门文章

  1. 世界坐标系空间,页面空间,设备空间,物理设备空间
  2. 巧妙启用Windows 2003的远程桌面
  3. 《oracle大型数据库系统在AIX/unix上的实战详解》讨论31: oracle、sybase 数据库的不同访问...
  4. spintboot学习笔记
  5. web客户端 http error 413
  6. 10g启动归档模式及报ORA-00265错处理
  7. ESX中的Linux热添加磁盘
  8. GRE over IPSEC ×××
  9. C#编程利器系列文章
  10. IT人士群聚喝酒的讲究(转载)