转载自:http://blog.csdn.net/lipeng_bigdata/article/details/53738376

一、综述

HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode、DataNode、DFSClient等众多角色的分工与合作。

首先上一段代码,客户端是如何写文件的:

[java] view plain copy

  1. Configuration conf = new Configuration();
  2. FileSystem fs = FileSystem.get(conf);
  3. Path file = new Path("demo.txt");
  4. FSDataOutputStream outStream = fs.create(file);
  5. out.write("Welcome to HDFS Java API !!!".getBytes("UTF-8"));
  6. outStream.close();

只有简单的6行代码,客户端封装的如此简洁,各组件间的RPC调用、异常处理、容错等均对客户端透明。

总体来说,最简单的HDFS写文件大体流程如下:

1、客户端获取文件系统实例FileSyStem,并通过其create()方法获取文件系统输出流outputStream;

1.1、首先会联系名字节点NameNode,通过ClientProtocol.create()RPC调用,在名字节点上创建文件元数据,并获取文件状态FileStatus;

1.2、通过文件状态FileStatus构造文件系统输出流outputStream;

2、通过文件系统输出流outputStream写入数据;

2.1、首次写入会首先向名字节点申请数据块,名字节点能够掌握集群DataNode整体状况,分配数据块后,连同DataNode列表信息返回给客户端;

2.2、客户端采用流式管道的方式写入数据节点列表中的第一个DataNode,并由列表中的前一个DataNode将数据转发给后面一个DataNode;

2.3、确认数据包由DataNode经过管道依次返回给上游DataNode和客户端;

2.4、写满一个数据块后,向名字节点提交一个数据;

2.5、再次重复2.1-2.4过程;

3、向名字节点提交文件(complete file),即告知名字节点文件已写完,然后关闭文件系统输出流outputStream等释放资源。

可以看出,在不考虑异常等的情况下,上述过程还是比较复杂的。本文,我将着重阐述下HDFS写数据时,客户端是如何实现的,关于NameNode、DataNode等的配合等,后续文章将陆续推出,敬请关注!

二、实现分析

我们将带着以下问题来分析客户端写入数据过程:

1、如何获取数据输出流?

2、如何通过数据输出流写入数据?

3、数据输出流关闭时都做了什么?

4、如果发生异常怎么办?即如何容错?

(一)如何获取数据输出流?

HDFS客户端获取数据流是一个复杂的过程,流程图如下:

以DistributedFileSystem为例,create()是其入口方法,DistributedFileSystem内部封装了一个DFS的客户端,如下:

[java] view plain copy

  1. DFSClient dfs;

在DistributedFileSystem的初始化方法initialize()中,会构造这个文件系统客户端,如下:

[java] view plain copy

  1. this.dfs = new DFSClient(uri, conf, statistics);

而create()方法就是通过这个文件系统客户端dfs获取数据输出流的,如下:

[java] view plain copy

  1. @Override
  2. public FSDataOutputStream create(final Path f, final FsPermission permission,
  3. final EnumSet<CreateFlag> cflags, final int bufferSize,
  4. final short replication, final long blockSize, final Progressable progress,
  5. final ChecksumOpt checksumOpt) throws IOException {
  6. statistics.incrementWriteOps(1);
  7. Path absF = fixRelativePart(f);
  8. return new FileSystemLinkResolver<FSDataOutputStream>() {
  9. /*
  10. * 创建文件系统数据输出流
  11. */
  12. @Override
  13. public FSDataOutputStream doCall(final Path p)
  14. throws IOException, UnresolvedLinkException {
  15. // 调用create()方法创建文件,并获取文件系统输出流
  16. final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
  17. cflags, replication, blockSize, progress, bufferSize,
  18. checksumOpt);
  19. return dfs.createWrappedOutputStream(dfsos, statistics);
  20. }
  21. @Override
  22. public FSDataOutputStream next(final FileSystem fs, final Path p)
  23. throws IOException {
  24. return fs.create(p, permission, cflags, bufferSize,
  25. replication, blockSize, progress, checksumOpt);
  26. }
  27. }.resolve(this, absF);
  28. }

FileSystemLinkResolver是一个文件系统链接解析器(抽象类),我们待会再分析它,这里只要知道,该抽象类实例化后会通过resolve()方法--doCall()方法得到数据输出流即可。接着往下DFSClient的create()方法,省略部分代码,如下:

[java] view plain copy

  1. // 为create构建一个数据输出流
  2. final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
  3. src, masked, flag, createParent, replication, blockSize, progress,
  4. buffersize, dfsClientConf.createChecksum(checksumOpt),
  5. getFavoredNodesStr(favoredNodes));
  6. // 开启文件租约
  7. beginFileLease(result.getFileId(), result);
  8. return result;

实际上,它又通过DFSOutputStream的newStreamForCreate()方法来获取数据输出流,并开启文件租约。租约的内容我们后续再讲,继续看下如何获取文件输出流的,如下:

[java] view plain copy

  1. /**
  2. * 为创建文件构造一个新的输出流
  3. */
  4. static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
  5. FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
  6. short replication, long blockSize, Progressable progress, int buffersize,
  7. DataChecksum checksum, String[] favoredNodes) throws IOException {
  8. TraceScope scope =
  9. dfsClient.getPathTraceScope("newStreamForCreate", src);
  10. try {
  11. HdfsFileStatus stat = null;
  12. // Retry the create if we get a RetryStartFileException up to a maximum
  13. // number of times
  14. boolean shouldRetry = true;
  15. int retryCount = CREATE_RETRY_COUNT;
  16. while (shouldRetry) {
  17. shouldRetry = false;
  18. try {
  19. // 首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态
  20. stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
  21. new EnumSetWritable<CreateFlag>(flag), createParent, replication,
  22. blockSize, SUPPORTED_CRYPTO_VERSIONS);
  23. break;
  24. catch (RemoteException re) {
  25. IOException e = re.unwrapRemoteException(
  26. AccessControlException.class,
  27. DSQuotaExceededException.class,
  28. FileAlreadyExistsException.class,
  29. FileNotFoundException.class,
  30. ParentNotDirectoryException.class,
  31. NSQuotaExceededException.class,
  32. RetryStartFileException.class,
  33. SafeModeException.class,
  34. UnresolvedPathException.class,
  35. SnapshotAccessControlException.class,
  36. UnknownCryptoProtocolVersionException.class);
  37. if (e instanceof RetryStartFileException) {
  38. if (retryCount > 0) {
  39. shouldRetry = true;
  40. retryCount--;
  41. else {
  42. throw new IOException("Too many retries because of encryption" +
  43. " zone operations", e);
  44. }
  45. else {
  46. throw e;
  47. }
  48. }
  49. }
  50. Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
  51. // 构造一个数据输出流
  52. final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
  53. flag, progress, checksum, favoredNodes);
  54. // 启动数据输出流
  55. out.start();
  56. return out;
  57. finally {
  58. scope.close();
  59. }
  60. }

大体可以分为三步:

1、首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态HdfsFileStatus;

2、构造一个数据输出流;

3、启动数据输出流。

上述连接NameNode节点创建文件的过程中,如果发生瞬时错误,会充分利用重试机制,增加系统容错性。DFSClient中nameNode的Create()方法,实际上是调用的是客户端与名字节点间的RPC--ClientProtocol的create()方法,该方法的作用即是在NameNode上创建一个空文件,并返回文件状态。文件状态主要包括以下信息:

[java] view plain copy

  1. // 文件路径
  2. private final byte[] path;  // local name of the inode that's encoded in java UTF8
  3. // 符号连接
  4. private final byte[] symlink; // symlink target encoded in java UTF8 or null
  5. private final long length;// 文件长度
  6. private final boolean isdir;// 是否为目录
  7. private final short block_replication;// 数据块副本数
  8. private final long blocksize;// 数据块大小
  9. private final long modification_time;// 修改时间
  10. private final long access_time;// 访问时间
  11. private final FsPermission permission;// 权限
  12. private final String owner;// 文件所有者
  13. private final String group;// 文件所属组
  14. private final long fileId;// 文件ID

继续看如何构造一个数据输出流,实际上它是通过构造DFSOutputStream实例获取的,而DFSOutputStream的构造方法如下:

[java] view plain copy

  1. /** Construct a new output stream for creating a file. */
  2. private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
  3. EnumSet<CreateFlag> flag, Progressable progress,
  4. DataChecksum checksum, String[] favoredNodes) throws IOException {
  5. this(dfsClient, src, progress, stat, checksum);
  6. this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
  7. // 计算数据包块大小
  8. computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
  9. // 构造数据流对象
  10. streamer = new DataStreamer(stat, null);
  11. if (favoredNodes != null && favoredNodes.length != 0) {
  12. streamer.setFavoredNodes(favoredNodes);
  13. }
  14. }

首先计算数据包块大小,然后构造数据流对象,后续就依靠这个数据流对象来通过管道发送流式数据。接下来便是启动数据输出流,如下:

[java] view plain copy

  1. private synchronized void start() {
  2. streamer.start();
  3. }

很简单,实际上也就是启动数据流对象,通过这个数据流对象实现数据的发送。

中间为什么会有计算数据包块大小这一步呢?原来,数据的发送是通过一个个数据包发送出去的,而不是通过数据块发送的。设想下,如果按照一个数据块(默认128M)大小发送数据,合理吗?至于数据包大小是如何确定的,我们后续再讲。

(二)如何通过数据输出流写入数据?

下面,该看看如何通过数据输出流写入数据了。要解决这个问题,首先分析下DFSOutputStream和DataStreamer是什么。

1、DFSOutputStream

DFSOutputStream是分布式文件系统输出流,它内部封装了两个队列:发送数据包队列和确认数据包队列,如下:

[java] view plain copy

  1. // 发送数据包队列
  2. private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
  3. // 确认数据包队列
  4. private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();

客户端写入的数据,会addLast入发送数据包队列dataQueue,然后交给DataStreamer处理。

2、DataStreamer

DataStreamer是一个后台工作线程,它负责在数据流管道中往DataNode发送数据包。它从NameNode申请获取一个新的数据块ID和数据块位置,然后开始往DataNode的管道写入流式数据包。每个数据包都有一个序列号sequence number。当一个数据块所有的数据包被发送出去,并且每个数据包的确认信息acks被接收到的话,DataStreamer关闭当前数据块,然后再向NameNode申请下一个数据块。

所以,才会有上述发送数据包和确认数据包这两个队列。

DataStreamer内部有很多变量,大体如下:

[java] view plain copy

  1. // streamer关闭标志位
  2. private volatile boolean streamerClosed = false;
  3. // 扩展块,它的长度是已经确认ack的bytes大小
  4. private ExtendedBlock block; // its length is number of bytes acked
  5. private Token<BlockTokenIdentifier> accessToken;
  6. // 数据输出流
  7. private DataOutputStream blockStream;
  8. // 数据输入流:即回复流
  9. private DataInputStream blockReplyStream;
  10. // 响应处理器
  11. private ResponseProcessor response = null;
  12. // 当前块的数据块列表
  13. private volatile DatanodeInfo[] nodes = null; // list of targets for current block
  14. // 存储类型
  15. private volatile StorageType[] storageTypes = null;
  16. // 存储ID
  17. private volatile String[] storageIDs = null;
  18. // 需要排除的节点
  19. private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
  20. CacheBuilder.newBuilder()
  21. .expireAfterWrite(
  22. dfsClient.getConf().excludedNodesCacheExpiry,
  23. TimeUnit.MILLISECONDS)
  24. .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
  25. @Override
  26. public void onRemoval(
  27. RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
  28. DFSClient.LOG.info("Removing node " +
  29. notification.getKey() + " from the excluded nodes list");
  30. }
  31. })
  32. .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
  33. @Override
  34. public DatanodeInfo load(DatanodeInfo key) throws Exception {
  35. return key;
  36. }
  37. });
  38. // 优先节点
  39. private String[] favoredNodes;
  40. // 是否存在错误
  41. volatile boolean hasError = false;
  42. volatile int errorIndex = -1;
  43. // Restarting node index
  44. // 从哪个节点重试的索引
  45. AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
  46. private long restartDeadline = 0; // Deadline of DN restart
  47. // 当前数据块构造阶段
  48. private BlockConstructionStage stage;  // block construction stage
  49. // 已发送数据大小
  50. private long bytesSent = 0; // number of bytes that've been sent
  51. private final boolean isLazyPersistFile;
  52. /** Nodes have been used in the pipeline before and have failed. */
  53. private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
  54. /** The last ack sequence number before pipeline failure. */
  55. // 管道pipeline失败前的最后一个确认包序列号
  56. private long lastAckedSeqnoBeforeFailure = -1;
  57. // 管道恢复次数
  58. private int pipelineRecoveryCount = 0;
  59. /** Has the current block been hflushed? */
  60. // 当前数据块是否已被Hflushed
  61. private boolean isHflushed = false;
  62. /** Append on an existing block? */
  63. // 是否需要在现有块上append
  64. private final boolean isAppend;

有很多比较简单,不再赘述。这里只讲解几个比较重要的:

1、BlockConstructionStage stage

当前数据块构造阶段。针对create()这种写入 来说,开始时默认是BlockConstructionStage.PIPELINE_SETUP_CREATE,即管道初始化时需要向NameNode申请数据块及所在数据节点的状态,这个很容易理解。有了数据块和其所在数据节点所在列表,才能形成管道列表不是?在数据流传输过程中,即一个数据块写入的过程中,虽然有多次数据包写入,但状态始终为DATA_STREAMING,即正在流式写入的阶段。而当发生异常时,则是PIPELINE_SETUP_STREAMING_RECOVERY状态,即需要从流式数据中进行恢复,如果一个数据块写满,则会进入下一个周期,PIPELINE_SETUP_CREATE->DATA_STREAMING,最后数据全部写完后,状态会变成PIPELINE_CLOSE,并且如果发生异常的话,会有一个特殊状态对应,即PIPELINE_CLOSE_RECOVERY。而append开始时则是对应的状态PIPELINE_SETUP_APPEND及异常状态PIPELINE_SETUP_APPEND_RECOVERY,其它则一致。

2、volatile boolean hasError = false

这个状态位用来标记数据写入过程中,是否存在错误,方便进行容错。

3、ResponseProcessor response

响应处理器。这个也是后台工作线程,它会处理来自DataNode回复流中的确认包,确认数据是否发送成功,如果成功,将确认包从确认数据包队列中移除,否则进行容错处理。

create()模式下的DataStreamer构造比较简单,如下:

[java] view plain copy

  1. private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
  2. isAppend = false;
  3. isLazyPersistFile = isLazyPersist(stat);
  4. this.block = block;
  5. stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
  6. }

isAppend设置为false,即不是append写入,BlockConstructionStage默认为PIPELINE_SETUP_CREATE,即需要向NameNode写入数据块。

我们首先看下DataStreamer是如何发送数据的。上面讲到过,DFSOutputStream中包括两个队列:发送数据包队列和确认数据包队列。这类似于两个生产者消--费者模型。针对发送数据包队列,外部写入者为生产者,DataStreamer为消费者。外部持续写入数据至发送数据包队列,DataStreamer则从中消费数据,判断是否需要申请数据块,然后写入数据节点流式管道。而确认数据包队列,DataStreamer为生产者,ResponseProcessor为消费者。首先,确认数据包队列数据的产生,是DataStreamer发送数据给DataNode后,从发送数据包队列挪过来的,而当ResponseProcessor线程确认接收到数据节点的ack确认包后,再从数据确认队列中删除。

关于ResponseProcessor线程,稍后再讲。

数据写入过程之DataStreamer

首先看DataStreamer的run()方法,它会在数据流没有关闭,且dfs客户端正在运行的情况下,一直循环,循环内处理的大体流程如下:

1、如果遇到一个错误(hasErro),且响应器尚未关闭,关闭响应器,使之join等待;

2、如果有DataNode相关IO错误,先预先处理,初始化一些管道和流的信息,并决定外部是否等待,等待意即可以进行容错处理,不等待则数目错误比较严重,无法进行容错处理:这里还判断了errorIndex标志位和restartingNodeIndex的大小,意思是是否是由某个具体数据节点引起的错误,如果是的话,这种错误理论上是可以处理的;

3、没有数据时,等待一个数据包发送:等待的条件是:当前流没有关闭(!streamerClosed)、没有错误(hasError)、dfs客户端正在 运行(dfsClient.clientRunning )、dataQueue队列大小为0,且当前阶段不是DATA_STREAMING,或者在需要sleep(doSleep)或者上次发包距离本次时间未超过阈值的情况下为DATA_STREAMING

意思是各种标记为正常,数据流处于正常发送的过程或者可控的非正常发送过程中,可控表现在状态位doSleep,即上传错误检查中认为理论上可以进行修复,但是需要sleep已完成recovery的初始化,或者距离上次发送未超过时间的阈值等。

4、如果数据流关闭、存在错误、客户端正常运行标志位异常时,执行continue:这个应该是对容错等的处理,让程序及时响应错误;

5、获取将要发送的数据包:

如果数据发送队列为空,构造一个心跳包;否则,取出队列中第一个元素,即待发送数据包。

6、如果当前阶段是PIPELINE_SETUP_CREATE,申请数据块,设置pipeline,初始化数据流:append的setup阶段则是通过setupPipelineForAppendOrRecovery()方法完成的,并同样会初始化数据流;

7、获取数据块中的上次数据位置lastByteOffsetInBlock,如果超过数据块大小,报错;

8、 如果是数据块的最后一个包:等待所有的数据包被确认,即等待datanodes的确认包acks,如果数据流关闭,或者数据节点IO存在错误,或者客户端不再正常运行,continue,设置阶段为pipeline关闭

9、发送数据包:将数据包从dataQueue队列挪至ackQueue队列,通知dataQueue的所有等待者,将数据写入远端的DataNode节点,并flush,如果发生异常,尝试标记主要的数据节点错误,方便容错处理;

10、更新已发送数据大小:可以看出,数据包中存储了其在数据块中的位置LastByteOffsetBlock,也就标记了已经发送数据的总大小;

11、数据块写满了吗?如果是最后一个数据块,等待确认包,调用endBlock()方法结束一个数据块 ;

如果上述流程发生错误,hasError标志位设置为true,并且如果不是一个DataNode引起的原因,流关闭标志设置为true。

最后,没有数据需要发送,或者发生致命错误的情况下,调用closeInternal()方法关闭内部资源。

客户端实现PFSPacket

一、简介

HDFS在数据传输过程中,针对数据块Block,不是整个block进行传输的,而是将block切分成一个个的数据包进行传输。而DFSPacket就是HDFS数据传输过程中对数据包的抽象。

二、实现

HDFS客户端在往DataNodes节点写数据时,会以数据包packet的形式写入,且每个数据包包含一个包头,n个连续的校验和数据块checksum chunks和n个连续的实际数据块 actual data chunks,每个校验和数据块对应一个实际数据块,被用来做数据校验,防止数据传输过程中因网络原因等发生的数据丢包。

DFSPacket内数据的逻辑组织形式如下:

DFSPacket的物理实现如下:

FSPacket在内部持有一个数据缓冲区buf,类型为byte[]

buf用来按顺序存储三类数据,header、checksum chunks、data chunks,分别对应上面的header区域、cccc…cccc区域和dddd…dddd区域

header、checksum chunks和data chunks都是提前分配好的,灰色代表已经写入数据区域,白色代表可以写入数据区域

Header是数据包的头部,它是在后续数据写完后才添加到数据包的头部。因为Header中包含了数据长度等信息,需要在数据写完后进行计算,故头部信息最后生成。Header内部封装了一个Protobuf对象,持有数据在Block中的位置offsetInBlock、数据包序列号seqno、是否为Block的最后一个数据包lastPacketInBlock、数据长度dataLen等信息,Header在写入DFSPacket中时,会在序列化Protobuf对象的前面追加一个数据长度大小和protobuf序列化大小,方便DataNode等进行解析。

DFSPacket内部有四个指针,分别为

1、checksumStart:标记数据校验和区域起始位置

2、checksumPos:标记数据校验和区域当前写入位置

3、dataStart:标记真实数据区域起始位置

4、dataPos:标记真实数据区域当前写入位置

数据包是按照一组组数据块写入的,先写校验和数据块,再写真实数据块,然后再写下一组校验和数据块和真实数据块,最后再写上header头部信息,至此整个数据包写完。

每个DFSPacket都对应一个序列号seqno,还存储了数据在数据块中的位置offsetInBlock、数据包中的数据块(chunks)数量numChunks、数据包中的最大数据块数maxChunks、是否为block中最后一个数据包lastPacketInBlock等信息。

三、源码分析

(一)初始化

DFSPacket的初始化分为以下几步:

1、首先计算缓冲区数据大小

1.1、首先,计算writePacketSize,即写包大小

这个是系统配置参数决定的。该大小默认是认为包含头部信息的,意即客户端自己指定的数据包大小,但是实际大小还需要后续计算得到。writePacketSize取自参数dfs.client-write-packet-size,表示客户端写入数据时数据包大小,默认为64*1024,即64KB

1.2、其次,计算bytesPerChecksum,即每多少数据计算校验和

这个是通过DataChecksum实例checksum的getBytesPerChecksum()方法得到的,如下:

[java] view plain copy

  1. public int getBytesPerChecksum() {
  2. return bytesPerChecksum;
  3. }

而DataChecksum构造时通过校验和选项ChecksumOpt决定每个数据校验和块大小bytesPerChecksum,如下:

[java] view plain copy

  1. DataChecksum dataChecksum = DataChecksum.newDataChecksum(
  2. myOpt.getChecksumType(),
  3. myOpt.getBytesPerChecksum());

ChecksumOpt中的ChecksumType取自参数dfs.checksum.type,默认为CRC32C,每个需要校验和的数据块大小bytesPerChecksum取自参数dfs.bytes-per-checksum,默认为512B。

1.3、计算数据包body大小

bodySize = writePacketSize- PacketHeader.PKT_MAX_HEADER_LEN

最大头部PacketHeader.PKT_MAX_HEADER_LEN大小是一个合理的预估值,它是通过模拟构造一个protobuf对象,然后序列化成byte[]数组后,再加上一个固定的大小(Ints.BYTES + Shorts.BYTES);

Int所占区域用来存放数据包实际数据(含校验和,即除头部区域外的)大小,Short所占区域用来存放header protobuf对象序列化的大小,头部所占区域剩余的地方就是存放头部信息byte[];

1.4、计算chunkSize大小

chunkSize = bytesPerChecksum + getChecksumSize(),getChecksumSize()是获取校验和的大小,chunkSize意思是包含数据校验和块、真实数据块的大小

1.5、计算每个包能包含的块数

chunkSize=Math.max(bodySize/chunkSize, 1),最小为1;

1.6、计算缓冲区内数据大小:

packetSize = chunkSize*chunksPerPacket

chunkSize表示块大小,chunksPerPacket表示每个数据包由多少数据块

1.7、实际申请的缓冲区大小还要加上头部Header的最大大小

bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize

2、申请缓存区数组

3、构造DFSPacket实例,确定各指针位置、其它指标等

2和3代码如下:

[java] view plain copy

  1. /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
  2. /**
  3. * 创建一个数据包
  4. */
  5. private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
  6. long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
  7. final byte[] buf;
  8. final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
  9. try {
  10. buf = byteArrayManager.newByteArray(bufferSize);
  11. catch (InterruptedException ie) {
  12. final InterruptedIOException iioe = new InterruptedIOException(
  13. "seqno=" + seqno);
  14. iioe.initCause(ie);
  15. throw iioe;
  16. }
  17. return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
  18. getChecksumSize(), lastPacketInBlock);
  19. }

(二)写数据至缓冲区

写数据的过程:
                 1、 先写入一个校验和块;

2、 再写入一个真实数据块;

3、 块数增1;

4、 重复1-3,写入后续数据块组;

写数据是在DFSOutputStream中触发的,代码如下:

[java] view plain copy

  1. // 写入校验和
  2. currentPacket.writeChecksum(checksum, ckoff, cklen);
  3. // 写入数据
  4. currentPacket.writeData(b, offset, len);
  5. // 增加块数目
  6. currentPacket.incNumChunks();
  7. // 迭代累加bytesCurBlock
  8. bytesCurBlock += len;

DataPacket的实现也比较简单,代码如下(有注释):

[java] view plain copy

  1. /**
  2. * Write data to this packet.
  3. * 往包内写入数据
  4. *
  5. * @param inarray input array of data
  6. * @param off the offset of data to write
  7. * @param len the length of data to write
  8. * @throws ClosedChannelException
  9. */
  10. synchronized void writeData(byte[] inarray, int off, int len)
  11. throws ClosedChannelException {
  12. // 检测缓冲区
  13. checkBuffer();
  14. // 检测数据当前位置后如果 写入len个字节,是否会超过缓冲区大小
  15. if (dataPos + len > buf.length) {
  16. throw new BufferOverflowException();
  17. }
  18. // 数据拷贝:从数据当前位置处起开始存放len个字节
  19. System.arraycopy(inarray, off, buf, dataPos, len);
  20. // 数据当前位置累加len,指针向后移动
  21. dataPos += len;
  22. }
  23. /**
  24. * Write checksums to this packet
  25. * 往包内写入校验和
  26. *
  27. * @param inarray input array of checksums
  28. * @param off the offset of checksums to write
  29. * @param len the length of checksums to write
  30. * @throws ClosedChannelException
  31. */
  32. synchronized void writeChecksum(byte[] inarray, int off, int len)
  33. throws ClosedChannelException {
  34. // 检测缓冲区
  35. checkBuffer();
  36. // 校验数据校验和长度
  37. if (len == 0) {
  38. return;
  39. }
  40. // 根据当前校验和位置和即将写入的数据大小,判断是否超过数据起始位置处,即是否越界
  41. if (checksumPos + len > dataStart) {
  42. throw new BufferOverflowException();
  43. }
  44. // 数据拷贝:从校验和当前位置处起开始存放len个字节
  45. System.arraycopy(inarray, off, buf, checksumPos, len);
  46. // 数据校验和当前位置累加len
  47. checksumPos += len;
  48. }
  49. /**
  50. * increase the number of chunks by one
  51. * 增加数据块(chunk)数目
  52. */
  53. synchronized void incNumChunks(){
  54. numChunks++;
  55. }

(三)缓冲区数据flush到输出流

发送数据过程:

1、 计算数据包的数据长度;

2、 生成头部header信息:一个protobuf对象;

3、 整理缓冲区,去除校验和块区域和真实数据块区域间的空隙;

4、 添加头部信息到缓冲区:从校验和块区域起始往前计算头部信息的起始位置;

5、 将缓冲区数据写入到输出流。

逻辑比较简单,代码如下:

[java] view plain copy

  1. /**
  2. * Write the full packet, including the header, to the given output stream.
  3. * 将整个数据包写入到指定流,包含头部header
  4. *
  5. * @param stm
  6. * @throws IOException
  7. */
  8. synchronized void writeTo(DataOutputStream stm) throws IOException {
  9. // 检测缓冲区
  10. checkBuffer();
  11. // 计算数据长度
  12. final int dataLen = dataPos - dataStart;
  13. // 计算校验和长度
  14. final int checksumLen = checksumPos - checksumStart;
  15. // 计算整个包的数据长度(数据长度+校验和长度+固定长度4)
  16. final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
  17. // 构造数据包包头信息(protobuf对象)
  18. PacketHeader header = new PacketHeader(
  19. pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
  20. if (checksumPos != dataStart) {// 如果校验和数据当前位置不等于数据起始处,挪动校验和数据以填补空白
  21. // 这个可能在最后一个数据包或者一个hflush/hsyn调用时发生
  22. // Move the checksum to cover the gap. This can happen for the last
  23. // packet or during an hflush/hsync call.
  24. System.arraycopy(buf, checksumStart, buf,
  25. dataStart - checksumLen , checksumLen);
  26. // 重置checksumPos、checksumStart
  27. checksumPos = dataStart;
  28. checksumStart = checksumPos - checksumLen;
  29. }
  30. // 计算header的起始位置:数据块校验和起始处减去序列化后的头部大小
  31. final int headerStart = checksumStart - header.getSerializedSize();
  32. // 做一些必要的确保
  33. assert checksumStart + 1 >= header.getSerializedSize();
  34. assert headerStart >= 0;
  35. assert headerStart + header.getSerializedSize() == checksumStart;
  36. // Copy the header data into the buffer immediately preceding the checksum
  37. // data.
  38. // 将header数据写入缓冲区。header是用protobuf序列化的
  39. System.arraycopy(header.getBytes(), 0, buf, headerStart,
  40. header.getSerializedSize());
  41. // corrupt the data for testing.
  42. // 测试用
  43. if (DFSClientFaultInjector.get().corruptPacket()) {
  44. buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
  45. }
  46. // Write the now contiguous full packet to the output stream.
  47. // 写入当前整个连续的packet至输出流
  48. // 从header起始处,写入长度为头部大小、校验和长度、数据长度的总和
  49. stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
  50. // undo corruption.
  51. // 测试用
  52. if (DFSClientFaultInjector.get().uncorruptPacket()) {
  53. buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
  54. }
  55. }

(四)心跳包

如果长时间没有数据传输,在输出流未关闭的情况下,客户端会发送心跳包给数据节点,心跳包是DataPacket的一种特殊实现,它通过数据包序列号为-1来进行特殊标识,如下:

[java] view plain copy

  1. public static final long HEART_BEAT_SEQNO = -1L;

[java] view plain copy

  1. /**
  2. * Check if this packet is a heart beat packet
  3. * 判断该包释放为心跳包
  4. *
  5. * @return true if the sequence number is HEART_BEAT_SEQNO
  6. */
  7. boolean isHeartbeatPacket() {
  8. / 心跳包的序列号均为-1
  9. return seqno == HEART_BEAT_SEQNO;
  10. }

而心跳包的构造如下:

[java] view plain copy

  1. /**
  2. * For heartbeat packets, create buffer directly by new byte[]
  3. * since heartbeats should not be blocked.
  4. */
  5. private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
  6. final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
  7. return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
  8. getChecksumSize(), false);
  9. }

Hhadoop-2.7.0中HDFS写文件源码分析相关推荐

  1. Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)

    一.综述 HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode.DataNode.DFSClient等众多角色的分工与合作. 首先上一段代码,客户端是如何写文件的: ...

  2. php中的setinc,thinkphp3.2.0中setInc方法的源码分析

    下面为大家分享一篇thinkphp3.2.0 setInc方法 源码全面解析,具有很好的参考价值,希望对大家有所帮助. 我们先来看一下setInc的官方示例: 需要一个字段和一个自增的值(默认为1) ...

  3. suricata中DPDK收发包源码分析2

    <suricata中DPDK收发包源码分析1>中分析了整体的DPDK收发包框架代码,今天我们继续来深入了解一下一些细节方面的问题. 目录 Q1:收发包线程模式在代码中是怎样确定的? Q2: ...

  4. 【kafka】Kafka中的动态配置源码分析

    1.概述 2.源码分析 Broker启动加载动态配置 KafkaServer.startup 启动加载动态配置总流程 2.1 动态配置初始化 config.dynamicConfig.initiali ...

  5. MapReduce中Client提交Job源码分析

    回顾 在进行submit源码分析之前,先来回顾一下WordCount案例(点击查看WordCount案例).仔细回想一下曾经Client都干了点啥?获取对象-->一通set-->job.w ...

  6. spring boot实战(第六篇)加载application资源文件源码分析

    前言 在上一篇中了解了spring配置资源的加载过程,本篇在此基础上学习spring boot如何默认加载application.xml等文件信息的. ConfigFileApplicationLis ...

  7. spark读取文件源码分析-3

    本篇是spark read一个parquet源码分析的第三篇,这一篇主要介绍spark的默认的partition的设置逻辑,当然,这一篇实际上算不上源码分析了 第一篇 第二篇 1 . userProf ...

  8. spark读取文件源码分析-2

    文章目录 1. job1产生时机源码分析 1. DataSoure.getOrInferFileFormatSchema() 2. ParquetFileFormat.inferSchema 1. 简 ...

  9. spark读取文件源码分析-1

    文章目录 1. 问题背景 2. 测试代码 3. 生成的DAG图 1. job0 2. job1 4. job0 产生的时机源码分析 1. 调用DataFrameReader.load,DataFram ...

最新文章

  1. [ 懒人神器 ] —— OO一键build:.zip - .jar
  2. P3242 [HNOI2015] 接水果(整体二分、扫描线)
  3. 渗透测试 已学课时 1 个_我14岁上创业课时学到的东西
  4. pandas 保存数据到excel,csv
  5. ajax请求出错_学习笔记:Ajax总结
  6. matlab滤波操作实例,matlab信号滤波相关总结与实例
  7. 大数据应用案例---用户画像与精准营销
  8. linux php漏洞扫描工具,TPScan Thinkphp漏洞扫描器 命令执行
  9. Docker基础教程
  10. UA PHYS515 电磁理论II 静电场问题7 柱坐标系中的Laplace方程与Bessel函数
  11. 摄氏度和开氏度的换算_k与摄氏度的换算(摄氏度与开氏度换算)
  12. 基本的ps快捷键(图文)
  13. Mac 环境endnote 各种问题解决方法和word各种技巧汇总
  14. 怎样才能演示正弦和余弦的相互变换
  15. python循环n次_如何使循环重复n次-Python 3
  16. Java_String_Arrays_Character_BigDecimal_Calendar_Math_System
  17. 蓝牙 aptx android,没错,现在蓝牙耳机可以开始谈音质了 高通aptX HD SONY LDAC
  18. 【数字工厂】通信设备制造业“数字工厂”解决方案浅析
  19. 我所理解的高通UEFI之display的流程和移植
  20. 知识回顾:什么是封装?封装的作用?如何封装?

热门文章

  1. Excel转PDF最简单的方法
  2. python运行星空_用Python显示真实的星空
  3. 早餐店实用营销方案,小伙只用四招,6个月就赚了20万!
  4. 爱租房/所有api接口的实现
  5. C++格式化输出,文本文件操作,二进制文件操作
  6. UWB 超带宽寻迹定位模块——STM32设计部分
  7. UEFI开发探索34 – Option ROM前传1
  8. java tika 解析pdf_Tika解析word文件
  9. 唯品会2015校园招聘技术岗附加题解答
  10. 从人口迁移数据来看,哪个省复工最快?