Taking a file write as the example, here is the source code that ties the whole flow together:

 FSDataOutputStream out = fs.create(outFile);
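
For context, a minimal sketch of the client code around that call. This is illustrative only: the configuration, path, and payload are placeholders, not taken from the analyzed source.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

 public class HdfsWriteExample {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     // With fs.defaultFS pointing at an HDFS cluster, FileSystem.get()
     // returns a DistributedFileSystem instance.
     FileSystem fs = FileSystem.get(conf);
     Path outFile = new Path("/tmp/demo.txt");    // placeholder path
     FSDataOutputStream out = fs.create(outFile); // the call traced below
     out.write("hello hdfs".getBytes("UTF-8"));
     out.close(); // flushes remaining packets and completes the file
     fs.close();
   }
 }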

1. DistributedFileSystem

DistributedFileSystem extends and implements the abstract FileSystem class; this object is the interface through which end-user code interacts with the Hadoop distributed file system.

The original source comment:

 /*****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
  * This object is the way end-user code interacts with a Hadoop
  * DistributedFileSystem.
  *****************************************************************/

fs.create resolves to the following create method:

  @Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
      final EnumSet<CreateFlag> cflags, final int bufferSize,
      final short replication, final long blockSize, final Progressable progress,
      final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
            cflags, replication, blockSize, progress, bufferSize,
            checksumOpt);
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }

2. DFSClient

DistributedFileSystem.create delegates to DFSClient's create method:

  /**
   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
   * a hint to where the namenode should place the file blocks.
   * The favored nodes hint is not persisted in HDFS. Hence it may be honored
   * at the creation time only. HDFS could move the blocks during balancing or
   * replication, to move the blocks from favored nodes. A value of null means
   * no favored nodes for this create
   */
  public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
      throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if (LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
    String[] favoredNodeStrs = null;
    if (favoredNodes != null) {
      favoredNodeStrs = new String[favoredNodes.length];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodeStrs[i] = favoredNodes[i].getHostName() + ":"
            + favoredNodes[i].getPort();
      }
    }
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        favoredNodeStrs);
    beginFileLease(result.getFileId(), result);
    return result;
  }

3. DFSOutputStream

  DFSOutputStream creates files from a stream of bytes. The client application first writes data into the stream's internal cache; the data is then broken up into packets, each typically 64 KB in size. A packet is composed of chunks; each chunk is typically 512 bytes and carries an associated checksum (similar in spirit to a file's MD5 digest).

  When the client application fills up the current packet, it is enqueued into the dataQueue. The DataStreamer thread picks packets up from the dataQueue, sends them to the first datanode in the pipeline, and moves them from the dataQueue to the ackQueue. The ResponseProcessor receives acks from the datanodes; once a successful ack for a packet has been received from all datanodes, it removes the corresponding packet from the ackQueue.

  In case of an error, all outstanding packets are moved out of the ackQueue (back onto the dataQueue). A new pipeline is set up by removing the bad datanode from the original pipeline, and the DataStreamer then resumes sending packets from the dataQueue.

The original comment:

 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
  *
  * The client application writes data that is cached internally by
  * this stream. Data is broken up into packets, each packet is
  * typically 64K in size. A packet comprises of chunks. Each chunk
  * is typically 512 bytes and has an associated checksum with it.
  *
  * When a client application fills up the currentPacket, it is
  * enqueued into dataQueue.  The DataStreamer thread picks up
  * packets from the dataQueue, sends it to the first datanode in
  * the pipeline and moves it from the dataQueue to the ackQueue.
  * The ResponseProcessor receives acks from the datanodes. When an
  * successful ack for a packet is received from all datanodes, the
  * ResponseProcessor removes the corresponding packet from the
  * ackQueue.
  *
  * In case of error, all outstanding packets and moved from
  * ackQueue. A new pipeline is setup by eliminating the bad
  * datanode from the original pipeline. The DataStreamer now
  * starts sending packets from the dataQueue.
  ****************************************************************/
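
Before moving on, a stripped-down sketch of the dataQueue/ackQueue handoff described above may help. This is illustrative only, not the actual HDFS classes; the names and the recovery policy are simplified assumptions.

 import java.util.LinkedList;

 // Illustrative sketch of a two-queue handoff in the style of
 // DataStreamer/ResponseProcessor. All names here are invented.
 class TwoQueuePipeline {
   private final LinkedList<byte[]> dataQueue = new LinkedList<>();
   private final LinkedList<byte[]> ackQueue = new LinkedList<>();

   // Writer side: enqueue a filled packet.
   synchronized void enqueue(byte[] packet) {
     dataQueue.addLast(packet);
     notifyAll();
   }

   // "DataStreamer" side: take a packet for sending and move it to
   // the ack queue, where it waits for acknowledgment.
   synchronized byte[] takeForSend() throws InterruptedException {
     while (dataQueue.isEmpty()) {
       wait();
     }
     byte[] one = dataQueue.removeFirst();
     ackQueue.addLast(one);   // awaiting acknowledgment
     return one;              // caller writes it to the pipeline
   }

   // "ResponseProcessor" side: a successful ack retires the oldest packet.
   synchronized void ackReceived() {
     ackQueue.removeFirst();
     notifyAll();
   }

   // Error path: unacked packets return to the front of dataQueue,
   // in order, so they can be resent down a rebuilt pipeline.
   synchronized void recoverOnError() {
     while (!ackQueue.isEmpty()) {
       dataQueue.addFirst(ackQueue.removeLast());
     }
   }
 }

The stream itself is constructed by newStreamForCreate, which asks the namenode to create the file entry and then starts the streamer thread: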

  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    out.start();
    return out;
  }

Packet

  private static class Packet {
    private static final long HEART_BEAT_SEQNO = -1L;
    long seqno; // sequencenumber of buffer in block
    final long offsetInBlock; // offset in block
    boolean syncBlock; // this packet forces the current block to disk
    int numChunks; // number of chunks currently in packet
    final int maxChunks; // max chunks in packet
    private byte[] buf;
    private boolean lastPacketInBlock; // is this the last packet in block?

    /**
     * buf is pointed into like follows:
     *  (C is checksum data, D is payload data)
     *
     * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
     *           ^        ^               ^               ^
     *           |        checksumPos     dataStart       dataPos
     *           checksumStart
     *
     * Right before sending, we move the checksum data to immediately precede
     * the actual data, and then insert the header into the buffer immediately
     * preceding the checksum data, so we make sure to keep enough space in
     * front of the checksum data to support the largest conceivable header.
     */
    int checksumStart;
    int checksumPos;
    final int dataStart;
    int dataPos;

    /**
     * Create a new packet.
     *
     * @param pktSize maximum size of the packet,
     *                including checksum data and actual data.
     * @param chunksPerPkt maximum number of chunks per packet.
     * @param offsetInBlock offset in bytes into the HDFS block.
     */
    private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
        int checksumSize) {
      this.lastPacketInBlock = false;
      this.numChunks = 0;
      this.offsetInBlock = offsetInBlock;
      this.seqno = seqno;
      this.buf = buf;

      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
      checksumPos = checksumStart;
      dataStart = checksumStart + (chunksPerPkt * checksumSize);
      dataPos = dataStart;
      maxChunks = chunksPerPkt;
    }
  }
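
To make the buffer layout concrete, here is a small worked sketch of the constructor's arithmetic, assuming the common defaults of a 64 KB write packet, 512-byte chunks, and 4-byte CRC32 checksums. The defaults and the header constant are assumptions for illustration, not values read from this source.

 // Sketch of the Packet buffer arithmetic under assumed defaults.
 public class PacketLayoutDemo {
   // Stand-in for PacketHeader.PKT_MAX_HEADER_LEN (value assumed).
   static final int PKT_MAX_HEADER_LEN = 33;

   public static void main(String[] args) {
     int bytesPerChecksum = 512;  // chunk (payload) size
     int checksumSize = 4;        // CRC32
     int writePacketSize = 64 * 1024;

     // Each chunk costs its payload plus its checksum, so the number of
     // chunks that fit in one packet is:
     int chunkSize = bytesPerChecksum + checksumSize;
     int chunksPerPacket = Math.max(1,
         (writePacketSize - PKT_MAX_HEADER_LEN) / chunkSize);

     // Positions mirror the Packet constructor above: all checksums sit
     // in one contiguous region, followed by all payload data.
     int checksumStart = PKT_MAX_HEADER_LEN;
     int dataStart = checksumStart + chunksPerPacket * checksumSize;

     System.out.println("chunksPerPacket = " + chunksPerPacket); // 126
     System.out.println("checksumStart   = " + checksumStart);   // 33
     System.out.println("dataStart       = " + dataStart);       // 537
   }
 }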

DataStreamer

The DataStreamer is responsible for sending data packets to the datanodes in the pipeline. It retrieves a new block id and block locations from the namenode, then starts streaming packets down its pipeline. Every packet carries a unique sequence number. When all packets for a block have been sent out and the acks for each of them received, the DataStreamer closes the current block. The streamer thread is kicked off by DFSOutputStream.start() (called from newStreamForCreate above):

  private synchronized void start() {
    streamer.start();
  }

The original comment:

  //
  // The DataStreamer class is responsible for sending data packets to the
  // datanodes in the pipeline. It retrieves a new blockid and block locations
  // from the namenode, and starts streaming packets to the pipeline of
  // Datanodes. Every packet has a sequence number associated with
  // it. When all the packets for a block are sent out and acks for each
  // if them are received, the DataStreamer closes the current block.
  //

DataStreamer extends Daemon (a daemon-thread wrapper around Thread, sketched below), so its core logic lives in its run() method, quoted after the sketch.
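
A minimal sketch of what the Daemon wrapper amounts to (illustrative; not the actual org.apache.hadoop.util.Daemon source):

 // Illustrative sketch: a Thread subclass that marks itself as a daemon,
 // so the JVM will not wait for it on exit.
 class Daemon extends Thread {
   Daemon() {
     setDaemon(true);
   }
   Daemon(Runnable runnable) {
     super(runnable);
     setDaemon(true);
   }
 }

The run() method itself: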

  /** streamer thread is the only thread that opens streams to datanode,
   * and closes them. Any error recovery is also done by this thread.
   */
  @Override
  public void run() {
    long lastPacket = Time.now();
    TraceScope traceScope = null;
    if (traceSpan != null) {
      traceScope = Trace.continueSpan(traceSpan);
    }
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder
      if (hasError && response != null) {
        try {
          response.close();
          response.join();
          response = null;
        } catch (InterruptedException e) {
          DFSClient.LOG.warn("Caught exception ", e);
        }
      }
      Packet one;
      try {
        // process datanode IO errors if any
        boolean doSleep = false;
        if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
          doSleep = processDatanodeError();
        }
        synchronized (dataQueue) {
          // wait for a packet to be sent.
          long now = Time.now();
          while ((!streamerClosed && !hasError && dfsClient.clientRunning
              && dataQueue.size() == 0
              && (stage != BlockConstructionStage.DATA_STREAMING
                  || stage == BlockConstructionStage.DATA_STREAMING
                  && now - lastPacket < dfsClient.getConf().socketTimeout / 2))
              || doSleep) {
            long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
            timeout = timeout <= 0 ? 1000 : timeout;
            timeout = (stage == BlockConstructionStage.DATA_STREAMING) ?
                timeout : 1000;
            try {
              dataQueue.wait(timeout);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
            doSleep = false;
            now = Time.now();
          }
          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }
          // get packet to be sent.
          if (dataQueue.isEmpty()) {
            one = createHeartbeatPacket();
          } else {
            one = dataQueue.getFirst(); // regular data packet
          }
        }
        assert one != null;
        // get new block from namenode.
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("Allocating new block");
          }
          setPipeline(nextBlockOutputStream());
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("Append to block " + block);
          }
          setupPipelineForAppendOrRecovery();
          initDataStreaming();
        }
        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > blockSize) {
          throw new IOException("BlockSize " + blockSize +
              " is smaller than data size. " +
              " Offset of packet in block " + lastByteOffsetInBlock +
              " Aborting file " + src);
        }
        if (one.lastPacketInBlock) {
          // wait for all data packets have been successfully acked
          synchronized (dataQueue) {
            while (!streamerClosed && !hasError &&
                ackQueue.size() != 0 && dfsClient.clientRunning) {
              try {
                // wait for acks to arrive from datanodes
                dataQueue.wait(1000);
              } catch (InterruptedException e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
            }
          }
          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }
          stage = BlockConstructionStage.PIPELINE_CLOSE;
        }
        // send the packet
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            dataQueue.removeFirst();
            ackQueue.addLast(one);
            dataQueue.notifyAll();
          }
        }
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("DataStreamer block " + block +
              " sending packet " + one);
        }
        // write out data to remote datanode
        try {
          one.writeTo(blockStream);
          blockStream.flush();
        } catch (IOException e) {
          // HDFS-3398 treat primary DN is down since client is unable to
          // write to primary DN. If a failed or restarting node has already
          // been recorded by the responder, the following call will have no
          // effect. Pipeline recovery can handle only one node error at a
          // time. If the primary node fails again during the recovery, it
          // will be taken out then.
          tryMarkPrimaryDatanodeFailed();
          throw e;
        }
        lastPacket = Time.now();
        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();
        if (bytesSent < tmpBytesSent) {
          bytesSent = tmpBytesSent;
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // Is this block full?
        if (one.lastPacketInBlock) {
          // wait for the close packet has been acked
          synchronized (dataQueue) {
            while (!streamerClosed && !hasError &&
                ackQueue.size() != 0 && dfsClient.clientRunning) {
              dataQueue.wait(1000); // wait for acks to arrive from datanodes
            }
          }
          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }
          endBlock();
        }
        if (progress != null) { progress.progress(); }
        // This is used by unit test to trigger race conditions.
        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
          Thread.sleep(artificialSlowdown);
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
        if (restartingNodeIndex == -1) {
          DFSClient.LOG.warn("DataStreamer Exception", e);
        }
        if (e instanceof IOException) {
          setLastException((IOException) e);
        } else {
          setLastException(new IOException("DataStreamer Exception: ", e));
        }
        hasError = true;
        if (errorIndex == -1 && restartingNodeIndex == -1) {
          // Not a datanode issue
          streamerClosed = true;
        }
      }
    }
    if (traceScope != null) {
      traceScope.close();
    }
    closeInternal();
  }

ResponseProcessor

Processes responses from the datanodes: when the ack for a packet arrives, that packet is removed from the ackQueue.

The ResponseProcessor thread is started from DataStreamer's run method, via initDataStreaming():

  /**
   * Initialize for data streaming
   */
  private void initDataStreaming() {
    this.setName("DataStreamer for file " + src +
        " block " + block);
    response = new ResponseProcessor(nodes);
    response.start();
    stage = BlockConstructionStage.DATA_STREAMING;
  }

The original comment:

  //
  // Processes responses from the datanodes.  A packet is removed
  // from the ackQueue when its response arrives.
  //

ResponseProcessor likewise extends Daemon, and thus indirectly Thread, so its core logic is also in run():

  public void run() {
    setName("ResponseProcessor for block " + block);
    PipelineAck ack = new PipelineAck();

    while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
      // process responses from datanodes.
      try {
        // read an ack from the pipeline
        long begin = Time.monotonicNow();
        ack.readFields(blockReplyStream);
        long duration = Time.monotonicNow() - begin;
        if (duration > dfsclientSlowLogThresholdMs
            && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
          DFSClient.LOG.warn("Slow ReadProcessor read fields took " + duration
              + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
              + ack + ", targets: " + Arrays.asList(targets));
        } else if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("DFSClient " + ack);
        }

        long seqno = ack.getSeqno();
        // processes response status from datanodes.
        for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
          final Status reply = ack.getReply(i);
          // Restart will not be treated differently unless it is
          // the local node or the only one in the pipeline.
          if (PipelineAck.isRestartOOBStatus(reply) &&
              shouldWaitForRestart(i)) {
            restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
                Time.now();
            setRestartingNodeIndex(i);
            String message = "A datanode is restarting: " + targets[i];
            DFSClient.LOG.info(message);
            throw new IOException(message);
          }
          // node error
          if (reply != SUCCESS) {
            setErrorIndex(i); // first bad datanode
            throw new IOException("Bad response " + reply +
                " for block " + block +
                " from datanode " + targets[i]);
          }
        }

        assert seqno != PipelineAck.UNKOWN_SEQNO :
            "Ack for unknown seqno should be a failed ack: " + ack;
        if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
          continue;
        }

        // a success ack for a data packet
        Packet one;
        synchronized (dataQueue) {
          one = ackQueue.getFirst();
        }
        if (one.seqno != seqno) {
          throw new IOException("ResponseProcessor: Expecting seqno " +
              " for block " + block +
              one.seqno + " but received " + seqno);
        }
        isLastPacketInBlock = one.lastPacketInBlock;

        // Fail the packet write for testing in order to force a
        // pipeline recovery.
        if (DFSClientFaultInjector.get().failPacket() &&
            isLastPacketInBlock) {
          failPacket = true;
          throw new IOException("Failing the last packet for testing.");
        }

        // update bytesAcked
        block.setNumBytes(one.getLastByteOffsetBlock());

        synchronized (dataQueue) {
          lastAckedSeqno = seqno;
          ackQueue.removeFirst();
          dataQueue.notifyAll();

          one.releaseBuffer(byteArrayManager);
        }
      } catch (Exception e) {
        if (!responderClosed) {
          if (e instanceof IOException) {
            setLastException((IOException) e);
          }
          hasError = true;
          // If no explicit error report was received, mark the primary
          // node as failed.
          tryMarkPrimaryDatanodeFailed();
          synchronized (dataQueue) {
            dataQueue.notifyAll();
          }
          if (restartingNodeIndex == -1) {
            DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
                + " for block " + block, e);
          }
          responderClosed = true;
        }
      }
    }
  }

Summary:

  From the source walkthrough above, we can see that DFSOutputStream is the main class behind HDFS file writes: it streams data through the DataStreamer thread and handles the datanodes' replies through the ResponseProcessor thread.

Reprinted from: https://www.cnblogs.com/davidwang456/p/4778810.html
