The most important job of the DataNode is to manage the data blocks on physical storage and to communicate with the NameNode and with clients to carry out block reads and writes. These operations involve heavy data transfer: a DFSClient writing a block to a DataNode, a DFSClient reading a block from a DataNode, or a DataNode replicating a block to another DataNode. All of them are I/O intensive.

For these reads and writes, the DataNode provides a TCP-stream-based data access interface: DataTransferProtocol.

1 DataTransferProtocol

DataTransferProtocol is another DataNode interface, this one describing streaming reads and writes of data blocks. Its methods include:

readBlock: read a data block from the current DataNode

writeBlock: write a data block to the current DataNode

transferBlock: transfer (replicate) a data block to another DataNode

releaseShortCircuitFds: release the file descriptors used by a short-circuit read

requestShortCircuitShm: request a shared-memory segment for short-circuit reads

copyBlock: copy a data block

blockChecksum: get the checksum of a given data block

replaceBlock: replace a data block (it appears in the processOp() dispatch shown later)
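Collected in one place, the interface looks roughly like this (a trimmed sketch of the Hadoop 2.x interface; imports and most methods are omitted, and only signatures already used in this article are shown):

  public interface DataTransferProtocol {
    // Read a block; the signature matches the Sender.readBlock() shown below
    void readBlock(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken,
        String clientName, long blockOffset, long length, boolean sendChecksum,
        CachingStrategy cachingStrategy) throws IOException;

    // Copy a block to the requesting node
    void copyBlock(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
        throws IOException;

    // Compute and return the checksum of a block
    void blockChecksum(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
        throws IOException;
  }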

DataTransferProtocol has two subclasses: Sender and Receiver.

Sender wraps the calling side of DataTransferProtocol and initiates streaming requests.

Receiver wraps the executing side of DataTransferProtocol and responds to streaming requests.

Suppose a DFSClient initiates a DataTransferProtocol.readBlock operation. The DFSClient calls the Sender class to serialize the request and transmit it to the remote Receiver. The remote Receiver receives the request, deserializes it, and then invokes the code that actually performs the read.
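For example, the calling side can be sketched like this (a hypothetical snippet; the socket setup is simplified, but the Sender call matches the API used throughout this article):

  // Hypothetical client-side sketch: connect to the DataNode's streaming port
  // and issue a readBlock request through Sender. dnHost/dnXferPort are assumed.
  Socket s = new Socket(dnHost, dnXferPort);
  DataOutputStream out = new DataOutputStream(
      new BufferedOutputStream(s.getOutputStream()));
  new Sender(out).readBlock(block, blockToken, clientName,
      0L, block.getNumBytes(), true,
      CachingStrategy.newDefaultStrategy());
  // On the DataNode, a Receiver subclass (DataXceiver) calls readOp() and
  // processOp(op), which dispatches to opReadBlock().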

2 Sender

Sender first serializes the call parameters with ProtoBuf, uses the Op enum to describe which method is being invoked, and finally sends the Op together with the serialized parameters to the receiver:

  @Override
  public void readBlock(final ExtendedBlock blk,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final long blockOffset,
      final long length,
      final boolean sendChecksum,
      final CachingStrategy cachingStrategy) throws IOException {
    // Serialize all of the parameters with ProtoBuf
    OpReadBlockProto proto = OpReadBlockProto.newBuilder()
        .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
        .setOffset(blockOffset)
        .setLen(length)
        .setSendChecksums(sendChecksum)
        .setCachingStrategy(getCachingStrategy(cachingStrategy))
        .build();
    // Call send() with Op.READ_BLOCK to identify this as a readBlock call,
    // together with the serialized parameters in proto
    send(out, Op.READ_BLOCK, proto);
  }

  private static void send(final DataOutputStream out, final Op opcode,
      final Message proto) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
          + ": " + proto);
    }
    // Call op() to write the protocol version and the opcode
    op(out, opcode);
    // Write the serialized parameters
    proto.writeDelimitedTo(out);
    out.flush();
  }
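The op() helper called by send() is tiny. Here is a sketch consistent with the readOp() parsing shown in the next section: it writes the 2-byte protocol version followed by the 1-byte opcode.

  private static void op(final DataOutput out, final Op opcode) throws IOException {
    // 2-byte protocol version; Receiver.readOp() checks this on the other end
    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    // 1-byte opcode identifying the requested operation
    opcode.write(out);
  }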

When we call Sender to issue a readBlock() request, Sender serializes the request with ProtoBuf and sends it over the I/O stream to the remote DataNode.

When the DataNode reads the request, the Receiver class's matching handler, opReadBlock(), deserializes it and then executes the readBlock operation.
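Putting send() and readOp() together, a complete request frame on the wire looks like this:

  +--------------------+-------------+--------------------+------------------------+
  | version (2 bytes)  | op (1 byte) | vint length prefix | serialized parameters  |
  +--------------------+-------------+--------------------+------------------------+

The version short is DATA_TRANSFER_VERSION, the opcode byte comes from the Op enum, and writeDelimitedTo() / vintPrefixed() handle the varint length prefix around the ProtoBuf message.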

3 Receiver

Receiver is an abstract class that executes streaming requests initiated by remote nodes. It provides readOp(), which parses the opcode of a Sender request, and processOp(), which takes the Op parsed by readOp() and dispatches on it, calling a different op* handler for each opcode. For readBlock, for instance, processOp() calls opReadBlock(), which reads the serialized request parameters from the I/O stream, deserializes them, and then calls the matching method of the subclass DataXceiver to do the actual work.
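In skeleton form, a Receiver subclass drives the protocol like this (exactly what the DataXceiver.run() loop in section 5.1 does):

  Op op = readOp();   // parse the version number and the opcode
  processOp(op);      // dispatch to opReadBlock(), opWriteBlock(), ...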

  /** Read the version number and the Op opcode from the I/O stream */
  protected final Op readOp() throws IOException {
    // Read the version number from the stream
    final short version = in.readShort();
    if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
      throw new IOException("Version Mismatch (Expected: " +
          DataTransferProtocol.DATA_TRANSFER_VERSION +
          ", Received: " + version + " )");
    }
    // Then read the Op from the stream
    return Op.read(in);
  }

  /** Take the Op parsed by readOp() and dispatch to the matching handler. */
  protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }

  /** Receive OP_READ_BLOCK */
  private void opReadBlock() throws IOException {
    // Read the serialized readBlock parameters from the I/O stream
    OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
        proto.getClass().getSimpleName());
    try {
      // Deserialize the parameters, then call the subclass DataXceiver's
      // readBlock() method to perform the read
      readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
          proto.getHeader().getClientName(),
          proto.getOffset(),
          proto.getLen(),
          proto.getSendChecksums(),
          (proto.hasCachingStrategy() ?
              getCachingStrategy(proto.getCachingStrategy()) :
              CachingStrategy.newDefaultStrategy()));
    } finally {
      if (traceScope != null) traceScope.close();
    }
  }

4 DataXceiverServer

Recall how server-side Java sockets work: you first create a ServerSocket object bound to a given port, then call ServerSocket.accept() to listen for connection requests on that port. When a connection arrives, accept() returns a Socket object, and the server can then communicate with the client through that socket.
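As a generic refresher (plain Java, not HDFS code), the pattern looks like this:

  // Classic one-thread-per-connection server loop; DataXceiverServer follows
  // the same shape, with Peer/PeerServer abstractions instead of raw sockets
  // and datanode.shouldRun instead of an endless loop.
  ServerSocket server = new ServerSocket(port);      // 'port' is illustrative
  while (true) {
    final Socket socket = server.accept();           // blocks until a client connects
    new Thread(() -> handle(socket)).start();        // 'handle' is a hypothetical worker
  }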

The DataNode's streaming interface follows this socket pattern with two classes, DataXceiver and DataXceiverServer. DataXceiverServer listens on the DataNode for streaming requests; each time a client initiates a streaming request through Sender, DataXceiverServer accepts the connection and creates a DataXceiver object to respond to the request and perform the corresponding operation.

4.1 DataXceiverServer initialization

During DataNode initialization, a DataXceiverServer object is created to listen for all streaming requests. The DataNode calls DataNode.initDataXceiver() to construct it.

>> Create a TcpPeerServer object, which wraps the server socket. TcpPeerServer takes its listen address from the configuration key dfs.datanode.address.

  private void initDataXceiver(Configuration conf) throws IOException {
    // TcpPeerServer wraps the socket and listens on dfs.datanode.address
    TcpPeerServer tcpPeerServer;
    if (secureResources != null) {
      tcpPeerServer = new TcpPeerServer(secureResources);
    } else {
      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
          DataNode.getStreamingAddr(conf));
    }
    // Set the TCP receive buffer size
    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
    streamingAddr = tcpPeerServer.getStreamingAddr();
    LOG.info("Opened streaming server at " + streamingAddr);
    this.threadGroup = new ThreadGroup("dataXceiverServer");
    // Construct the DataXceiverServer instance
    xserver = new DataXceiverServer(tcpPeerServer, conf, this);
    // Run the dataXceiverServer thread group as daemon threads
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    this.threadGroup.setDaemon(true); // auto destroy when empty

    // Short-circuit read case
    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
        conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
      // Construct a DomainPeerServer, based on DomainSocket, for
      // inter-process communication on the local machine
      DomainPeerServer domainPeerServer =
          getDomainPeerServer(conf, streamingAddr.getPort());
      if (domainPeerServer != null) {
        // Construct the localDataXceiverServer
        this.localDataXceiverServer = new Daemon(threadGroup,
            new DataXceiverServer(domainPeerServer, conf, this));
        LOG.info("Listening on UNIX domain socket: " +
            domainPeerServer.getBindPath());
      }
    }
    // Construct the ShortCircuitRegistry object
    this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
  }

4.2 The run() method

All of DataXceiverServer's work is done in its run() method, which loops on peerServer.accept() waiting for connections. When a new connection request arrives, it creates a Peer object and constructs a DataXceiver thread to serve the streaming request. The streaming requests themselves are answered by DataXceiver; that is where the real work happens.

  public void run() {
    Peer peer = null;
    // Keep listening for new requests for as long as the DataNode is running
    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
      try {
        // accept() blocks until a new request arrives, then returns a Peer
        peer = peerServer.accept();

        // Make sure the xceiver count is not exceeded
        int curXceiverCount = datanode.getXceiverCount();
        if (curXceiverCount > maxXceiverCount) {
          throw new IOException("Xceiver count " + curXceiverCount
              + " exceeds the limit of concurrent xcievers: "
              + maxXceiverCount);
        }

        // Start a DataXceiver thread to serve this streaming request.
        // DataXceiverServer only establishes the connection and constructs
        // and starts the DataXceiver; everything else is done by DataXceiver.
        new Daemon(datanode.threadGroup,
            DataXceiver.create(peer, datanode, this))
            .start();
      } catch (SocketTimeoutException ignored) {
        // wake up to see if should continue to run
      } catch (AsynchronousCloseException ace) {
        // another thread closed our listener socket - that's expected during
        // shutdown, but not in other circumstances
        if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
          LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
        }
      } catch (IOException ie) {
        IOUtils.cleanup(null, peer);
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
      } catch (OutOfMemoryError ie) {
        IOUtils.cleanup(null, peer);
        // DataNode can run out of memory if there is too many transfers.
        // Log the event, Sleep for 30 seconds, other transfers may complete by
        // then.
        LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
        try {
          Thread.sleep(30 * 1000);
        } catch (InterruptedException e) {
          // ignore
        }
      } catch (Throwable te) {
        LOG.error(datanode.getDisplayName()
            + ":DataXceiverServer: Exiting due to: ", te);
        datanode.shouldRun = false; // any other exception shuts the DataNode down
      }
    }

    // Out of the main loop: clean up by closing the peerServer
    try {
      peerServer.close();
      closed = true;
    } catch (IOException ie) {
      LOG.warn(datanode.getDisplayName()
          + " :DataXceiverServer: close exception", ie);
    }

    // if in restart prep stage, notify peers before closing them.
    if (datanode.shutdownForUpgrade) {
      restartNotifyPeers();
      // Each thread needs some time to process it. If a thread needs
      // to send an OOB message to the client, but blocked on network for
      // long time, we need to force its termination.
      LOG.info("Shutting down DataXceiverServer before restart");
      // Allow roughly up to 2 seconds.
      for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // ignore
        }
      }
    }
    // Finally close all of this server's remaining connections
    closeAllPeers();
  }

5 DataXceiver

DataXceiver is a subclass of Receiver; the real work of responding to DataTransferProtocol operations happens here.

5.1 The run() method

  public void run() {
    int opsProcessed = 0;
    Op op = null;
    try {
      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
      // Get the underlying input stream
      InputStream input = socketIn;
      try {
        IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
            socketIn, datanode.getXferAddress().getPort(),
            datanode.getDatanodeId());
        // Decorate the input stream
        input = new BufferedInputStream(saslStreams.in,
            HdfsConstants.SMALL_BUFFER_SIZE);
        // Get the underlying output stream
        socketOut = saslStreams.out;
      } catch (InvalidMagicNumberException imne) {
        LOG.info("Failed to read expected encryption handshake from client " +
            "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
            "is running an older version of Hadoop which does not support " +
            "encryption");
        return;
      }
      // Call the parent class's initialize() method to finish initialization
      super.initialize(new DataInputStream(input));

      do {
        updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
        try {
          if (opsProcessed != 0) {
            assert dnConf.socketKeepaliveTimeout > 0;
            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
          } else {
            peer.setReadTimeout(dnConf.socketTimeout);
          }
          op = readOp(); // parse the Op opcode from the input stream
        } catch (InterruptedIOException ignored) {
          break;
        } catch (IOException err) {
          // Since we optimistically expect the next op, it's quite normal to
          // get EOF here.
          if (opsProcessed > 0 &&
              (err instanceof EOFException || err instanceof ClosedChannelException)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
            }
          } else {
            incrDatanodeNetworkErrors();
            throw err;
          }
          break;
        }

        // restore normal timeout
        if (opsProcessed != 0) {
          peer.setReadTimeout(dnConf.socketTimeout);
        }

        opStartTime = now();
        // Call the parent class's processOp() to handle the streaming request
        processOp(op);
        ++opsProcessed;
      } while ((peer != null) &&
          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
    } catch (Throwable t) {
      String s = datanode.getDisplayName() + ":DataXceiver error processing "
          + ((op == null) ? "unknown" : op.name()) + " operation "
          + " src: " + remoteAddress + " dst: " + localAddress;
      if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
        // For WRITE_BLOCK, it is okay if the replica already exists since
        // client and replication may write the same block to the same datanode
        // at the same time.
        if (LOG.isTraceEnabled()) {
          LOG.trace(s, t);
        } else {
          LOG.info(s + "; " + t);
        }
      } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
        String s1 =
            "Likely the client has stopped reading, disconnecting it";
        s1 += " (" + s + ")";
        if (LOG.isTraceEnabled()) {
          LOG.trace(s1, t);
        } else {
          LOG.info(s1 + "; " + t);
        }
      } else {
        LOG.error(s, t);
      }
    } finally {
      if (LOG.isDebugEnabled()) {
        LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
            + datanode.getXceiverCount());
      }
      updateCurrentThreadName("Cleaning up");
      if (peer != null) {
        dataXceiverServer.closePeer(peer);
        IOUtils.closeStream(in);
      }
    }
  }

6 Reading data

The client calls Sender.readBlock() to read a data block from a specific DataNode. Once the request arrives at the DataNode over the I/O stream, the DataNode's DataXceiverServer creates a DataXceiver object to respond to it.

readBlock() carries the following parameters:

ExtendedBlock block: which data block to read

Token<BlockTokenIdentifier> blockToken: the access token for the data being read

String clientName: which client is doing the reading

long blockOffset: the position in the block to start reading from

long length: how many bytes to read

boolean sendChecksum: whether to send checksum data along with the block

CachingStrategy cachingStrategy: the caching strategy

The overall flow is roughly:

>> Create the output stream

>> Create the BlockSender object

>> Send a BlockOpResponseProto response to the client, acknowledging that the request was received successfully and passing along the DataNode's checksum information

>> Send the data block to the client, which produces a status code

>> Close the streams
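For orientation, here is a hypothetical sketch of the client side of the same exchange (stream setup omitted; the messages correspond to the server code that follows):

  // 1. Issue the request (see section 2).
  new Sender(out).readBlock(block, blockToken, clientName,
      blockOffset, length, true, CachingStrategy.newDefaultStrategy());
  // 2. Read the BlockOpResponseProto written by writeSuccessWithChecksumInfo(),
  //    which carries the status plus the DataNode's checksum information.
  BlockOpResponseProto resp =
      BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
  // 3. If the status is SUCCESS, read the packets produced by
  //    blockSender.sendBlock().
  // 4. Finally send a ClientReadStatusProto back - the server code below
  //    waits for exactly this message after sending the entire range.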

  public void readBlock(final ExtendedBlock block,     // block to read
      final Token<BlockTokenIdentifier> blockToken,    // access token for the block
      final String clientName,                         // client name
      final long blockOffset,                          // position in the block to read from
      final long length,                               // number of bytes to read
      final boolean sendChecksum,                      // whether to send checksum data; read-side
                                                       // verification is done by the client, which
                                                       // reports the result back to the DataNode
      final CachingStrategy cachingStrategy            // caching strategy
      ) throws IOException {
    previousOpClientName = clientName;
    long read = 0;
    OutputStream baseStream = getOutputStream();
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
    checkAccess(out, true, block, blockToken,
        Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);

    // send the block
    BlockSender blockSender = null;
    DatanodeRegistration dnR =
        datanode.getDNRegistrationForBP(block.getBlockPoolId());
    final String clientTraceFmt =
        clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
            ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
                "%d", "HDFS_READ", clientName, "%d",
                dnR.getDatanodeUuid(), block, "%d")
            : dnR + " Served block " + block + " to " +
                remoteAddress;

    updateCurrentThreadName("Sending block " + block);
    try {
      try {
        // Create the BlockSender object
        blockSender = new BlockSender(block, blockOffset, length,
            true, false, sendChecksum, datanode, clientTraceFmt,
            cachingStrategy);
      } catch (IOException e) {
        String msg = "opReadBlock " + block + " received exception " + e;
        LOG.info(msg);
        sendResponse(ERROR, msg);
        throw e;
      }

      // Send a BlockOpResponseProto response to the client, acknowledging
      // that the request was received successfully and passing along this
      // DataNode's checksum information
      writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));

      long beginRead = Time.monotonicNow();
      // Send the block to the client; the client replies with a status code
      // that the DataNode parses below
      read = blockSender.sendBlock(out, baseStream, null); // send data
      long duration = Time.monotonicNow() - beginRead;
      if (blockSender.didSendEntireByteRange()) {
        // If we sent the entire range, then we should expect the client
        // to respond with a Status enum.
        try {
          ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
              PBHelper.vintPrefixed(in));
          if (!stat.hasStatus()) {
            LOG.warn("Client " + peer.getRemoteAddressString() +
                " did not send a valid status code after reading. " +
                "Will close connection.");
            IOUtils.closeStream(out);
          }
        } catch (IOException ioe) {
          LOG.debug("Error reading client status response. Will close connection.", ioe);
          IOUtils.closeStream(out);
          incrDatanodeNetworkErrors();
        }
      } else {
        IOUtils.closeStream(out);
      }
      datanode.metrics.incrBytesRead((int) read);
      datanode.metrics.incrBlocksRead();
      datanode.metrics.incrTotalReadTime(duration);
    } catch (SocketException ignored) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
            remoteAddress, ignored);
      }
      // Its ok for remote side to close the connection anytime.
      datanode.metrics.incrBlocksRead();
      IOUtils.closeStream(out);
    } catch (IOException ioe) {
      /* What exactly should we do here?
       * Earlier version shutdown() datanode if there is disk error.
       */
      if (!(ioe instanceof SocketTimeoutException)) {
        LOG.warn(dnR + ":Got exception while serving " + block + " to "
            + remoteAddress, ioe);
        datanode.metrics.incrDatanodeNetworkErrors();
      }
      throw ioe;
    } finally {
      IOUtils.closeStream(blockSender);
    }

    // update metrics
    datanode.metrics.addReadBlockOp(elapsed());
    datanode.metrics.incrReadsFromClient(peer.isLocal(), read);
  }

7 Writing data

HDFS writes data through a pipeline of data streams. The DFSClient triggers a block write by calling Sender.writeBlock(); the request travels to every DataNode in the pipeline, and the last DataNode replies with a confirmation that flows back through the pipeline to the DFSClient. Once the DFSClient receives it, it splits the data to be written into packets and sends them into the pipeline one by one.

Each packet is first sent to the first DataNode. After receiving it successfully, that DataNode writes the packet to disk and forwards it to the second DataNode, and so on. When a packet reaches the last DataNode, its checksum is verified; if verification succeeds, an acknowledgment is sent that travels back through the pipeline to the DFSClient. Once all of a block's data has been sent and acknowledged, the DFSClient sends an empty packet marking the end of the block, and the whole block-write flow is complete.
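Schematically, for a three-replica pipeline:

  DFSClient --packet--> DN1 --packet--> DN2 --packet--> DN3 (verifies checksum)
  DFSClient <---ack---- DN1 <---ack---- DN2 <---ack---- DN3

Each DataNode forwards packets downstream and relays acknowledgments upstream, which is exactly the job of the BlockReceiver and PacketResponder classes shown below.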

The DataXceiver.writeBlock() method. Three variables control the processing flow:

  // Is this write initiated by a DataNode?
  final boolean isDatanode = clientname.length() == 0;
  // Is this write initiated by a client?
  final boolean isClient = !isDatanode;
  // Is this write a block-transfer (replication) operation?
  final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
      || stage == BlockConstructionStage.TRANSFER_FINALIZED;
  ……
  // Output stream to the next DataNode
  DataOutputStream mirrorOut = null;
  // Input stream from the next DataNode
  DataInputStream mirrorIn = null;
  // Socket to the next DataNode
  Socket mirrorSock = null;
  // Name of the next DataNode
  String mirrorNode = null;
  // First DataNode in the pipeline that failed
  String firstBadLink = "";
  ……

If the write was initiated by a DataNode:

  /*
   * Open a BlockReceiver to receive the block from the upstream DataNode.
   * BlockReceiver receives the block from the upstream node in the pipeline,
   * stores it on the current node, and forwards it to the downstream node;
   * it also receives acknowledgments from the downstream node and passes
   * them on to the upstream node.
   */
  blockReceiver = new BlockReceiver(block, storageType, in,
      peer.getRemoteAddressString(),
      peer.getLocalAddressString(),
      stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
      clientname, srcDataNode, datanode, requestedChecksum,
      cachingStrategy, allowLazyPersist, pinning);
  storageUuid = blockReceiver.getStorageUuid();

  // Connect to the downstream node
  if (targets.length > 0) {
    ……
    // Open a socket connection to the downstream node
    mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
    mirrorSock = datanode.newSocket();
    try {
      ……
      // Set up the output and input streams to the downstream node
      mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
          HdfsConstants.SMALL_BUFFER_SIZE));
      mirrorIn = new DataInputStream(unbufMirrorIn);
      // Send the block-write request to the downstream node
      if (targetPinnings != null && targetPinnings.length > 0) {
        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
            blockToken, clientname, targets, targetStorageTypes, srcDataNode,
            stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
            latestGenerationStamp, requestedChecksum, cachingStrategy,
            false, targetPinnings[0], targetPinnings);
      } else {
        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
            blockToken, clientname, targets, targetStorageTypes, srcDataNode,
            stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
            latestGenerationStamp, requestedChecksum, cachingStrategy,
            false, false, targetPinnings);
      }
      mirrorOut.flush();
      DataNodeFaultInjector.get().writeBlockAfterFlush();
      // Receive the connect ack from the downstream node and record its status
      if (isClient) {
        BlockOpResponseProto connectAck =
            BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
        mirrorInStatus = connectAck.getStatus();
        firstBadLink = connectAck.getFirstBadLink();
        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
          LOG.info("Datanode " + targets.length +
              " got response for connect ack " +
              " from downstream datanode with firstbadlink as " +
              firstBadLink);
        }
      }
    } catch (IOException e) {
      if (isClient) {
        BlockOpResponseProto.newBuilder()
            .setStatus(ERROR)
            // NB: Unconditionally using the xfer addr w/o hostname
            .setFirstBadLink(targets[0].getXferAddr())
            .build()
            .writeDelimitedTo(replyOut);
        replyOut.flush();
      }
      IOUtils.closeStream(mirrorOut);
      mirrorOut = null;
      IOUtils.closeStream(mirrorIn);
      mirrorIn = null;
      IOUtils.closeSocket(mirrorSock);
      mirrorSock = null;
      if (isClient) {
        LOG.error(datanode + ":Exception transfering block " +
            block + " to mirror " + mirrorNode + ": " + e);
        throw e;
      } else {
        LOG.info(datanode + ":Exception transfering " +
            block + " to mirror " + mirrorNode +
            " - continuing without the mirror", e);
        incrDatanodeNetworkErrors();
      }
    }
  }

  /*
   * Once the input/output streams to the downstream node are established,
   * writeBlock() calls blockReceiver.receiveBlock() to receive the block
   * from the upstream node and send it on to the downstream node. The
   * blockReceiver object also receives the per-packet acknowledgments from
   * the downstream node and forwards them to the upstream node.
   */
  if (blockReceiver != null) {
    String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
    // Receive data from the upstream node, then forward it downstream
    blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
        mirrorAddr, null, targets, false);

    // For a transfer (replication) operation there is no downstream node to
    // forward to and no downstream acks to wait for, so once the block has
    // been received successfully the current node replies directly
    if (isTransfer) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("TRANSFER: send close-ack");
      }
      writeResponse(SUCCESS, null, replyOut);
    }
  }

  /*
   * After receiveBlock() succeeds, update the generation stamp, replica file
   * length, and other metadata of the newly written replica on this node.
   * If this is a pipeline-close recovery or a block replication, call
   * closeBlock() to report to the NameNode that the DataNode has received
   * a new block.
   */
  if (isClient &&
      stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    block.setGenerationStamp(latestGenerationStamp);
    block.setNumBytes(minBytesRcvd);
  }

  if (isDatanode ||
      stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
    LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
        + localAddress + " of size " + block.getNumBytes());
  }

  if (isClient) {
    size = block.getNumBytes();
  }

The receiveBlock() operation:

  // Start the PacketResponder thread to receive and forward acknowledgment packets
  if (isClient && !isTransfer) {
    responder = new Daemon(datanode.threadGroup,
        new PacketResponder(replyOut, mirrIn, downstreams));
    responder.start();
  }

  // Loop calling receivePacket() to receive and forward all of the block's packets
  while (receivePacket() >= 0) { /* Receive until the last packet */ }

  if (responder != null) {
    // Once the block write is complete, stop the PacketResponder thread
    ((PacketResponder) responder.getRunnable()).close();
    responderClosed = true;
  }

  /**
   * Receives data from the upstream node or the client. After the data is
   * received, it:
   * 1. reads the next packet's header information;
   * 2. if this is not the last packet, enqueues the response immediately;
   * 3. forwards the packet to the downstream DataNode;
   * 4. if a complete packet was received and syncBlock is true, syncs the
   *    data to the local disk;
   * 5. if this is the last node in the pipeline, verifies the checksum.
   */

  private int receivePacket() throws IOException {
    // Read the next packet
    packetReceiver.receiveNextPacket(in);

    /*
     * Get the packet header, work out where the packet sits inside the block,
     * and determine whether it is the last packet and whether it must be
     * synced to disk
     */
    PacketHeader header = packetReceiver.getHeader();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Receiving one packet for block " + block +
          ": " + header);
    }

    // Offset of this packet's data within the block
    long offsetInBlock = header.getOffsetInBlock();
    long seqno = header.getSeqno();
    // Is this the block's last packet?
    boolean lastPacketInBlock = header.isLastPacketInBlock();
    // Length of the packet's data
    final int len = header.getDataLen();
    boolean syncBlock = header.getSyncBlock();

    // avoid double sync'ing on close
    if (syncBlock && lastPacketInBlock) {
      this.syncOnClose = false;
    }

    // update received bytes
    final long firstByteInBlock = offsetInBlock;
    offsetInBlock += len;
    if (replicaInfo.getNumBytes() < offsetInBlock) {
      replicaInfo.setNumBytes(offsetInBlock);
    }

    // If no sync is requested and this node need not verify checksums (i.e.
    // it is not the last DataNode in the pipeline), enqueue the ack immediately
    if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    // If there is a downstream node, forward the packet to it
    if (mirrorOut != null && !mirrorError) {
      try {
        long begin = Time.monotonicNow();
        packetReceiver.mirrorPacketTo(mirrorOut);
        mirrorOut.flush();
        long duration = Time.monotonicNow() - begin;
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }

    ByteBuffer dataBuf = packetReceiver.getDataSlice();
    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();

    if (lastPacketInBlock || len == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Receiving an empty packet or the end of the block " + block);
      }
      // The whole block has been received; if syncBlock is set,
      // sync the data to disk
      if (syncBlock) {
        flushOrSync(true);
      }
    } else {
      // If this node is the last one in the pipeline, verify the
      // packet's checksum
      final int checksumLen = diskChecksum.getChecksumSize(len);
      final int checksumReceivedLen = checksumBuf.capacity();

      if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
        throw new IOException("Invalid checksum length: received length is "
            + checksumReceivedLen + " but expected length is " + checksumLen);
      }

      if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
        try {
          // Verify the packet's checksum
          verifyChunks(dataBuf, checksumBuf);
        } catch (IOException ioe) {
          // checksum error detected locally. there is no reason to continue.
          if (responder != null) {
            try {
              ((PacketResponder) responder.getRunnable()).enqueue(seqno,
                  lastPacketInBlock, offsetInBlock,
                  Status.ERROR_CHECKSUM);
              // Wait until the responder sends back the response
              // and interrupt this thread.
              Thread.sleep(3000);
            } catch (InterruptedException e) { }
          }
          throw new IOException("Terminating due to a checksum error." + ioe);
        }
      }

      if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
        // checksum is missing, need to calculate it
        checksumBuf = ByteBuffer.allocate(checksumLen);
        diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
      }

      // by this point, the data in the buffer uses the disk checksum
      final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
          && streams.isTransientStorage();
      try {
        long onDiskLen = replicaInfo.getBytesOnDisk();
        if (onDiskLen < offsetInBlock) {
          // finally write to the disk:
          if (onDiskLen % bytesPerChecksum != 0) {
            // prepare to overwrite last checksum
            adjustCrcFilePosition();
          }

          // If this is a partial chunk, then read in pre-existing checksum
          Checksum partialCrc = null;
          if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("receivePacket for " + block
                  + ": bytesPerChecksum=" + bytesPerChecksum
                  + " does not divide firstByteInBlock=" + firstByteInBlock);
            }
            long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                onDiskLen / bytesPerChecksum * checksumSize;
            partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
          }

          int startByteToDisk = (int) (onDiskLen - firstByteInBlock)
              + dataBuf.arrayOffset() + dataBuf.position();
          int numBytesToDisk = (int) (offsetInBlock - onDiskLen);

          long begin = Time.monotonicNow();
          // Write the data (the checksum is written below)
          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
          long duration = Time.monotonicNow() - begin;
          if (duration > datanodeSlowLogThresholdMs) {
            LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
                + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
          }

          final byte[] lastCrc;
          if (shouldNotWriteChecksum) {
            lastCrc = null;
          } else if (partialCrc != null) {
            // If this is a partial chunk, then verify that this is the only
            // chunk in the packet. Calculate new crc for this chunk.
            if (len > bytesPerChecksum) {
              throw new IOException("Unexpected packet data length for "
                  + block + " from " + inAddr + ": a partial chunk must be "
                  + " sent in an individual packet (data length = " + len
                  + " > bytesPerChecksum = " + bytesPerChecksum + ")");
            }
            partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
            lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
            checksumOut.write(buf);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Writing out partial crc for data len " + len);
            }
            partialCrc = null;
          } else {
            // write checksum
            final int offset = checksumBuf.arrayOffset() +
                checksumBuf.position();
            final int end = offset + checksumLen;
            lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
                end);
            checksumOut.write(checksumBuf.array(), offset, checksumLen);
          }

          /// flush entire packet, sync if requested
          flushOrSync(syncBlock);

          replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
          datanode.metrics.incrBytesWritten(len);
          datanode.metrics.incrTotalWriteTime(duration);

          manageWriterOsCache(offsetInBlock);
        }
      } catch (IOException iex) {
        datanode.checkDiskErrorAsync();
        throw iex;
      }
    }

    // if sync was requested, put in queue for pending acks here
    // (after the fsync finished)
    if (responder != null && (syncBlock || shouldVerifyChecksum())) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    /*
     * Send in-progress responses for the replaceBlock() calls back to caller to
     * avoid timeouts due to balancer throttling. HDFS-6247
     */
    if (isReplaceBlock
        && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
      BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
          .setStatus(Status.IN_PROGRESS);
      response.build().writeDelimitedTo(replyOut);
      replyOut.flush();

      lastResponseTime = Time.monotonicNow();
    }

    if (throttler != null) { // throttle I/O
      throttler.throttle(len);
    }

    return lastPacketInBlock ? -1 : len;
  }
