JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。它有两个静态成员变量,如下:

[java] view plaincopy
  1. // 分片版本,当前默认为1
  2. private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
  3. // 分片文件头部,为UTF-8格式的字符串"SPL"的字节数组"SPL"
  4. private static final byte[] SPLIT_FILE_HEADER;

并且,提供了一个静态方法,完成SPLIT_FILE_HEADER的初始化,代码如下:

[java] view plaincopy
  1. // 静态方法,加载SPLIT_FILE_HEADER为UTF-8格式的字符串"SPL"的字节数组byte[]
  2. static {
  3. try {
  4. SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
  5. } catch (UnsupportedEncodingException u) {
  6. throw new RuntimeException(u);
  7. }
  8. }

JobSplitWriter实现其功能的为createSplitFiles()方法,它有三种实现,我们先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代码如下:

[java] view plaincopy
  1. // 创建分片文件
  2. public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
  3. Configuration conf, FileSystem fs, T[] splits)
  4. throws IOException, InterruptedException {
  5. // 调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,
  6. // 对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
  7. FSDataOutputStream out = createFile(fs,
  8. JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
  9. // 调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info
  10. SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
  11. // 关闭输出流
  12. out.close();
  13. // 调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件
  14. writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
  15. new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
  16. info);
  17. }

createSplitFiles()方法的逻辑很清晰,大体如下:

1、调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID;

2、调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info;

3、关闭输出流out;

4、调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件。

我们先来看下createFile()方法,代码如下:

[java] view plaincopy
  1. private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
  2. Configuration job)  throws IOException {
  3. // 调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,
  4. // 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
  5. FSDataOutputStream out = FileSystem.create(fs, splitFile,
  6. new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
  7. // 获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10
  8. int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
  9. // 通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10
  10. fs.setReplication(splitFile, (short)replication);
  11. // 调用writeSplitHeader()方法写入分片头信息
  12. writeSplitHeader(out);
  13. // 返回文件系统数据输出流out
  14. return out;
  15. }

首先,调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

其次,获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10;

接着,通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10;

然后,调用writeSplitHeader()方法写入分片头信息;

最后,返回文件系统数据输出流out。

writeSplitHeader()方法专门用于将分片头部信息写入分片文件,代码如下:

[java] view plaincopy
  1. private static void writeSplitHeader(FSDataOutputStream out)
  2. throws IOException {
  3. // 文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL"
  4. out.write(SPLIT_FILE_HEADER);
  5. // 文件系统数据输出流out写入int,分片版本号,目前为1
  6. out.writeInt(splitVersion);
  7. }

很简单,首先文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL",然后文件系统数据输出流out写入int,分片版本号,目前为1。

接下来,我们再看下writeNewSplits()方法,它将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info,代码如下:

[java] view plaincopy
  1. @SuppressWarnings("unchecked")
  2. private static <T extends InputSplit>
  3. SplitMetaInfo[] writeNewSplits(Configuration conf,
  4. T[] array, FSDataOutputStream out)
  5. throws IOException, InterruptedException {
  6. // 根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,
  7. // array其实是传入的分片数组
  8. SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  9. if (array.length != 0) {// 如果array中有数据
  10. // 创建序列化工厂SerializationFactory实例factory
  11. SerializationFactory factory = new SerializationFactory(conf);
  12. int i = 0;
  13. // 获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10
  14. int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
  15. MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
  16. // 通过输出流out的getPos()方法获取输出流out的当前位置offset
  17. long offset = out.getPos();
  18. // 遍历数组array中每个元素split
  19. for(T split: array) {
  20. // 通过输出流out的getPos()方法获取输出流out的当前位置prevCount
  21. long prevCount = out.getPos();
  22. // 往输出流out中写入String,内容为split对应的类名
  23. Text.writeString(out, split.getClass().getName());
  24. // 获取序列化器Serializer实例serializer
  25. Serializer<T> serializer =
  26. factory.getSerializer((Class<T>) split.getClass());
  27. // 打开serializer,接入输出流out
  28. serializer.open(out);
  29. // 将split序列化到输出流out
  30. serializer.serialize(split);
  31. // 通过输出流out的getPos()方法获取输出流out的当前位置currCount
  32. long currCount = out.getPos();
  33. // 通过split的getLocations()方法,获取位置信息locations
  34. String[] locations = split.getLocations();
  35. if (locations.length > maxBlockLocations) {
  36. LOG.warn("Max block location exceeded for split: "
  37. + split + " splitsize: " + locations.length +
  38. " maxsize: " + maxBlockLocations);
  39. locations = Arrays.copyOf(locations, maxBlockLocations);
  40. }
  41. // 构造split对应的元数据信息,并加入info指定位置,
  42. // offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations
  43. info[i++] =
  44. new JobSplit.SplitMetaInfo(
  45. locations, offset,
  46. split.getLength());
  47. // offset增加当前split已写入数据大小
  48. offset += currCount - prevCount;
  49. }
  50. }
  51. // 返回分片元数据信息SplitMetaInfo数组info
  52. return info;
  53. }

writeNewSplits()方法的逻辑比较清晰,大体如下:

1、根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,array其实是传入的分片数组;

2、如果array中有数据:

2.1、创建序列化工厂SerializationFactory实例factory;

2.2、获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10;

2.3、通过输出流out的getPos()方法获取输出流out的当前位置offset;

2.4、遍历数组array中每个元素split:

2.4.1、通过输出流out的getPos()方法获取输出流out的当前位置prevCount;

2.4.2、往输出流out中写入String,内容为split对应的类名;

2.4.3、获取序列化器Serializer实例serializer;

2.4.4、打开serializer,接入输出流out;

2.4.5、将split序列化到输出流out;

2.4.6、通过输出流out的getPos()方法获取输出流out的当前位置currCount;

2.4.7、通过split的getLocations()方法,获取位置信息locations;

2.4.8、确保位置信息locations的长度不能超过maxBlockLocations,超过则截断;

2.4.9、构造split对应的元数据信息,并加入info指定位置,offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations;

2.4.10、offset增加当前split已写入数据大小;

3、返回分片元数据信息SplitMetaInfo数组info。

其中,序列化split对象时,我们以FileSplit为例来分析,其write()方法如下:

[java] view plaincopy
  1. @Override
  2. public void write(DataOutput out) throws IOException {
  3. // 写入文件路径全名
  4. Text.writeString(out, file.toString());
  5. // 写入分片在文件中的起始位置
  6. out.writeLong(start);
  7. // 写入分片在文件中的长度
  8. out.writeLong(length);
  9. }

比较简单,分别写入文件路径全名、分片在文件中的起始位置、分片在文件中的长度三个信息。

综上所述,分片文件job.split文件的内容为:

1、文件头:"SPL"+int类型版本号1;

2、分片类信息:String类型split对应类名;

3、分片数据信息:String类型文件路径全名+Long类型分片在文件中的起始位置+Long类型分片在文件中的长度。

而在最后,构造分片元数据信息时,产生的是JobSplit的静态内部类SplitMetaInfo对象,包括分片位置信息locations、split在split文件中的起始位置offset、分片长度split.getLength()。

下面,我们再看下分片的元数据信息文件是如何产生的,让我们来研究下writeJobSplitMetaInfo()方法,代码如下:

[java] view plaincopy
  1. // 写入作业分片元数据信息
  2. private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
  3. FsPermission p, int splitMetaInfoVersion,
  4. JobSplit.SplitMetaInfo[] allSplitMetaInfo)
  5. throws IOException {
  6. // write the splits meta-info to a file for the job tracker
  7. // 调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,
  8. // 对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
  9. // 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
  10. FSDataOutputStream out =
  11. FileSystem.create(fs, filename, p);
  12. // 写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[]
  13. out.write(JobSplit.META_SPLIT_FILE_HEADER);
  14. // 写入分片元数据版本号splitMetaInfoVersion,当前为1
  15. WritableUtils.writeVInt(out, splitMetaInfoVersion);
  16. // 写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length
  17. WritableUtils.writeVInt(out, allSplitMetaInfo.length);
  18. // 遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流
  19. for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
  20. splitMetaInfo.write(out);
  21. }
  22. // 关闭输出流out
  23. out.close();
  24. }

writeJobSplitMetaInfo()方法的主体逻辑也十分清晰,大体如下:

1、调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

2、写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[];

3、写入分片元数据版本号splitMetaInfoVersion,当前为1;

4、写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length;

5、遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流;

6、关闭输出流out。
        我们看下如何序列化JobSplit.SplitMetaInfo,将其写入文件,JobSplit.SplitMetaInfo的write()如下:

[java] view plaincopy
  1. public void write(DataOutput out) throws IOException {
  2. // 将分片位置个数写入分片元数据信息文件
  3. WritableUtils.writeVInt(out, locations.length);
  4. // 遍历位置信息,写入分片元数据信息文件
  5. for (int i = 0; i < locations.length; i++) {
  6. Text.writeString(out, locations[i]);
  7. }
  8. // 写入分片元数据信息的起始位置
  9. WritableUtils.writeVLong(out, startOffset);
  10. // 写入分片大小
  11. WritableUtils.writeVLong(out, inputDataLength);
  12. }

每个分片的元数据信息,包括分片位置个数、分片文件位置、分片元数据信息的起始位置、分片大小等内容。

总结

JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。分片数据文件job.split存储的主要是每个分片对应的HDFS文件路径,和其在HDFS文件中的起始位置、长度等信息,而分片元数据信息文件job.splitmetainfo存储的则是每个分片在分片数据文件job.split中的起始位置、分片大小等信息。

job.split文件内容:文件头 + 分片 + 分片 + ... + 分片

文件头:"SPL" + 版本号1

分片:分片类 + 分片数据,分片类=String类型split对应类名,分片数据=String类型HDFS文件路径全名+Long类型分片在HDFS文件中的起始位置+Long类型分片在HDFS文件中的长度

job.splitmetainfo文件内容:文件头 + 分片元数据个数 + 分片元数据 + 分片元数据 + ... + 分片元数据

文件头:"META-SPL" + 版本号1

分片元数据个数:分片元数据的个数

分片元数据:分片位置个数+分片位置+在分片文件job.split中的起始位置+分片大小

转载于:https://www.cnblogs.com/jirimutu01/p/5556356.html

MapReduce源码分析之JobSplitWriter相关推荐

  1. MapReduce 源码分析(一)准备阶段

    MapReduce 源码分析 本篇博客根据wordCount代码进行分析底层源码的.以下称它为WC类. package com.henu;import org.apache.hadoop.conf.C ...

  2. MapReduce源码分析_李孟_新浪博客

    job.waitForCompletion(true);进入源码 submit()-> connect();连接,客户端获取服务端的代理对象 connect()->new Cluster( ...

  3. Mapreduce源码分析(一):FileInputFormat切片机制,源码详解

    FileInputFormat切片机制,源码详解 1.InputFormat:抽象类 只有两个抽象方法 public abstract List<InputSplit> getSplits ...

  4. MapReduce源码分析之作业Job状态机解析(一)简介与正常流程浅析

    作业Job状态机维护了MapReduce作业的整个生命周期,即从提交到运行结束的整个过程.Job状态机被封装在JobImpl中,其主要包括14种状态和19种导致状态发生的事件. 作业Job的全部状态维 ...

  5. MapReduce源码分析总结

    http://blog.csdn.net/HEYUTAO007/article/details/5725379 参考: 1 caibinbupt的源代码分析http://caibinbupt.java ...

  6. MapReduce中Client提交Job源码分析

    回顾 在进行submit源码分析之前,先来回顾一下WordCount案例(点击查看WordCount案例).仔细回想一下曾经Client都干了点啥?获取对象-->一通set-->job.w ...

  7. hadoop之MapReduce框架TaskTracker端心跳机制分析(源码分析第六篇)

    1.概述 MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁.TaskTracker周期性地调用心跳RPC函数,汇 ...

  8. 《MapReduce 2.0源码分析与编程实战》一第1章 HBase介绍

    本节书摘来异步社区<MapReduce 2.0源码分析与编程实战>一书中的第1章,作者: 王晓华 责编: 陈冀康,更多章节内容可以访问云栖社区"异步社区"公众号查看. ...

  9. 《MapReduce 2.0源码分析与编程实战》一1.5 看,大象也会跳舞

    本节书摘来异步社区<MapReduce 2.0源码分析与编程实战>一书中的第1章,第1.5节,作者: 王晓华 责编: 陈冀康,更多章节内容可以访问云栖社区"异步社区"公 ...

最新文章

  1. 机器学习驱动技术是生物学进步的下一个突破
  2. k8s组件说明:ETCD存储组件
  3. Actuator对于JMX支持
  4. foreach是同步还是异步JAVA,Java中foreach与正常for循环效率对比
  5. 解决Centos 7安装在虚拟机中没有图形界面的问题
  6. Java程序设计语言基础05:Java的类和对象
  7. matlab求半径范围内的点,matlab怎么快速搜索距离某点球形范围内的所有点
  8. ssms 连接 ssis_在SSMS中手动设置SSIS包加密
  9. 博弈论(取石子专题)
  10. 栅栏密码 - Python脚本
  11. ✿2020医疗行业CTF✿多余的音符
  12. JDK8 toMap之key重复报Duplicate key xxxx异常解决
  13. win10装debian 双系统_如何安装win10和linux [ubuntu14]双系统
  14. 西安2020EC游记
  15. cocos2dx-是男人就坚持20s 练手项目
  16. SQL基础操作(3):对表中元进行简单删,改,查的操作【增在2中】
  17. 从兴电子笔试题目小结
  18. 论文画图——eps格式的图
  19. Java中获取class对象
  20. Tomcat优化之JDK优化之熵池策略

热门文章

  1. Matplotlib常见问题总结
  2. CAN总线基础(三)
  3. unity导出fbx模型_ARTBOOK艺书专栏:Fbx导出杂谈
  4. 父类的静态方法能否被子类重写?
  5. 内存泄漏和内存溢出有什么区别
  6. mat opencv 修改roi_OpenCV中如何提取不规则ROI区域
  7. MyBatis之使用JSONObject代替JavaBean优雅返回多表查询结果
  8. dotnet安装包时找不到依赖关系_无法加载文件或程序集'Microsoft.AspNet.TelemetryCorrelation'或其依赖项之一 . 该系统找不到指定的文件...
  9. 怎么屏蔽跳出来的登陆窗口_我是怎么从机构跳出来做兼职的(二)
  10. java 实现二叉树操作