一 BlockSender发送数据的格式详解

BlockSender主要负责从DataNode的磁盘读取数据块,然后发送数据块到接收方。需要注意的是,BlockSender发送的数据是以一定的结构组织的。

BlockSender发送的数据格式包括两部分:

校验信息头(ChecksumHeader)和数据包序列(packets)

1.1 ChecksumHeader

用于描述当前DataNode使用的校验方式等信息。如下所示,一个校验头信息也包括2个部分:

CHECKSUM_TYPE:校验类型

数据校验类型:包括三种校验—空校验,CRC32以及CRC32C,在这里使用1 byte描述数据校验类型,空校验,CRC32以及CRC32C,分别对应着0,1,2

BYTES_PER_CHECKSUM:校验块大小

校验快大小:也就是多少字节数据产生一个校验值。在这里CRC32为例,一把情况下是512字节数据产生一个4字节的checksum,我们把这512字节的数据叫做一个校验块(Chunk),chunk是HDFS读写数据块操作的最小单元

1.2数据包序列(packets)

BlockSender会将数据块切分成若干数据包对外发送,当数据发送完毕,会以一个空的数据包作为结束。每一个数据包包括一个变长的包头,校验数据和若干字节的实际数据

1.2.1数据包头

用于描述当前数据包信息,是通过PtotoBuf序列化的包括4字节的全包长度,以及2字节的包头长度

=>当前数据包在整个数据块中的位置

=>数据包在管道中的序列号

=>当前数据包是不是数据块中的最后一个数据包

=>当前数据包数据部分的长度

=>是否需要DN同步

1.2.2校验数据

校验数据是对实际数据做校验操作产生的,它将实际数据以校验块为单位,每一个校验块产生一个checksum,校验数据中包含了所有校验块的checksum.校验数据的大小=(实际数据长度+校验块大小)/ 校验块大小 *校验和长度

1.2.3实际数据

数据包中的实际数据就是数据块文件中保存的数据,实际数据的传输是以校验块为单位的,一个校验块对应产生一个checksum的实际数据。在数据包中会将校验块和校验数据分开发送,首先将所有校验块的校验数据发送出去,然后再发所有的校验块

二 BlockSender的实现

数据块的发送主要是由BlockSender来实现的,其发送过程包括:

发送准备,发送数据,清理工作

2.1发送准备

BlockSender数据块发送准备工作主要是在构造过程中执行的,

BlockSender(ExtendedBlock block, long startOffset, long length,

boolean corruptChecksumOk, boolean verifyChecksum,

boolean sendChecksum, DataNode datanode, String clientTraceFmt,

CachingStrategy cachingStrategy)

throws IOException {

try {

this.block = block;

this.corruptChecksumOk = corruptChecksumOk;

this.verifyChecksum = verifyChecksum;

this.clientTraceFmt = clientTraceFmt;

/*

* 如果缓存策略readDropBehind为空,我们则按照配置文件

* dfs.datanode.drop.cache.behind.reads来初始化

* dropCacheBehindLargeReads

*/

if (cachingStrategy.getDropBehind() ==null) {

this.dropCacheBehindAllReads = false;

this.dropCacheBehindLargeReads =

datanode.getDnConf().dropCacheBehindReads;

} else {

this.dropCacheBehindAllReads =

this.dropCacheBehindLargeReads =

cachingStrategy.getDropBehind().booleanValue();

}

/*

* 如果缓存策略readahead为空,那么我们则按照配置文件

* dfs.datanode.readahead.bytes的处理,默认是

* 4Mb

*/

if (cachingStrategy.getReadahead() ==null) {

this.alwaysReadahead = false;

this.readaheadLength = datanode.getDnConf().readaheadLength;

} else {

this.alwaysReadahead = true;

this.readaheadLength = cachingStrategy.getReadahead().longValue();

}

this.datanode = datanode;

//如果需要验证校验数据

if (verifyChecksum) {

// To simplify implementation, callers maynot specify verification

// without sending.

Preconditions.checkArgument(sendChecksum,

"If verifying checksum, currently mustalso send it.");

}

final Replica replica;

final long replicaVisibleLength;

synchronized(datanode.data) {

replica = getReplica(block, datanode);

replicaVisibleLength = replica.getVisibleLength();

}

// if there is a write in progress

ChunkChecksum chunkChecksum = null;

if (replica instanceof ReplicaBeingWritten) {

final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;

waitForMinLength(rbw, startOffset + length);

chunkChecksum = rbw.getLastChecksumAndDataLen();

}

if (replica.getGenerationStamp() < block.getGenerationStamp()) {

throw new IOException("Replica gen stamp < block genstamp, block="

+ block + ", replica=" + replica);

} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {

if (DataNode.LOG.isDebugEnabled()) {

DataNode.LOG.debug("Bumpingup the client provided"

+ " block's genstamp to latest " + replica.getGenerationStamp()

+ " for block " + block);

}

block.setGenerationStamp(replica.getGenerationStamp());

}

if (replicaVisibleLength < 0) {

throw new IOException("Replica is not readable, block="

+ block + ", replica=" + replica);

}

if (DataNode.LOG.isDebugEnabled()) {

DataNode.LOG.debug("block=" + block + ",replica=" + replica);

}

//是否开启transferTo模式

this.transferToAllowed = datanode.getDnConf().transferToAllowed &&

(!is32Bit || length <= Integer.MAX_VALUE);

// Obtain a reference before reading data

this.volumeRef = datanode.data.getVolume(block).obtainReference();

/*

* (corruptChecksumOK, meta_file_exist):operation

* True,  True: will verify checksum

* True, False: No verify, e.g., need to read data from a corrupted file

* False, True: will verify checksum

* False, False: throws IOException filenot found

*/

//获取checksum信息:从Meta文件中获取当前数据块的校验算法、校验和长度,以及多少字节产生一个校验值

//也就是校验块的大小

DataChecksum csum = null;

if (verifyChecksum || sendChecksum) {

LengthInputStream metaIn = null;

boolean keepMetaInOpen = false;

try {

metaIn = datanode.data.getMetaDataInputStream(block);

if (!corruptChecksumOk || metaIn != null) {

if (metaIn == null) {

//need checksum but meta-data not found

throw new FileNotFoundException("Meta-data not found for " +

block);

}

if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()){

checksumIn = new DataInputStream(new BufferedInputStream(

metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));

csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);

keepMetaInOpen = true;

}

} else {

LOG.warn("Couldnot find metadata file for " + block);

}

} finally {

if (!keepMetaInOpen) {

IOUtils.closeStream(metaIn);

}

}

}

if (csum == null) {

csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);

}

/*

* If chunkSize is very large, then the metadatafile is mostly

* corrupted. For now just truncatebytesPerchecksum to blockLength.

*/

int size = csum.getBytesPerChecksum();

if (size > 10*1024*1024 && size > replicaVisibleLength) {

csum = DataChecksum.newDataChecksum(csum.getChecksumType(),

Math.max((int)replicaVisibleLength, 10*1024*1024));

size = csum.getBytesPerChecksum();

}

chunkSize = size;//校验块大小

checksum = csum;//校验算法

checksumSize = checksum.getChecksumSize();//校验和长度

length = length < 0 ? replicaVisibleLength : length;

// end is either last byte on disk or the length forwhich we have a

// checksum

long end = chunkChecksum != null ? chunkChecksum.getDataLength()

: replica.getBytesOnDisk();

if (startOffset < 0 || startOffset > end

|| (length + startOffset) > end) {

String msg = " Offset " + startOffset + "and length " + length

+ " don't match block " + block + " (blockLen " + end + " )";

LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +

":sendBlock() : " + msg);

throw new IOException(msg);

}

//计算offset和endOffset,offset用于标识要去读取的数据在数据块的起始位置

//endOffset:用于标识结束位置。由于读取位置往往不会落在某个校验块的起始位置,

//所以在准备工作中,需要确保offset的校验块的起始位置,endOffset在校验块的

//的结束位置。这样读取时就可以校验块为单位读取,方便校验和的操作

offset = startOffset - (startOffset % chunkSize);

if (length >= 0) {

// Ensure endOffset points to end of chunk.

long tmpLen = startOffset + length;

if (tmpLen % chunkSize != 0) {

tmpLen += (chunkSize - tmpLen % chunkSize);

}

if (tmpLen < end) {

// will use on-disk checksum here since theend is a stable chunk

end = tmpLen;

} else if (chunkChecksum != null) {

// last chunk is changing. flag that weneed to use in-memory checksum

this.lastChunkChecksum = chunkChecksum;

}

}

endOffset = end;

//寻找正确的offSET

if (offset > 0 && checksumIn != null) {

long checksumSkip = (offset / chunkSize) * checksumSize;

// note blockInStream is seeked whencreated below

if (checksumSkip > 0) {

// Should we use seek() for checksum fileas well?

IOUtils.skipFully(checksumIn, checksumSkip);

}

}

seqno = 0;

if (DataNode.LOG.isDebugEnabled()) {

DataNode.LOG.debug("replica=" + replica);

}

blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset

if (blockIn instanceof FileInputStream) {

blockInFd = ((FileInputStream)blockIn).getFD();

} else {

blockInFd = null;

}

} catch (IOException ioe) {

IOUtils.closeStream(this);

IOUtils.closeStream(blockIn);

throw ioe;

}

}

2.2发送数据块

首先会进行预读取和丢弃,调用manageOsCache操作

/*

* 预读取或者丢弃

*

*/

private void manageOsCache() throws IOException {

if (blockInFd == null) return;

//按照条件触发预读取操作

if ((readaheadLength > 0)&& (datanode.readaheadPool != null)&&

(alwaysReadahead || isLongRead())) {

//满足预读取条件,则调用ReadaheadPool.readaheadStream方法触发预读取

curReadahead = datanode.readaheadPool.readaheadStream(

clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,

curReadahead);

}

//丢弃刚才从缓存中读取的数据,因为不再需要使用这些数据了

if (dropCacheBehindAllReads ||

(dropCacheBehindLargeReads && isLongRead())) {

//丢弃数据的位置

long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;

if (offset >= nextCacheDropOffset) {

//如果下一次读取数据的位置大于丢弃的数据的位置,则将读取数据位置前的数据全部丢弃

long dropLength = offset - lastCacheDropOffset;

NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(

block.getBlockName(), blockInFd, lastCacheDropOffset,

dropLength, NativeIO.POSIX.POSIX_FADV_DONTNEED);

lastCacheDropOffset = offset;

}

}

}

/**

* 读取数据块和他的元数据,然后发送数据到啊客户端或者其他的datanode

* out:将数据写到那儿

* throttler: 用于发送数据

*/

long sendBlock(DataOutputStream out, OutputStream baseStream,

DataTransferThrottler throttler) throws IOException {

TraceScope scope = datanode.tracer.

newScope("sendBlock_" + block.getBlockId());

try {

return doSendBlock(out, baseStream, throttler);

} finally {

scope.close();

}

}

private long doSendBlock(DataOutputStream out, OutputStream baseStream,

DataTransferThrottler throttler) throws IOException {

if (out == null) {

throw new IOException( "out stream is null" );

}

initialOffset = offset;

long totalRead = 0;

OutputStream streamForSendChunks = out;

lastCacheDropOffset = initialOffset;

if (isLongRead() && blockInFd != null) {

// Advise that this file descriptor will be accessedsequentially.

NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(

block.getBlockName(), blockInFd, 0, 0,

NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);

}

//预读取&丢弃

manageOsCache();

final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;

try {

int maxChunksPerPacket;

//构造一个Packet Buffer,也就是能容纳一个数据包的大小,对于2中不同发送数据包模式:

//transferTo和ioStream,缓冲区大小是不一样的。在transferTo模式中,数据块文件

//是通过零拷贝的方式直接传输给客户端,不需要将数据块文件写入缓冲区,所以Packet Buffer

//只需要缓冲校验数据即可;而ioStream模式则需要将实际数据以及校验数据都缓存下来

int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;

boolean transferTo = transferToAllowed && !verifyChecksum

&& baseStream instanceof SocketOutputStream

&& blockIn instanceof FileInputStream;

if (transferTo) {

FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();

blockInPosition = fileChannel.position();

streamForSendChunks = baseStream;

maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);

// Smaller packet size to only holdchecksum when doing transferTo

pktBufSize += checksumSize * maxChunksPerPacket;

} else {

maxChunksPerPacket = Math.max(1,

numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));

// Packet size includes both checksum anddata

pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;

}

ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);

//循环调用sendPacket发送数据包序列

while (endOffset > offset && !Thread.currentThread().isInterrupted()) {

//预读取

manageOsCache();

long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,

transferTo, throttler);

offset += len;//更新offset

totalRead += len + (numberOfChunks(len) * checksumSize);

seqno++;

}

// If this thread was interrupted, then it did not sendthe full block.

if (!Thread.currentThread().isInterrupted()) {

try {

//发送一个空的数据包泳衣标志数据块的结束

sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,

throttler);

out.flush();

} catch (IOException e) { //socket error

throw ioeToSocketException(e);

}

sentEntireByteRange = true;

}

} finally {

if ((clientTraceFmt != null)&& ClientTraceLog.isDebugEnabled()) {

final long endTime = System.nanoTime();

ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,

initialOffset, endTime - startTime));

}

close();

}

return totalRead;

}

private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,

boolean transferTo, DataTransferThrottler throttler) throws IOException {

int dataLen = (int) Math.min(endOffset - offset,

(chunkSize * (long) maxChunks));

//数据包中包含多少校验块

int numChunks = numberOfChunks(dataLen);

//校验数据长度

int checksumDataLen = numChunks * checksumSize;

//数据包长度

int packetLen = dataLen + checksumDataLen + 4;

boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;

//将数据包头写入缓存

int headerLen = writePacketHeader(pkt, dataLen, packetLen);

//数据包头在缓存中的位置

int headerOff = pkt.position() - headerLen;

//校验数据在缓存中的位子

int checksumOff = pkt.position();

byte[] buf = pkt.array();

if (checksumSize > 0&& checksumIn != null) {

//校验数据写入缓存

readChecksum(buf, checksumOff, checksumDataLen);

// write in progress that we need to use to get lastchecksum

if (lastDataPacket && lastChunkChecksum != null) {

int start = checksumOff + checksumDataLen - checksumSize;

byte[] updatedChecksum = lastChunkChecksum.getChecksum();

if (updatedChecksum != null) {

System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);

}

}

}

int dataOff = checksumOff + checksumDataLen;

//在普通模式下下将数据写入缓存

if (!transferTo) { //normal transfer

IOUtils.readFully(blockIn, buf, dataOff, dataLen);

//确认校验和数据

if (verifyChecksum) {

verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);

}

}

try {

if (transferTo) {

SocketOutputStream sockOut = (SocketOutputStream)out;

//首先将头和校验和数据写入缓存

sockOut.write(buf, headerOff, dataOff - headerOff);

//使用transfer方式,将数据通过0拷贝的方式写入IO流

FileChannel fileCh = ((FileInputStream)blockIn).getChannel();

LongWritable waitTime = new LongWritable();

LongWritable transferTime = new LongWritable();

sockOut.transferToFully(fileCh, blockInPosition, dataLen,

waitTime, transferTime);

datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());

datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());

blockInPosition += dataLen;

} else {

//普通模式下数据写入IO流

out.write(buf, headerOff, dataOff + dataLen - headerOff);

}

} catch (IOException e) {

if (e instanceof SocketTimeoutException) {

} else {

String ioem = e.getMessage();

if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connectionreset")) {

LOG.error("BlockSender.sendChunks()exception: ", e);

}

datanode.getBlockScanner().markSuspectBlock(

volumeRef.getVolume().getStorageID(),

block);

}

throw ioeToSocketException(e);

}

if (throttler != null) { // rebalancing so throttle

throttler.throttle(packetLen);

}

return dataLen;

}

DataNode之BlockSender分析相关推荐

  1. DataNode之DirectoryScanner分析

    DirectoryScanner的作用就是定期扫描磁盘上的数据块,检查磁盘数据块信息是否和FsDataSetImpl的数据块信息一致,如果不一致则进行更新,他只会检查内存和磁盘上FINALIZED状态 ...

  2. Hadoop源代码分析

    http://wenku.baidu.com/link?url=R-QoZXhc918qoO0BX6eXI9_uPU75whF62vFFUBIR-7c5XAYUVxDRX5Rs6QZR9hrBnUdM ...

  3. Hadoop源代码分析(完整图文版) part 1

    在网上看到了很多此文章的装载,但是都是纯文字,这篇文章在没有图片的情况下阅读起来意义不大了.花了点时间上传了100多张图片,希望对大家学习hadoop有帮助. Hadoop源代码分析(一) 关键字:  ...

  4. Hadoop源代码分析(完整版)

    Hadoop源代码分析(一) 关键字: 分布式云计算 Google的核心竞争技术是它的计算平台.Google的大牛们用了下面5篇文章,介绍了它们的计算设施.  GoogleCluster:http:/ ...

  5. 小朱笔记之hadoop应用实战、源码分析-目录

    小朱笔记之hadoop应用实战.源码分析 1.1 背景目的 该笔记从宏观架构.安装配置.源码分析.使用案例四个方面剖析了Hadoop1.0.3,希望能对同学们提供帮助,赠人玫瑰,手留余香.能够把had ...

  6. DataNode进入Stale状态问题排查

    先说下DataNode为啥会处于Stale状态 默认情况下,DataNode每3s向NameNode发送一次心跳,如果NameNode持续30s没有收到心跳,就把DataNode标记为Stale状态: ...

  7. Hdfs NameNode中数据块管理与数据节点管理分析

    数据块管理 在上一节介绍了BlockManager中的数据块副本状态,主要是保存各个数据块副本状态的存储对象.名字节点第二关系的管理包括数据块管理和数据节点管理,其对数据块的管理是依托于BlockMa ...

  8. Hadoop的学习笔记(Hive|pig|zookeeper|hbase)

    轉載的,此筆記的鏈接地址請點擊此處 hadoop笔记本 <div class="postText"><div id="cnblogs_post_body ...

  9. 大数据开发工程师目录

    阶段一:走进大数据 第1周   学好大数据先攻克Linux 在步入大数据殿堂之前,先带领大家快速掌握大数据的必备技能:Linux的操作使用,为后面学习大数据技术打下坚实基础. 课程安排: 1.掌握Li ...

最新文章

  1. Linux命令cat
  2. .NET : 再谈谈多线程
  3. MVCC在MySQL的InnoDB中的实现
  4. Scala:Function1、Function2
  5. 用计算机弹生僻字乐谱,生僻字 E调 (拇指琴卡林巴琴弹奏谱)_谱友园地_中国曲谱网...
  6. 移动IM开发那些事:技术选型和常见问题
  7. [css] 举例说明时间、频率、角度、弧度、百分度的单位分别是哪些?
  8. 【Java】递归删除文件目录
  9. 什么可以搜python答案_超星Python程序设计答案章节测试答案免费,能搜索网课答案的公众号...
  10. 网站优化如何创作优质的内容?
  11. 计算机ping使用的端口,ping端口命令是什么
  12. SONY索尼摄像机Z280断电KLV.RSV.MXF视频打不开数据恢复成功
  13. peoplesoft笔记
  14. SD card boot and flashing tool for TI davinic DM368
  15. 如何正确的看待人工智能?只有编程基础的人可以学吗?
  16. IP网络主动测评系统——X-Vision
  17. 您目前无法访问XXXX,因为此网站使用了HSTS
  18. Rambo: Last Blood
  19. 带你一起Piu Piu Piu~
  20. 注塑机摆放间距多少合适_选用注塑机的基本原则

热门文章

  1. Java NIO学习篇之PosixFilePermission详解
  2. python 文件修改记录_python基础-文件增删改查
  3. PostgreSQL 12系统表(6)pg_namespace
  4. 6-2图像分类网络模型框架解读(下)
  5. jsp转换java_JSP编码转换
  6. python模拟百度搜索点击链接_python采集百度搜索结果带有特定URL的链接代码实例...
  7. 学习easyui疑问(三)
  8. IDLDrawWidaget Activex
  9. 学习easyui疑惑(四)
  10. html5 内容载入,HTML5 的 DOMContentLoaded 和 onload