..........................................

  • 一.前言
  • 二.调用流程
  • 三.Sender类
  • 四.Receiver 类

一.前言

DataTransferProtocol有两个子类——Sender和Receiver。
其中Sender类封装了DataTransferProtocol的调用操作, 用于发起流式接口请求;
Receiver类封装了DataTransferProtocol的执行操作, 用于响应流式接口请求。

二.调用流程

DFSClient发起了一个DataTransferProtocol.readBlock()操作, 那么DFSClient会调用Sender类将这个请求序列化, 并传输给远程的Receiver。 远程的Receiver类接收到这个请求后, 会反序列化请求, 然后调用执行代码执行读取操作。

三.Sender类

Sender类用于发起DataTransferProtocol请求。Sender类首先使用ProtoBuf将参数序列化, 然后用一个枚举类Op描述调用的是什么方法, 最后将序列化后的参数和Op一起发送给接收方。

@Overridepublic void readBlock(final ExtendedBlock blk,final Token<BlockTokenIdentifier> blockToken,final String clientName,final long blockOffset,final long length,final boolean sendChecksum,final CachingStrategy cachingStrategy) throws IOException {//将所有DataTransferProtocol.readBlock()方法中的参数用ProtoBuf序列化OpReadBlockProto proto = OpReadBlockProto.newBuilder().setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName,blockToken)).setOffset(blockOffset).setLen(length).setSendChecksums(sendChecksum).setCachingStrategy(getCachingStrategy(cachingStrategy)).build();//调用send()方法发送Op.READ_BLOCK描述当前调用的是readBlock()方法同时发送序列化后的参数protosend(out, Op.READ_BLOCK, proto);}

调用了send()方法将Op对象以及序列化后的参数发送到IO流中。

  private static void send(final DataOutputStream out, final Op opcode,final Message proto) throws IOException {LOG.trace("Sending DataTransferOp {}: {}",  proto.getClass().getSimpleName(), proto);//调用op ()方法写入版本号, 然后再写入操作码Opop(out, opcode);//写入序列化后的参数proto.writeDelimitedTo(out);out.flush();}//op()方法用于向输出流中写入DataTransferProtocol版本号, 然后再写入操作码Opprivate static void op(final DataOutput out, final Op op) throws IOException {out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);op.write(out);}

Op是一个枚举类型, 使用一个byte类型的变量code标识操作码。 一个操作码对应DataTransferProtocol接口中的一个方法, 例如操作码80对应DataTransferProtocol.writeBlock()方法。

public enum Op {//操作码80, 对应DataTransferProtocol.writeBlock()方法WRITE_BLOCK((byte)80),//操作码81, 对应DataTransferProtocol.readBlock()方法READ_BLOCK((byte)81),READ_METADATA((byte)82),//操作码83, 对应DataTransferProtocol.replaceBlock()方法REPLACE_BLOCK((byte)83),//操作码84, 对应DataTransferProtocol.copyBlock()方法COPY_BLOCK((byte)84),//操作码85, 对应DataTransferProtocol.blockChecksum()方法 BLOCK_CHECKSUM((byte)85),//操作码86, 对应DataTransferProtocol.transferBlock()方法TRANSFER_BLOCK((byte)86),REQUEST_SHORT_CIRCUIT_FDS((byte)87),RELEASE_SHORT_CIRCUIT_FDS((byte)88),REQUEST_SHORT_CIRCUIT_SHM((byte)89),BLOCK_GROUP_CHECKSUM((byte)90),CUSTOM((byte)127);}

通过调用Sender类发起一个readBlock()操作时, Sender类会将读取数据块的请求通过IO流发送给远程的Datanode。 Datanode接收到这个请求后, 会调用Receiver类的对应方法执行readBlock()操作。

读取数据块请求的格式 :

首先是一个short类型的DataTransferProtocol版本号, 然后是byte类型的Op操作码, 最后是通过ProtoBuf序列化的readBlock()请求参数。

short -DataTransferProtocol版本号| byte -Op操作码(OpCode) |方法的序列化参数|

四.Receiver 类

Receiver是一个抽象类, 它提供了解析Sender请求操作码的readOp()方法, 以及处理Sender请求的processOp()方法。
Receiver类封装了DataTransferProtocol的执行操作, 用于执行远程节点发起的流式接口请求。

  /** Read an Op.  It also checks protocol version. */protected final Op readOp() throws IOException {//先从数据流中读入DataTransferProtocol版本号, 并与当前版本号进行比对final short version = in.readShort();// 对比版本 , hadoop 3.2.1的版本是 --> 28if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {throw new IOException( "Version Mismatch (Expected: " +DataTransferProtocol.DATA_TRANSFER_VERSION  +", Received: " +  version + " )");}//然后从数据流中读入Op, 并返回return Op.read(in);}

processOp()方法接收readOp()解析出的Op操作码作为参数, 在run() 方法中, 不断循环调用, 针对不同的操作码调用指定的方法。

/** Process op by the corresponding method. */protected final void processOp(Op op) throws IOException {//根据不同的Op操作码调用指定的方法响应switch(op) {case READ_BLOCK:opReadBlock();break;case WRITE_BLOCK:opWriteBlock(in);break;case REPLACE_BLOCK:opReplaceBlock(in);break;case COPY_BLOCK:opCopyBlock(in);break;case BLOCK_CHECKSUM:opBlockChecksum(in);break;case BLOCK_GROUP_CHECKSUM:opStripedBlockChecksum(in);break;case TRANSFER_BLOCK:opTransferBlock(in);break;case REQUEST_SHORT_CIRCUIT_FDS:opRequestShortCircuitFds(in);break;case RELEASE_SHORT_CIRCUIT_FDS:opReleaseShortCircuitFds(in);break;case REQUEST_SHORT_CIRCUIT_SHM:opRequestShortCircuitShm(in);break;default:throw new IOException("Unknown op " + op + " in data stream");}}

opReadBlock()方法首先从IO流中解析出序列化的Receiver.readBlock()方法的参数, 然后对解析出的参数进行反序列化, 最后调用Receiver.readBlock()方法执行读取操作。

/** Receive OP_READ_BLOCK */private void opReadBlock() throws IOException {//从IO流中读取序列化的readBlock()参数OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));TraceScope traceScope = continueTraceSpan(proto.getHeader(),proto.getClass().getSimpleName());try {//反序列化参数, 然后调用子类DataXceiver的readBlock()方法执行读取操作readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),proto.getHeader().getClientName(),proto.getOffset(),proto.getLen(),proto.getSendChecksums(),(proto.hasCachingStrategy() ?getCachingStrategy(proto.getCachingStrategy()) :CachingStrategy.newDefaultStrategy()));} finally {if (traceScope != null) traceScope.close();}}

参考:

Hadoop 2.X HDFS源码剖析 – 徐鹏

Hadoop3.2.1 【 HDFS 】源码分析 Sender和Receiver解析相关推荐

  1. HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

    在<HDFS源码分析心跳汇报之数据结构初始化>一文中,我们了解到HDFS心跳相关的BlockPoolManager.BPOfferService.BPServiceActor三者之间的关系 ...

  2. HDFS源码分析心跳汇报之数据结构初始化

    在<HDFS源码分析心跳汇报之整体结构>一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager.BPOfferService和BPServiceActo ...

  3. HDFS源码分析DataXceiver之整体流程

    在<HDFS源码分析之DataXceiverServer>一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer.它被用于接收来自客户端或其他数据节 ...

  4. hdfs源码分析第二弹

    以写文件为例,串联整个流程的源码: FSDataOutputStream out = fs.create(outFile); 1. DistributedFileSystem 继承并实现了FileSy ...

  5. Okhttp源码分析以及Google Gson解析json数据实例

    Okhttp Github的Okhttp OkHttp是一个高效的HTTP客户端,它有以下默认特性: 支持HTTP/2,允许所有同一个主机地址的请求共享同一个socket连接 透明的GZIP压缩减少响 ...

  6. Lua源码分析 - 虚拟机篇 - 语义解析之Opcode执行(18)

    目录 一.虚拟机篇 - 指令执行状态机luaV_execute 二.虚拟机篇 - 状态机的具体实现原理 一.虚拟机篇 - 指令执行状态机luaV_execute 在<Lua源码分析 - 主流程篇 ...

  7. 【Nginx源码分析】Nginx配置文件解析(一)

    运营研发团队 李乐 配置文件是nginx的基础,对于学习nginx源码甚至开发nginx模块的同学来说更是必须深究.本文将从源码从此深入分析nginx配置文件的解析,配置存储,与配置查找. 看本文之前 ...

  8. spring 源码分析(1)-xml文件解析

    我们在最开始接触spring的时候,看到不少书spring入门的例子如下 ApplicationContext atx = new ClassPathXmlApplicationContext(&qu ...

  9. rocketmq之源码分析及关键技术解析目录(一)

    2019独角兽企业重金招聘Python工程师标准>>> 最近因为工作时间允许和需要,决定对阿里贡献的rocketmq进行源码的整体梳理及设计解析,在整体解析完后再对技术实现中用到的设 ...

最新文章

  1. 解决linux下中文文件名显示乱码问题
  2. mysql实时读写_[DataBase] MySql 查看实时日志
  3. csu 1548: Design road (三分)
  4. mvvm 后端_ZK实际应用:MVVM –与ZK客户端API一起使用
  5. 非常全面的阿里的Java面试题目,涵盖Java基础+高级+架构
  6. Java LocalDate类| isSupported()方法与示例
  7. Python弹窗提示警告框MessageBox
  8. 先收藏!关于Java类、接口、枚举的知识点大汇总
  9. 数据中心智能化运维之路
  10. coherence初识
  11. oracle替换指定字符串字符_实例:替换方框内字符串内容
  12. 如何使用MonoDevelop调试Unity3D脚本
  13. 友华PT925E,电信天翼网关3.0,光猫超级密码获取最简便方式
  14. VS2003 搜索直接导致卡死问题
  15. TortoiseGit拉取gitee代码
  16. 四色印刷和专色印刷的区别是什么?
  17. java maven导入_Eclipse导入Maven项目详解(新手初学)
  18. NSIS脚本学习:创建 MUI 界面使用的自定义语言包文件(nlf nsh)
  19. 优化后的sql 语句 oracle
  20. docker容器启动后无法访问宿主机host

热门文章

  1. 【网络Ping不通如何解决?】
  2. awk命令 去掉重复行
  3. 一大波猪年元素的二维码助你跨猪年!
  4. HDU2099-整除的尾数
  5. 北京工业大学外网访问知网数据库方法
  6. jQuery-动画效果(图片抽奖案例)
  7. xmind zen 同步问题解决 坚果云
  8. 需要精读3遍的8个健身知识
  9. PHPstudy V8 安装PHPAdmin
  10. 黑马程序员_源自梦想 GUI