原文地址:http://yanbohappy.sinaapp.com/?p=110 

  最新版本的Hadoop代码中已经默认了Protocol buffer(以下简称PB,http://code.google.com/p/protobuf/)作为RPC的默认实现,原来的WritableRpcEngine已经被淘汰了。来自cloudera的Aaron T. Myers在邮件中这样说的“since PB can provide support for evolving protocols in a compatible fashion.”

首先要明白PB是什么,PB是Google开源的一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化/反序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。目前提供了 C++、Java、Python 三种语言的 API。简单理解就是某个进程把一些结构化数据通过网络通信的形式传递给另外一个进程(典型应用就是RPC);或者某个进程要把某些结构化数据持久化存储到磁盘上(这个有点类似于在Mongodb中的BSON格式)。对于存储的这个例子来说,使用PB和XML,JSON相比的缺点就是存储在磁盘上的数据用户是无法理解的,除非用PB反序列化之后才行,这个有点类似于IDL。优点就是序列化/反序列化速度快,网络或者磁盘IO传输的数据少,这个在Data-Intensive Scalable Computing中是非常重要的。

Hadoop使用PB作为RPC实现的另外一个原因是PB的语言、平台无关性。在mailing list里听说过社区的人有这样的考虑:就是现在每个MapReduce task都是在一个JVM虚拟机上运行的(即使是Streaming的模式,MR任务的数据流也是通过JVM与NN或者DN进行RPC交换的),JVM最严重的问题就是内存,例如OOM。我看社区里有人讨论说如果用PB这样的RPC实现,那么每个MR task都可以直接与NN或者DN进行RPC交换了,这样就可以用C/C++来实现每一个MR task了。百度做的HCE(https://issues.apache.org/jira/browse/MAPREDUCE-1270)和这种思路有点类似,但是由于当时的Hadoop RPC通信还是通过WritableRpcEngine来实现的,所以MR task还是没有摆脱通过本地的JVM代理与NN或者DN通信的束缚,因为Child JVM Process还是存在的,还是由它来设置运行时环境和RPC交互。

关于PB的原理和实现,请大家参考http://code.google.com/p/protobuf/或者http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608,本文不再赘述。

下面来看看Hadoop代码中的RPC是如何实现的。RPC就是一台机器上的某个进程要调用另外一台机器上的某个进程的方法,中间通信传输的就是类似于“方法名、参数1、参数2……”这样的信息,是结构化的。同时通信除了这些RPC实体以外,还要有header等。

我们要定义一种PB实现的RPC传输格式,首先要定义相应的.proto文件,在Hadoop common工程里,这些文件放在D:\Hadoop-trunk\hadoop-common-project\hadoop-common\src\main\proto目录下;在Hadoop HDFS工程里这些文件放在D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目录下,以此类推。Hadoop编译脚本会调用相应的protoc二进制程序来编译这些以.proto结尾的文件,生成相应的.java文件。

以D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目录下的ClientNamenodeProtocol.proto为例说明。文件最开始定义了一些参数:

option java_package = "org.apache.hadoop.hdfs.protocol.proto";option java_outer_classname = "ClientNamenodeProtocolProtos";option java_generic_services = true;option java_generate_equals_and_hash = true;

这个表示这个.proto文件经过protoc编译之后会生成org.apache.hadoop.hdfs.protocol.proto这个包下面的ClientNamenodeProtocolProtos.java类文件,那么在Hadoop源码里就可以调用这个类里的方法了。

这个文件的主体主要是两种数据类型message和rpc,仔细看下这个文件就知道了,message就是这个ClientNamenodeProtocol协议中传输的结构体,rpc就是调用的方法。那么这两种类型在经过编译之后会生成什么呢?

编译之后,在Hadoop-trunk/hadoop-hdfs-project/hadoop-hdfs/target/generated-sources/java/org/apache/hadoop/hdfs/protocol/proto目录里生成了ClientNamenodeProtocolProtos.java文件,里面把message都包装成了类,而把rpc都包装成了方法。这个文件是由PB编译器自动生成的,所以不能修改。

有了这些java类之后,我们就可以看看在Server端是怎么实现RPC的了。首先还是NameNode初始化的流程,会调用到rpcServer = createRpcServer(conf)来创建RPC server。下面看看NameNodeRpcServer的构造函数里都做了哪些工作:

public NameNodeRpcServer(Configuration conf, NameNode nn)throws IOException {this.nn = nn;this.namesystem = nn.getNamesystem();this.metrics = NameNode.getNameNodeMetrics();int handlerCount =conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,DFS_NAMENODE_HANDLER_COUNT_DEFAULT);InetSocketAddress socAddr = nn.getRpcServerAddress(conf);//设置ProtolEngine,目前只支持PB协议。表示接收到的RPC协议如果是ClientNamenodeProtocolPB,//那么处理这个RPC协议的引擎是ProtobufRpcEngineRPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);//声明一个ClientNamenodeProtocolServerSideTranslatorPB,//这个类负责把Server接收到的PB格式对象的数据,拼装成NameNode内村中的数据类型,//调用NameNodeRpcServer类中相应的逻辑,然后再把执行结果拼装成PB格式。
    ClientNamenodeProtocolServerSideTranslatorPBclientProtocolServerTranslator =new ClientNamenodeProtocolServerSideTranslatorPB(this);BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =new DatanodeProtocolServerSideTranslatorPB(this);BlockingService dnProtoPbService = DatanodeProtocolService.newReflectiveBlockingService(dnProtoPbTranslator);NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =new NamenodeProtocolServerSideTranslatorPB(this);BlockingService NNPbService = NamenodeProtocolService.newReflectiveBlockingService(namenodeProtocolXlator);RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService.newReflectiveBlockingService(refreshAuthPolicyXlator);RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =new RefreshUserMappingsProtocolServerSideTranslatorPB(this);BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService.newReflectiveBlockingService(refreshUserMappingXlator);GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =new GetUserMappingsProtocolServerSideTranslatorPB(this);BlockingService getUserMappingService = GetUserMappingsProtocolService.newReflectiveBlockingService(getUserMappingXlator);HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =new HAServiceProtocolServerSideTranslatorPB(this);BlockingService haPbService = HAServiceProtocolService.newReflectiveBlockingService(haServiceProtocolXlator);WritableRpcEngine.ensureInitialized();InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);if (dnSocketAddr != null) {int serviceHandlerCount =conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);// Add all the RPC protocols that the namenode implementsthis.serviceRpcServer =RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class, clientNNPbService,dnSocketAddr.getHostName(), dnSocketAddr.getPort(),serviceHandlerCount,false, conf, namesystem.getDelegationTokenSecretManager());DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,refreshAuthService, serviceRpcServer);DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,refreshUserMappingService, serviceRpcServer);DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,getUserMappingService, serviceRpcServer);this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();nn.setRpcServiceServerAddress(conf, serviceRPCAddress);} else {serviceRpcServer = null;serviceRPCAddress = null;}// Add all the RPC protocols that the namenode implementsthis.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf,namesystem.getDelegationTokenSecretManager());DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,refreshAuthService, clientRpcServer);DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,refreshUserMappingService, clientRpcServer);DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,getUserMappingService, clientRpcServer);// set service-level authorization security policyif (serviceAuthEnabled =conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());if (this.serviceRpcServer != null) {this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());}}// The rpc-server port can be ephemeral... ensure we have the correct infothis.clientRpcAddress = this.clientRpcServer.getListenerAddress();nn.setRpcServerAddress(conf, clientRpcAddress);this.minimumDataNodeVersion = conf.get(DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);}

ClientNamenodeProtocol是protoc编译生成的ClientNamenodeProtocolProtos类中的inner class。

public static com.google.protobuf.BlockingServicenewReflectiveBlockingService(final BlockingInterface impl) {……}

这个方法也是由protoc编译器自动生成的。这个方法会返回一个com.google.protobuf.BlockingService类型的对象,这种类型的对象定义了RPC的各种服务,后面会讲。

this.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf,namesystem.getDelegationTokenSecretManager());

这个RPC.getServer()函数生成一个Server对象,负责接收网络连接,读取数据,调用处理数据函数,返回结果。这个Server对象里有Listener, Handler, Responder内部类,分别开启多个线程负责监听、读取、处理和返回结果。前两个参数表示如果RPC发送过来的是ClientNamenodeProtocolPB协议,那么负责处理这个协议的服务(com.google.protobuf.BlockingService类型的对象)就是clientNNPbService。

这个RPC.getServer()会经过层层调用,因为现在默认的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就会调用到下面这个函数,在这生成了一个Server对象,就是用于接收client端RPC请求,处理,回复的Server。这个Server对象是一个纯粹的网络服务的Server,在RPC中起到基础网络IO服务的作用。

public RPC.Server getServer(Class<?> protocol, Object protocolImpl,String bindAddress, int port, int numHandlers, int numReaders,int queueSizePerHandler, boolean verbose, Configuration conf,SecretManager<? extends TokenIdentifier> secretManager,String portRangeConfig)throws IOException {return new Server(protocol, protocolImpl, conf, bindAddress, port,numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,portRangeConfig);}

现在该用到的东西都生成好了,就要看看client端来了一个RPC请求之后,Server端是怎么处理的呢?

Server里的Reader线程也是基于Selector的异步IO模式,每次Select选出一个SelectionKey之后,会调用SelectionKey.attachment()把这个SelectionKey所attach的Connection对象获取,然后执行对应的readAndProcess()方法,把这个SelectionKey所对应的管道上的网络IO数据读入缓冲区。readAndProcess()方法会层层调用到Server.processData()方法,在这个方法内部,会把刚才从网络IO中读取的数据反序列化成对象rpcRequest对象。rpcRequest对象的类型是继承自Writable类型的子类的对象,也就是说可以序列化/反序列化的类。这里rpcRequest对象里包含的RPC请求的内容对象是由.proto文件中Message生成的类,也就是说PB框架自动编译出来的类,后面可以通过调用这个类的get方法获取RPC中真正传输的数据。之后把生成的rpcRequest对象放到一个Call对象里面,再把Call对象放到队列Server.callQueue里面。至此网络服务器的Reader线程做的工作就OK了。

下面看看Handler线程是怎么处理的。Handler线程默认有10个,所以处理逻辑是多线程的。每个Handler线程会从刚才提到的callQueue中取一个Call对象,然后调用Server.call()方法执行这个Call对象中蕴含的RPC请求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最后这个call()函数里面真正执行喽。。。。重点看这个函数,首先校验这个请求发过来的数据是不是合理的。然后就是获取实现这个协议的服务。实现协议的服务在初始化的时候已经注册过了,就是前面说的那个com.google.protobuf.BlockingService类型的对象,例如:

BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);

这个就是实现Client和NameNode之间的ClientNamenodeProtocol协议的服务。当然还有dnProtoPbService, NNPbService, refreshAuthService, refreshUserMappingService, haPbService等等这些不同的服务。

这个Service获取了之后,通过调用这句代码

result = service.callBlockingMethod(methodDescriptor, null, param);

就会执行这个RPC请求的逻辑。

再往深入执行就要涉及到google protocol buffer内部的东西了,这个service对象会把相应的方法调用转移到一个继承自BlockingInterface接口的实现类上。Service的真正实现类就是clientProtocolServerTranslator,是newReflectiveBlockingService()这个函数的参数。

BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);

这个初始化过程中的参数,也就是service.callBlockingMethod()真正调用的是clientProtocolServerTranslator中对应的方法。这一点可以通过由protoc自动编译生成的代码中看出:

public static com.google.protobuf.BlockingServicenewReflectiveBlockingService(final BlockingInterface impl) {return new com.google.protobuf.BlockingService() {public final com.google.protobuf.Descriptors.ServiceDescriptorgetDescriptorForType() {return getDescriptor();}public final com.google.protobuf.Message callBlockingMethod(com.google.protobuf.Descriptors.MethodDescriptor method,com.google.protobuf.RpcController controller,com.google.protobuf.Message request)throws com.google.protobuf.ServiceException {if (method.getService() != getDescriptor()) {throw new java.lang.IllegalArgumentException("Service.callBlockingMethod() given method descriptor for " +"wrong service type.");}switch(method.getIndex()) {case 0:return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);case 1:return impl.getServerDefaults(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto)request);case 2:return impl.create(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto)request);case 3:return impl.append(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto)request);……
}
……
}

上面就是proto编译生成的ClientNamenodeProtocolProtos.java文件,从中可以看出对callBlockingMethod()方法的调用都是转移到BlockingInterface impl上面了。

然后我们看看clientProtocolServerTranslator是怎么进一步执行的。下面以getBlockLocations()函数为例说明:

public GetBlockLocationsResponseProto getBlockLocations(RpcController controller, GetBlockLocationsRequestProto req)throws ServiceException {try {//下面这个server是由NameNodeRpcServer类生成的对象,定义了HDFS元数据操作逻辑。LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),req.getLength());//由于server返回的是NameNode内存中的数据结构,要把这个结果通过RPC传回client端,//那么我们需要利用PB框架提供的对应Message的Builder类,把内存中的数据结构通过这个接口序列化。Builder builder = GetBlockLocationsResponseProto.newBuilder();if (b != null) {builder.setLocations(PBHelper.convert(b)).build();}return builder.build();} catch (IOException e) {throw new ServiceException(e);}}

至此,Hadoop的RPC流程Server端已经分析结束,不过这个是正确执行的流程。如果中间抛出了异常呢?还是以上面这个getBlockLocations()函数为例,如果元数据操作逻辑NameNodeRpcServer里面抛出IOException,那么它都会把它封装成ServiceException,然后一路传递给client端。在client端,会通过ProtobufHelper.getRemoteException()把封装在ServiceException中的IOException获取出来。

转载于:https://www.cnblogs.com/davidwang456/p/4773914.html

Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载相关推荐

  1. Brpc代码分析-Server端(九)

    2021SC@SDUSC 回到CallMethod函数. 接下来将设置各种成员,如超时时间,response等,因为demo中场景没有设置loadbalancer,所以是SingleServer 通过 ...

  2. 基于windows PE文件的恶意代码分析;使用SystemInternal工具与内核调试器研究windows用户空间与内核空间...

    基于windows PE文件的恶意代码分析:使用SystemInternal工具与内核调试器研究windows用户空间与内核空间 ******************** 既然本篇的主角是PE文件,那 ...

  3. 基于NXP的蓝牙BLE协议栈代码分析

    重要概念点总结 1.profile 规范.包含有service服务,如电量. 2.service,每一个服务可能包含一个或多个特征值. 3.characteristic 特征值.通信载体,电量为20% ...

  4. PX4代码学习系列博客(6)——offboard模式位置控制代码分析(之前转载过,这是第二次转载了)

    我刚刚发现这篇文章去年八月份的时候转载过一次了 https://blog.csdn.net/sinat_16643223/article/details/107874349 转载自:https://b ...

  5. 【大数据Hadoop】HDFS-HA模式下Checkpointer机制代码分析

    Checkpointer机制 概览 源码解读 相关配置项 源码解析 创建StandbyCheckpointer StandbyCheckpointer分析 CheckpointerThread 干了些 ...

  6. OpenWRT(基于LEDE17.01.4)Open***的Client与Server端内网互通

    经过长达近一个星期的折腾,Open×××客户端终于可以和Server内网的客户端进行通信,其中防火墙占用了大部分时间,主要还是不太熟悉Iptables的各种设置方式以及规则(对CentOS7的Fire ...

  7. Hadoop中RPC机制详解之Server端

    2019独角兽企业重金招聘Python工程师标准>>> Hadoop 中 RPC 机制详解之 Client 端 1. Server.Listener RPC Client 端的 RP ...

  8. Protocol Buffer Java应用实例

    生成目标语言代码 下面的命令帮助我们将MyMessage.proto文件中定义的一组Protocol Buffer格式的消息编译成目标语言(Java)的代码.至于消息的内容,我们会在后面以分段的形式逐 ...

  9. 【Protocol Buffer】Protocol Buffer入门教程(一):简介和安装

    00. 目录 文章目录 00. 目录 01. Protocol Buffer简介 02. Protocol Buffer优缺点 03. Protocol Buffer安装 04. Protocol B ...

最新文章

  1. linux内核网络协议栈--接收流程及函数(九)
  2. BZOJ3944: Sum
  3. 基于行为树的新手引导设计
  4. myeclipse 8.5-9.0 安装 svn 方法 《转载》
  5. 幂集 返回某集合的所有子集
  6. JavaScript call()函数的应用
  7. 使用CXF发布WebService服务简单实例
  8. 笔记本电脑风扇控制软件
  9. 动态规划 之 完全背包
  10. 机器视觉培训-苏州机器视觉公司,课程安排历时7天
  11. $ is not defined
  12. 接口常见安全漏洞说明
  13. 【MQTT】MQTT协议学习
  14. Android平台美颜相机/Camera实时滤镜/视频编解码/影像后期/人脸技术探索——目录
  15. 解决Pycharm 多线程时出现错误Process finished with exit code -1073741819 (0xC0000005)
  16. 绩效被打C了,谈谈「绩效考核」背后的逻辑以及潜规则
  17. (英文版)吴恩达机器学习第五周笔记
  18. 解决电脑能上网不能登陆QQ-已测试并解决
  19. xbox360游戏下载_Xbox Live游戏玩家API
  20. stm32407定义浮点数后进入硬件错误

热门文章

  1. priority_queue
  2. C++中成员函数和成员变量的隶属问题
  3. uniapph5授权成功后返回上一页_被成功验证过的的7条选品思路(收藏)
  4. 湖南大学计算机学院软件专业杨磊,杨磊-湖大信息科学与工程学院
  5. 有sql漏洞的php脚本,DedeCms V57 plus/search.php 文件SQL注射0day漏洞脚本安全 -电脑资料...
  6. vue 不会热启动_使用PM2搭建在线vue.js开发环境(以守护进程方式热启动)
  7. 如何判断强化学习训练是否在收敛?
  8. 最短路径:Dijkstra、BellmanFord以及SPFA算法
  9. python 操作微信定时发信息
  10. tensorflow 风格迁移