在客户端写hdfs文件的过程中,其会将数据以packet包的形式向DataNode发送,DataNode在接收到这个packet包时,会进行将该packet写入本地磁盘,之后便向数据流管道中的下游数据节点继续发送该数据包;并会接收来自下游数据节点的数据包确认消息。这个确认消息会逆向的通过数据流管道送回到客户端client端。接下来详细分析一下在客户端写过程中,DataNode上所进行的操作流程。

  • 基本的客户端写流程已经在上篇博文中给出:Hdfs 客户端写过程 源码解析
  • DataNode上的用于响应流式接口请求的服务也在博文中给出:DataNode DataXceiverServer readBlock详解

在客户端写流程中,在其通过nextBlockOutputStream()获取到NameNode所分配用于存储该block的DataNode信息后,其会建立到数据流管道中第一个DataNode的输出流;并向其发送数据块写入的操作指令:new Sender(out).writeBlock();接着DataNode上的DataXceiverServer将会构造一个DataXceiver线程对象用于响应该写操作请求;DataXceiver会解析其流式请求的操作符(此处对应WRITE_BLOCK),并调用DataXceiver.writeBlock()方法响应这个请求。接下来逐步分析DataXceiver.writeBlock()响应方法。

1、DataXceiver.writeBlock()的基本流程:

DataXceiver.writeBlock()方法的具体分析如下:

1、检查、设置对应的参数变量

// 是否是datannode发起的请求
final boolean isDatanode = clientname.length() == 0;
// 是否是客户端发起的请求
final boolean isClient = !isDatanode;
// 是否是数据块的复制操作
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW|| stage == BlockConstructionStage.TRANSFER_FINALIZED;// reply to upstream datanode or client
// 和上游的数据节点或者客户端交互的输出流,用于发送响应数据
final DataOutputStream replyOut = getBufferedOutputStream();    

2、定义用于上下游节点的输入和输出流,其会在后续中进行对应的构造初始化

DataOutputStream mirrorOut = null;  // stream to next target  下游节点的输出流
DataInputStream mirrorIn = null;    // reply from next target 下游节点的输入流
Socket mirrorSock = null;           // socket to next target  下一个节点的Socket
String mirrorNode = null;           // the name:port of next target 下一个节点的名称:端口
String firstBadLink = "";           // first datanode that failed in connection setup 管道中第一恶坏的节点

DataNode与数据流管道中的上游节点通信包括输入流in和输出流replyOut,与数据流管道中的下游节点通信包括输入流mirrorIn和输出流mirrorOut,其共同构成当前节点的输入输出流如下:

3、构造BlockReceiver对象,并将其用来接收数据块信息

// open a block receiver
blockReceiver = new BlockReceiver(block, storageType, in,peer.getRemoteAddressString(),peer.getLocalAddressString(),stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,clientname, srcDataNode, datanode, requestedChecksum,cachingStrategy, allowLazyPersist, pinning);

4、建立到下游节点的socket连接,创建对应的输入流mirrorIn和输出流mirrorOut,并向下游发送数据块写入操作请求new Sender(mirrorOut).writeBlock();并从mirrorIn中解析来自下游节点的响应确认并记录相应的确认状态;并通过replyOut流向上游节点返回请求响应确认信息。

// 如果下一个节点不为空
if (targets.length > 0) {InetSocketAddress mirrorTarget = null;// Connect to backup machine// 从target节点中获取下一个节点。mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);mirrorTarget = NetUtils.createSocketAddr(mirrorNode);mirrorSock = datanode.newSocket();try {// .........// 连接到下一个datanodeNetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);mirrorSock.setSoTimeout(timeoutValue);mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);// .........// 构造下游节点的输出流mirrorOut,用于往下游节点写数据流// 构造下游节点的输入流mirrorIn,用于接收来自下游节点的相应信息mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,HdfsConstants.SMALL_BUFFER_SIZE));mirrorIn = new DataInputStream(unbufMirrorIn);// Do not propagate allowLazyPersist to downstream DataNodes.// 构造了Sender对象,向下游节点发送writeBlock操作指令new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],blockToken, clientname, targets, targetStorageTypes, srcDataNode,stage, pipelineSize, minBytesRcvd, maxBytesRcvd,latestGenerationStamp, requestedChecksum, cachingStrategy, false);// flush数据流mirrorOut.flush();// read connect ack (only for clients, not for replication req)// 从mirrorIn解析来自下游节点的确认请求信息  if (isClient) {BlockOpResponseProto connectAck =BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));mirrorInStatus = connectAck.getStatus();firstBadLink = connectAck.getFirstBadLink();if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {LOG.info("Datanode " + targets.length +" got response for connect ack " +" from downstream datanode with firstbadlink as " +firstBadLink);}}} catch (IOException e) {// 异常, 关闭输入输出流// .........IOUtils.closeStream(mirrorOut);mirrorOut = null;IOUtils.closeStream(mirrorIn);mirrorIn = null;IOUtils.closeSocket(mirrorSock);mirrorSock = null;// .........}
}// send connect-ack to source for clients and not transfer-RBW/Finalized
// 向上游节点发送请求响应确认信息
if (isClient && !isTransfer) {BlockOpResponseProto.newBuilder().setStatus(mirrorInStatus).setFirstBadLink(firstBadLink).build().writeDelimitedTo(replyOut);replyOut.flush();
}

5、在成功的建立了上下游节点的输入/输出流后;writeBlock()方法会调用blockReceiver.receiveBlock()方法从数据流管道中的上游接收数据块,然后保存数据块到当前数据节点的存储中,再将数据块转发到数据流管道中的下游数据节点。同时Blockreceiver还会接收来自下游节点的响应,并将这个响应发送给数据流管道中的上游节点。

if (blockReceiver != null) {String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,mirrorAddr, null, targets, false);// send close-ack for transfer-RBW/Finalized // 数据块复制操作,直接把状态置成SUCCESS,返回上游节点相关信息if (isTransfer) {if (LOG.isTraceEnabled()) {LOG.trace("TRANSFER: send close-ack");}writeResponse(SUCCESS, null, replyOut);}
}

6、更新记录当前新写入的数据块副本的时间戳、副本大小等信息,并根据是否是数据流管道的恢复操作或者数据块的复制操作,调用datanode.closeBlock()向NameNode汇报当前DataNode接收到新的数据块notifyNamenodeReceivedBlock。

// update its generation stamp
if (isClient && stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {block.setGenerationStamp(latestGenerationStamp);block.setNumBytes(minBytesRcvd);
}
if (isDatanode ||stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);LOG.info("Received " + block + " src: " + remoteAddress + " dest: "+ localAddress + " of size " + block.getNumBytes());
}

从上面的DataXceiver.writeBlock()方法的具体分析可以知道,其真正处理接收上游数据块,写入本地磁盘,并转发到数据流管道中的下游节点的处理类及过程是:blockReceiver.receiveBlock(),接下来对blockReceiver类进行详细的分析:

BlockReceiver类:

blockReceiver.receiveBlock()方法会先启动packetResponder线程负责接收并转发下游数据节点发送的确认数据包的ACK消息。之后receiverBlock方法循环调用receiverpacket()方法接收上游写入的数据包并发送这个数据包到下游节点,成功完成整个数据块的写入操作后,receiverBlock方法关闭Packetresponder线程。其接收数据块流程如下:

void receiveBlock(DataOutputStream mirrOut, // output to next datanodeDataInputStream mirrIn,   // input from next datanodeDataOutputStream replyOut,  // output to previous datanodeString mirrAddr, DataTransferThrottler throttlerArg,DatanodeInfo[] downstreams,boolean isReplaceBlock) throws IOException {//...... 参数设置try {// 如果是客户端发起的写请求(此处即为数据块create),// 则启动PacketResponder发送ackif (isClient && !isTransfer) {responder = new Daemon(datanode.threadGroup, new PacketResponder(replyOut, mirrIn, downstreams));responder.start(); // start thread to processes responses}// 循环同步接收packet,写block文件和meta文件while (receivePacket() >= 0) {}// 此时节点已接收了所有packet,可以等待发送完所有ack后关闭responderif (responder != null) {((PacketResponder)responder.getRunnable()).close();responderClosed = true;}//...... 数据块复制相关} catch (IOException ioe) {//...... 异常处理} finally {//...... 清理}
}

在blockReceiver.receiveBlock()方法中会同步循环调用receivePacket()方法来完整的接收数据块所切分的所有数据包;receivePacket()方法首先会从输入流中取出一个数据包,并将这个数据包放在缓冲区中,receivePacket()成功接收数据包之后,会判断当前节点是否是数据流管道中的最后一个节点,或者输入流是否启动了同步数据块标识,要求Datanode立即将数据包同步到磁盘。在这两种情况下,datanode会先将数据写入磁盘,然后再通知packetResponder处理确认消息,否则,receivePacket()方法接收完数据包后会立即通知packetResponder处理确认消息。接下来receivePacket()会将数据包发送给数据流管道中的下游节点,然后就可以将数据块文件和校验文件写入数据节点的磁盘,如果当前节点时数据流管道中的最后一个节点,则在写入磁盘前,需要对数据包进行校验。

private int receivePacket() throws IOException {// read the next packetpacketReceiver.receiveNextPacket(in);// ...... 检查packet头PacketHeader header = packetReceiver.getHeader();long offsetInBlock = header.getOffsetInBlock();long seqno = header.getSeqno();boolean lastPacketInBlock = header.isLastPacketInBlock();final int len = header.getDataLen();boolean syncBlock = header.getSyncBlock();// ......  // 如果不需要立即持久化也不需要校验收到的数据,// 则可以将当前packet响应信息ack加入ackQueue中,委托PacketResponder线程返回SUCCESS的ack,然后再进行校验和持久化if (responder != null && !syncBlock && !shouldVerifyChecksum()) {((PacketResponder) responder.getRunnable()).enqueue(seqno,lastPacketInBlock, offsetInBlock, Status.SUCCESS);}// First write the packet to the mirror:// 向下游节点发送数据包if (mirrorOut != null && !mirrorError) {try {long begin = Time.monotonicNow();// For testing. Normally no-op.DataNodeFaultInjector.get().stopSendingPacketDownstream();packetReceiver.mirrorPacketTo(mirrorOut);mirrorOut.flush();} catch (IOException e) {handleMirrorOutError(e);}}ByteBuffer dataBuf = packetReceiver.getDataSlice();ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();if (lastPacketInBlock || len == 0) {    // 收到空packet可能是表示心跳或数据块发送// 这两种情况都可以尝试把之前的数据刷到磁盘if (syncBlock) {flushOrSync(true);}} else {    // 持久化packet// 如果是管道中的最后一个节点,则持久化之前,要先对收到的packet做一次校验// 如果校验错误,则将packet响应信息ack加入ackQueue中,委托PacketResponder线程返回 ERROR_CHECKSUM 的ackfinal boolean shouldNotWriteChecksum = checksumReceivedLen == 0&& streams.isTransientStorage();try {long onDiskLen = replicaInfo.getBytesOnDisk();if (onDiskLen<offsetInBlock) {// 如果校验块不完整,需要加载并调整旧的meta文件内容,供后续重新计算crc// 写block文件int startByteToDisk = (int)(onDiskLen-firstByteInBlock) + dataBuf.arrayOffset() + dataBuf.position();int numBytesToDisk = (int)(offsetInBlock-onDiskLen);out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);// 写meta文件final byte[] lastCrc;if (shouldNotWriteChecksum) {lastCrc = null;} else if (partialCrc != null) {  // 如果是校验块不完整(之前收到过一部分)// 重新计算crc 更新lastCrcchecksumOut.write(buf);partialCrc = null;} else { // 如果校验块完整// 更新lastCrcchecksumOut.write(checksumBuf.array(), offset, checksumLen);}// ......}} catch (IOException iex) {datanode.checkDiskErrorAsync();throw iex;}}// if sync was requested, put in queue for pending acks here// (after the fsync finished)// 如果需要立即持久化或需要校验收到的数据,则现在已经完成了持久化和校验// 将当前packet响应信息ack加入ackQueue中,委托PacketResponder线程返回SUCCESS的ackif (responder != null && (syncBlock || shouldVerifyChecksum())) {((PacketResponder) responder.getRunnable()).enqueue(seqno,lastPacketInBlock, offsetInBlock, Status.SUCCESS);}// ......return lastPacketInBlock?-1:len;
}

BlockReceiver.PacketResponder类:

PacketResponder是一个线程类,它和BlockReceiver共同完成数据块的写操作流程。BlockReceiver完成对指定数据包的处理之后,会触发PacketResponder类处理当前数据包的响应消息,PacketResponder监听下游的输入流,接收到这个数据包的确认消息之后,在确认信息中,添加当前数据节点的确认消息,然后将这个消息发送给上游数据节点。

BlockReceiver在完成对指定的数据包处理之后,会调用委托PacketResponder类来处理这个数据包的响应。其首先会调用PacketResponder.enqueue()方法,将当前节点对当前数据包的响应信息ack加入到ackQueue队列(存储当前节点对当前packet包的响应信息)中。然后调用notify()方法通知PacketResponder.run()方法处理该数据包的响应信息。可以看到ackQueue是一个典型的生产者-消费者队列。
        完成上述操作之后,packetResponder会判断当前接收的数据包的响应是否为数据块中最后一个数据包的响应,如果是,则调用finalizeBlock()方法向namenode提交这个数据块,并在完成数据包的响应处理之后,从ackQueue队列中移除这个数据包。

void enqueue(final long seqno, final boolean lastPacketInBlock,final long offsetInBlock, final Status ackStatus) {final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,System.nanoTime(), ackStatus);if(LOG.isDebugEnabled()) {LOG.debug(myString + ": enqueue " + p);}synchronized(ackQueue) {if (running) {// 将当前节点对当前packet包的响应信息加入ackQueue中ackQueue.addLast(p);ackQueue.notifyAll();}}
}
public void run() {while (isRunning() && !lastPacketInBlock) {try {Packet pkt = nullPipelineAck ack = new PipelineAck();try {// 如果当前节点不是管道的最后一个节点,且下游节点正常,则从下游读取ackif (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {ack.readFields(downstreamIn);seqno = ack.getSeqno();}// 如果从下游节点收到了正常的ack,或当前节点是管道的最后一个节点,// 则需要从队列中取出当前节点对pkt的响应信息(即BlockReceiver#receivePacket()放入的ack)if (seqno != PipelineAck.UNKOWN_SEQNO|| type == PacketResponderType.LAST_IN_PIPELINE) {pkt = waitForAckHead(seqno);if (!isRunning()) {break;}// 判断下游接收序号与当前节处理序号是否相等// 可知该序号的packet是否已经正确接收expected = pkt.seqno;if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE&& seqno != expected) {throw new IOException(myString + "seqno: expected=" + expected+ ", received=" + seqno);}lastPacketInBlock = pkt.lastPacketInBlock;}} catch (InterruptedException ine) {// ......异常处理}// ......// 如果是最后一个packet,将block的状态转换为FINALIZED,并关闭BlockReceiverif (lastPacketInBlock) {finalizeBlock(startTime);}// 此时ack.seqno==pkt.seqno,将 下游节点的响应和当前节点的响应 构造新ack发送给上游sendAckUpstream(ack, expected, totalAckTimeNanos,(pkt != null ? pkt.offsetInBlock : 0), (pkt != null ? pkt.ackStatus : Status.SUCCESS));if (pkt != null) {removeAckHead();}} catch (IOException e) {// ......异常处理}}
}

最后在PacketResponder处理完所有数据包的响应信息后,其会调用PacketResponder#finalizeBlock()方法来告知NameNode当前DataNode已经成功的接受了当前数据块;以便NameNode更新对应的命名空间。PacketResponder#finalizeBlock()方法最终会调用BPOfferService.notifyNamenodeReceivedBlock()来通知NameNode。该处的源码较为简单就不做过多的赘述了。

最后,总结一下PacketResponder响应线程的整体流程如下:

  1. 从下游的datanode中读取响应数据ack
  2. 调用waitForAckHead方法从ackQueue队列中获取数据包响应信息。
  3. 对从下游获取的数据包和从队列中的数据包的seqno进行比较,如果不一致的话,直接抛出异常。
  4. 如果是最后一个数据包,调用finalizeBlock()方法完成数据块
  5. 把当前的packet ack确认信息和下游的ack确认信息合并,然后发到上游节点
  6. 从ackQueue里删除响应的数据包信息

DataNode DataXceiverServer writeBlock详解相关推荐

  1. Hadoop框架:DataNode工作机制详解

    本文源码:GitHub·点这里 || GitEE·点这里 一.工作机制 1.基础描述 DataNode上数据块以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是数据块元数据包括长度.校验.时 ...

  2. HDFS(下):NameNode和SecondaryNameNode、HDFS工作机制、故障处理、集群安全模式、服役退役节点、集群黑白名单、DataNode多目录详解、HDFS2.x新特性

    接上篇,上篇文章传送门:HDFS(上):HDFS优缺点.HDFS操作.HDFS客户端操作.HDFS的API.HDFS数据流.HDFS的IO流.HDFS读写数据流程.HDFS文件处理详解.windows ...

  3. HDFS体系结构(NameNode、DataNode详解)

    hadoop项目地址:http://hadoop.apache.org/ NameNode.DataNode详解 (一)分布式文件系统概述 数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配 ...

  4. Hadoop核心架构HDFS+MapReduce+Hbase+Hive内部机理详解

    编者按:HDFS和MapReduce是Hadoop的两大核心,除此之外Hbase.Hive这两个核心工具也随着Hadoop发展变得越来越重要.本文作者张震的博文<Thinking in BigD ...

  5. Hadoop 新 MapReduce 框架 Yarn 详解

    Hadoop MapReduceV2(Yarn) 框架简介 原 Hadoop MapReduce 框架的问题 对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储 ...

  6. 一致性协议raft详解(一):raft整体介绍

    一致性协议raft详解(一):raft介绍 前言 概述 raft独特的特性 raft集群的特点 raft中commit何意? raft leader election log replication ...

  7. [转]大数据环境搭建步骤详解(Hadoop,Hive,Zookeeper,Kafka,Flume,Hbase,Spark等安装与配置)

    大数据环境安装和配置(Hadoop2.7.7,Hive2.3.4,Zookeeper3.4.10,Kafka2.1.0,Flume1.8.0,Hbase2.1.1,Spark2.4.0等) 系统说明 ...

  8. Hadoop Streaming详解

    一: Hadoop Streaming详解 1.Streaming的作用 Hadoop Streaming框架,最大的好处是,让任何语言编写的map, reduce程序能够在hadoop集群上运行:m ...

  9. HDFS NameNode内存详解

    前言 <HDFS NameNode内存全景>中,我们从NameNode内部数据结构的视角,对它的内存全景及几个关键数据结构进行了简单解读,并结合实际场景介绍了NameNode可能遇到的问题 ...

最新文章

  1. [CF460E]Roland and Rose
  2. 微软Windows 11正式发布!一文带你了解免费升级方法、最低系统要求
  3. 深圳内推 | 华为诺亚方舟实验室招聘机器学习/深度学习算法实习生
  4. KineticJS教程(3)
  5. 程序员圣诞节相册源码_程序员分享圣诞刷屏源码,这次朋友圈千万不要再@微信官方了!...
  6. 进程调度算法 C++实现
  7. Python打包exe后报错:Failed to execute script xxxx问题的解决办法
  8. 数据结构6——回文树
  9. uni-app开发规范
  10. 计算机网络学习1-网络层次
  11. [转]awesome-tensorflow-chinese
  12. 二十一、日期Date类型
  13. QBASIC语言程序设计 谭浩强 pdf
  14. 通关!游戏设计之道的学习笔记(七)关卡设计
  15. SOLID 原则之依赖倒置原则
  16. php max file uploads,php上传多文件max_file_uploads限制问题
  17. 计算机图形学(闫令琪博士课程答疑)-Geometry(三)
  18. 【短视频运营】短视频制作流程 ( 视频存稿 | 写脚本 | 拍摄收音 | 提词器 | 后期剪辑 | 前测工具 | 检查违禁词 )
  19. 描述计算机内的存储单位及换算关系,计算机存储单位换算
  20. Windows.h和windows.h的区别

热门文章

  1. css转换后nook不能看,解决Nook中epub文件显示问号
  2. 机器人走正方形c语言代码,机器人走正方形教学案例
  3. JAVA数轴分界含义,1.程序分析:请利用数轴来分界,定位。注意定义时需把奖金定义成长整型。  用JAVA 谢谢 每部要有注释求大...
  4. 网络地址ABCDE划分记忆,靠谱的回答
  5. 系统管理Lesson 10. Managing Data Concurrency
  6. CMake Error: Could not create named generator Visual Studio 17 2022 win32
  7. cdf会员购广州推出新版小程序,附推荐码FX000118
  8. 前端开发需要学python吗_在选择学习Python开发还是前端开发时需要考虑哪些因素...
  9. vue实现侧边定位栏
  10. 硅谷最爱的测试框架:详解PyTest