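I have been reading the Hadoop source code for a while and plan to write a proper summary. For now I'm reserving the spot and posting the code I annotated earlier. If anything is wrong, corrections are welcome.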

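1. RPC Fundamentals

1.1 Basic concepts of RPC

RPC stands for Remote Procedure Call.

  (1) It lets a program on one computer call a subroutine on another computer without dealing with the underlying network communication, which stays transparent to the caller. For this reason it is widely used for communication in distributed systems.

The RPC protocol assumes an existing transport protocol, such as TCP or UDP, to carry data between the communicating programs. In the OSI model, RPC spans the transport layer and the application layer. RPC makes it easier to build applications, including distributed applications made up of multiple networked programs.

  (2) All interaction between Hadoop processes goes through RPC, for example between the NameNode and the DataNodes, or between the JobTracker and the TaskTrackers (in MapReduce v1).

So it is fair to say that Hadoop as a whole runs on top of RPC.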

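1.2 Key characteristics of RPC

  (1) Transparency: to the user, calling a program on another machine feels just like calling a local method;

  (2) High performance: the RPC Server can handle many Client requests concurrently;

  (3) Controllability: the JDK already ships an RPC framework, RMI, but it is too heavyweight and offers few points of control, so Hadoop implements its own RPC framework.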

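1.3 The basic RPC flow

  (1) RPC uses the client/server (C/S) model;

  (2) The Client sends a request message carrying parameters to the Server;

  (3) After receiving the request, the Server calls the corresponding procedure with the parameters it received and sends the computed result back to the Client;

  (4) The Client receives the result and continues running.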
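The four steps above can be sketched in Java. This is a toy model, not Hadoop code. The class `ToyRpcFlow` and its methods are hypothetical, the "network" is just a byte array passed from client to server, and the server knows only two made-up procedures, `add` and `mul`. The sketch shows how the work is divided. The client encodes a method name and parameters. The server decodes them, runs the matching procedure, and encodes the result. The client then decodes that result and continues.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy model of the C/S request/response flow (not Hadoop code).
// The "network" is just a byte[] handed from client to server and back.
class ToyRpcFlow {

  // Step (2): the client encodes the method name and its parameters.
  static byte[] encodeRequest(String method, int a, int b) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeUTF(method);
    out.writeInt(a);
    out.writeInt(b);
    out.flush();
    return buf.toByteArray();
  }

  // Step (3): the server decodes the request, runs the matching procedure,
  // and encodes the result as the response.
  static byte[] serve(byte[] request) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
    String method = in.readUTF();
    int a = in.readInt();
    int b = in.readInt();
    int result;
    switch (method) {
      case "add": result = a + b; break;
      case "mul": result = a * b; break;
      default: throw new IOException("unknown method: " + method);
    }
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(result);
    out.flush();
    return buf.toByteArray();
  }

  // Step (4): the client decodes the response and simply carries on;
  // to the caller this looks like an ordinary local method call.
  static int call(String method, int a, int b) {
    try {
      byte[] response = serve(encodeRequest(method, a, b)); // "send" + "receive"
      return new DataInputStream(new ByteArrayInputStream(response)).readInt();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(call("add", 2, 3));
    System.out.println(call("mul", 4, 5));
  }
}
```

Calling `ToyRpcFlow.call("add", 2, 3)` returns 5. To the caller it looks like a local method call, which is the transparency described in 1.2.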

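1.4 The RPC mechanism in Hadoop (IPC)

Like other RPC frameworks, Hadoop RPC consists of four layers:

  (1) Serialization layer: messages exchanged between Client and Server use Hadoop's serialization classes or custom Writable types (in the ProtobufRpcEngine code below, protobuf messages are wrapped in Writable types such as RpcRequestWrapper);

  (2) Function-call layer: Hadoop RPC implements function calls with dynamic proxies and Java reflection;

  (3) Network transport layer: Hadoop RPC uses TCP/IP sockets;

  (4) Server framework layer: the RPC Server uses Java NIO with an event-driven I/O model to increase its capacity for handling requests concurrently;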
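For the serialization layer, Hadoop's `org.apache.hadoop.io.Writable` interface has two methods, `write(DataOutput)` and `readFields(DataInput)`. The sketch below declares a look-alike interface, `MyWritable`, so that it compiles without Hadoop on the classpath. `AddRequest` and `WritableDemo` are hypothetical names. On deserialization, the code first creates an empty instance reflectively and then calls `readFields`. `receiveRpcResponse()` uses the same pattern later, with `ReflectionUtils.newInstance(valueClass, conf)` followed by `value.readFields(in)`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Look-alike of org.apache.hadoop.io.Writable (same two methods), declared
// locally so the sketch compiles without Hadoop.
interface MyWritable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

// Hypothetical request type: a method name plus two int arguments.
class AddRequest implements MyWritable {
  String method = "";
  int a;
  int b;

  // The no-arg constructor lets the receiver create an empty instance reflectively.
  AddRequest() {}

  AddRequest(String method, int a, int b) {
    this.method = method;
    this.a = a;
    this.b = b;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(method);
    out.writeInt(a);
    out.writeInt(b);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    method = in.readUTF();
    a = in.readInt();
    b = in.readInt();
  }
}

class WritableDemo {
  // Sender side: serialize any MyWritable into a byte array.
  static byte[] toBytes(MyWritable w) {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try {
      DataOutputStream out = new DataOutputStream(buf);
      w.write(out);
      out.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buf.toByteArray();
  }

  // Receiver side: create an empty instance of the expected class, then let it
  // fill itself from the stream (newInstance + readFields).
  static <T extends MyWritable> T fromBytes(byte[] data, Class<T> cls) {
    try {
      T value = cls.getDeclaredConstructor().newInstance();
      value.readFields(new DataInputStream(new ByteArrayInputStream(data)));
      return value;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

This is also why Writable types need a no-argument constructor. The receiver has to create an empty object before it can fill it from the stream.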

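1.5 Design techniques used in Hadoop RPC

  (1) Dynamic proxies

A dynamic proxy provides access to another object while hiding that real object from the client. The JDK supports dynamic proxies, but only for interfaces.

  (2) Reflection: loading classes dynamically

  (3) Serialization

  (4) Non-blocking I/O (NIO)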
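Techniques (1) and (2) fit together as follows. On the calling side, `java.lang.reflect.Proxy` routes every call on an interface to a single `InvocationHandler.invoke()`. This mirrors how every call on a Hadoop IPC proxy ends up in `Invoker.invoke()`. On the receiving side, reflection (`Class.getMethod` plus `Method.invoke`) can find and run the requested method by name. In this sketch, `CalcProtocol`, `CalcServerImpl` and `ProxyDemo` are hypothetical. The handler calls the dispatcher in-process instead of serializing the request and sending it over a socket.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical protocol; JDK dynamic proxies can only implement interfaces.
interface CalcProtocol {
  int add(int a, int b);
  String echo(String s);
}

// Hypothetical "server-side" implementation of the protocol.
class CalcServerImpl implements CalcProtocol {
  @Override public int add(int a, int b) { return a + b; }
  @Override public String echo(String s) { return "echo:" + s; }
}

class ProxyDemo {
  // Receiving side: find the method by name and parameter types via
  // reflection and invoke it on the real implementation.
  static Object dispatch(Object impl, String methodName, Class<?>[] paramTypes,
      Object[] args) throws Exception {
    Method m = impl.getClass().getMethod(methodName, paramTypes);
    return m.invoke(impl, args);
  }

  // Calling side: every method call on the returned proxy is routed to the
  // single handler below. A real RPC client would serialize the method name
  // and arguments and send them over the network; this sketch only records
  // the "request" in `log` and calls dispatch() in-process.
  @SuppressWarnings("unchecked")
  static <T> T getProxy(Class<T> protocol, Object serverImpl, StringBuilder log) {
    InvocationHandler handler = (proxy, method, args) -> {
      log.append(method.getName()).append(Arrays.toString(args)).append(';');
      return dispatch(serverImpl, method.getName(), method.getParameterTypes(), args);
    };
    return (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class<?>[] {protocol}, handler);
  }

  public static void main(String[] args) {
    StringBuilder log = new StringBuilder();
    CalcProtocol calc = getProxy(CalcProtocol.class, new CalcServerImpl(), log);
    System.out.println(calc.add(2, 3));   // looks like a local call
    System.out.println(calc.echo("hi"));
    System.out.println(log);              // the intercepted "requests"
  }
}
```

`Proxy` can only implement interfaces, and accordingly Hadoop RPC protocols are declared as Java interfaces. This matches the limitation mentioned in (1).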

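RPC is something you have to deal with in any distributed system. Whenever code on one machine needs to call a function on another machine, RPC makes that call look like a local function call. You don't have to worry about how the layers underneath are implemented, just as code running over TCP doesn't need to know how TCP itself works.

The rough flow of the Client is all in the code below. Prerequisites: (1) dynamic proxies, (2) Java NIO.

Every RPC call is redirected to Invoker.invoke(), and each request is wrapped in a Call object. The Call is registered in the connection's table of outstanding calls (calls). A sender thread (sendParamsExecutor) then writes the serialized request to the socket. The Connection thread reads the responses and hands each one back to its Call by call id. The data transfer uses NIO. See the code comments for details.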
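The bookkeeping described above can be modeled with plain `wait`/`notifyAll`. It consists of a Call per request, a table of outstanding calls keyed by call id, and a receiver thread that matches each response back to its caller. `ToyCall` and `ToyConnection` are hypothetical simplifications. They leave out serialization, the socket, timeouts and error handling. In the Hadoop code below, the synchronous caller gets its result from `getRpcResponse()`, not from a method named `await()`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for a pending call: an id, a response slot and a
// done flag, all guarded by the call's own monitor.
class ToyCall {
  final int id;
  private Object response;
  private boolean done;

  ToyCall(int id) {
    this.id = id;
  }

  // Receiver thread: store the response and wake up the waiting caller.
  synchronized void setResponse(Object value) {
    response = value;
    done = true;
    notifyAll();
  }

  // Caller thread: block until the response has been set.
  synchronized Object await() throws InterruptedException {
    while (!done) {
      wait();
    }
    return response;
  }
}

// Simplified stand-in for a connection's table of outstanding calls.
class ToyConnection {
  private final AtomicInteger nextId = new AtomicInteger();
  final ConcurrentHashMap<Integer, ToyCall> calls = new ConcurrentHashMap<>();

  // Caller side: create a call with a fresh id and register it.
  ToyCall addCall() {
    ToyCall call = new ToyCall(nextId.getAndIncrement());
    calls.put(call.id, call);
    return call;
  }

  // Receiver side: for each response read off the wire, look the call up
  // by id, remove it from the table and complete it.
  void onResponse(int callId, Object value) {
    ToyCall call = calls.remove(callId);
    if (call != null) {
      call.setResponse(value);
    }
  }

  // Two outstanding calls whose responses arrive out of order on another thread.
  static String demo() {
    ToyConnection conn = new ToyConnection();
    ToyCall first = conn.addCall();   // id 0
    ToyCall second = conn.addCall();  // id 1
    Thread receiver = new Thread(() -> {
      conn.onResponse(1, "result-1");
      conn.onResponse(0, "result-0");
    });
    receiver.start();
    try {
      Object r0 = first.await();
      Object r1 = second.await();
      receiver.join();
      return r0 + "," + r1 + "," + conn.calls.size();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

In `demo()` the responses arrive in the opposite order of the requests, yet each caller still gets its own result. That works because the receiver looks calls up by id rather than relying on arrival order, and it is what lets many calls share one connection.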

/* Prerequisites: 1. dynamic proxies 2. Java NIO */
/*
 * On the client side, every method call on an IPC proxy is redirected to
 * Invoker.invoke(), so the analysis of IPC connection setup and method
 * invocation starts from the Invoker class.
 * @param proxy  the proxy instance, which implements the protocol interface
 * @param method the method to be invoked over IPC
 * @param args   the arguments
 */
// ProtobufRpcEngine.invoke()
public Object invoke(Object proxy, final Method method, Object[] args)
    throws ServiceException {
  long startTime = 0;

  if (args.length != 2) { // RpcController + Message
    throw new ServiceException("Too many parameters for request. Method: ["
        + method.getName() + "]" + ", Expected: 2, Actual: "
        + args.length);
  }
  if (args[1] == null) {
    throw new ServiceException("null param while calling Method: ["
        + method.getName() + "]");
  }

  // if Tracing is on then start a new span for this rpc.
  // guard it in the if statement to make sure there isn't
  // any extra string manipulation.
  Tracer tracer = Tracer.curThreadTracer();
  TraceScope traceScope = null;
  if (tracer != null) {
    traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
  }
  // build the IPC request header, which carries the method name and the protocol
  RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
  // the request message (the actual parameter)
  Message theRequest = (Message) args[1];
  // return value of the IPC call
  final RpcResponseWrapper val;
  try {
    // send the rpc request and wait for the result
    val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
        fallbackToSimpleAuth);
  } finally {
    if (traceScope != null) traceScope.close();
  }

  if (Client.isAsynchronousMode()) {
    final AsyncGet<RpcResponseWrapper, IOException> arr
        = Client.getAsyncRpcResponse();
    final AsyncGet<Message, Exception> asyncGet
        = new AsyncGet<Message, Exception>() {
      @Override
      public Message get(long timeout, TimeUnit unit) throws Exception {
        return getReturnMessage(method, arr.get(timeout, unit));
      }

      @Override
      public boolean isDone() {
        return arr.isDone();
      }
    };
    ASYNC_RETURN_MESSAGE.set(asyncGet);
    return null;
  } else {
    return getReturnMessage(method, val);
  }
}
/**
 * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
 * <code>remoteId</code>, returning the rpc respond.
 *
 * @param rpcKind
 * @param rpcRequest -  contains serialized method and method parameters
 * @param remoteId - the target rpc server
 * @param fallbackToSimpleAuth - set to true or false during this method to
 *   indicate if a secure client falls back to simple auth
 * @returns the rpc response
 * Throws exceptions if there are network problems or if the remote code
 * threw an exception.
 */
// Client.call()
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
      fallbackToSimpleAuth);
}
/**
 * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
 * <code>remoteId</code>, returning the rpc response.
 *
 * @param rpcKind
 * @param rpcRequest -  contains serialized method and method parameters
 * @param remoteId - the target rpc server
 * @param serviceClass - service class for RPC
 * @param fallbackToSimpleAuth - set to true or false during this method to
 *   indicate if a secure client falls back to simple auth
 * @returns the rpc response
 * Throws exceptions if there are network problems or if the remote code
 * threw an exception.
 */
/*
 * Create a Call, send rpcRequest to the IPC server identified by remoteId,
 * and return the rpc response.
 */
// Client.call()
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  // create a Call object for this request
  final Call call = createCall(rpcKind, rpcRequest);
  // get a connection (reused or new); setting up a new one includes the
  // handshake, which sends some basic information to the server
  final Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);

  try {
    checkAsyncCall();
    try {
      connection.sendRpcRequest(call);                 // send the rpc request
    } /* ... catch clauses omitted in this excerpt ... */
  } /* ... catch clause omitted in this excerpt ... */

  if (isAsynchronousMode()) {
    final AsyncGet<Writable, IOException> asyncGet
        = new AsyncGet<Writable, IOException>() {
      @Override
      public Writable get(long timeout, TimeUnit unit)
          throws IOException, TimeoutException{
        boolean done = true;
        try {
          final Writable w = getRpcResponse(call, connection, timeout, unit);
          if (w == null) {
            done = false;
            throw new TimeoutException(call + " timed out "
                + timeout + " " + unit);
          }
          return w;
        } finally {
          if (done) {
            releaseAsyncCall();
          }
        }
      }

      @Override
      public boolean isDone() {
        synchronized (call) {
          return call.done;
        }
      }
    };

    ASYNC_RPC_RESPONSE.set(asyncGet);
    return null;
  } else {
    // synchronous mode: return the rpc response
    return getRpcResponse(call, connection, -1, null);
  }
}
/** Get a connection from the pool, or create a new one and add it to the
 * pool.  Connections to a given ConnectionId are reused. */
// Client.getConnection()
private Connection getConnection(ConnectionId remoteId,
    Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  Connection connection;
  /* we could avoid this allocation for each RPC by having a  
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  while (true) {
    // These lines below can be shorten with computeIfAbsent in Java8
    connection = connections.get(remoteId);
    if (connection == null) {
      // create the Connection object (the socket is not opened yet)
      connection = new Connection(remoteId, serviceClass);
      // putIfAbsent:
      /*
       * If the specified key is not already associated
       * with a value, associate it with the given value.
       * This is equivalent to
       * <pre>
       *   if (!map.containsKey(key))
       *       return map.put(key, value);
       *   else
       *       return map.get(key);</pre>
       * except that the action is performed atomically.
       * .....
       */
      // put it into the connections map atomically (thread-safe)
      Connection existing = connections.putIfAbsent(remoteId, connection);
      if (existing != null) {
        connection = existing;
      }
    }

    // add the call to this connection (thread-safe)
    if (connection.addCall(call)) {
      break;
    } else {
      // This connection is closed, should be removed. But other thread could
      // have already known this closedConnection, and replace it with a new
      // connection. So we should call conditional remove to make sure we only
      // remove this closedConnection.
      connections.remove(remoteId, connection);
    }
  }

  // If the server happens to be slow, the method below will take longer to
  // establish a connection.
  // set up the connection's I/O streams
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}
/**
 * Add a call to this connection's call queue and notify
 * a listener; synchronized.
 * Returns false if called during shutdown.
 * @param call to add
 * @return true if the call was added.
 */
/* Add a call to this connection and wake up the Connection thread (run()) that is waiting for work */
// Client.Connection.addCall()
private synchronized boolean addCall(Call call) {
  if (shouldCloseConnection.get())
    return false;
  // register the call in the calls table, keyed by call id
  calls.put(call.id, call);
  // wake up the connection thread waiting for work
  notify();
  return true;
}
/** Connect to the server and set up the I/O streams. It then sends
 * a header to the server and starts
 * the connection thread that waits for responses.
 */
/* Set up the I/O streams of this socket */
// Client.Connection.setupIOstreams()
private synchronized void setupIOstreams(
    AtomicBoolean fallbackToSimpleAuth) {
  try {
    Span span = Tracer.getCurrentSpan();
    if (span != null) {
      span.addTimelineAnnotation("IPC client connecting to " + server);
    }
    short numRetries = 0;
    Random rand = null;
    while (true) {
      // open the socket connection
      setupConnection();
      // get the socket's input stream
      InputStream inStream = NetUtils.getInputStream(socket);
      // get the socket's output stream
      OutputStream outStream = NetUtils.getOutputStream(socket);
      // write the rpc connection header
      /*
       * Write the connection header - this is sent when connection is established
       * +----------------------------------+
       * |  "hrpc" 4 bytes                  |
       * +----------------------------------+
       * |  Version (1 byte)                |
       * +----------------------------------+
       * |  Service Class (1 byte)          |
       * +----------------------------------+
       * |  AuthProtocol (1 byte)           |
       * +----------------------------------+
       */
      // first write: the connection header
      writeConnectionHeader(outStream);
      /*
      private void writeConnectionHeader(OutputStream outStream)
          throws IOException {
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
        // Write out the header, version and authentication method
        // "hrpc"
        out.write(RpcConstants.HEADER.array());
        // version = 9
        out.write(RpcConstants.CURRENT_VERSION);
        out.write(serviceClass);
        out.write(authProtocol.callId);
        out.flush();
      }
      */
      // if doPing is set, wrap the input stream so that a read timeout
      // sends a ping to the server instead of simply failing
      if (doPing) {
        inStream = new PingInputStream(inStream);
      }
      this.in = new DataInputStream(new BufferedInputStream(inStream));

      // SASL may have already buffered the stream
      if (!(outStream instanceof BufferedOutputStream)) {
        outStream = new BufferedOutputStream(outStream);
      }
      this.out = new DataOutputStream(outStream);
      // second write: send the connection context to the server
      writeConnectionContext(remoteId, authMethod);

      // update last activity time
      touch();

      span = Tracer.getCurrentSpan();
      if (span != null) {
        span.addTimelineAnnotation("IPC client connected to " + server);
      }

      // start the receiver thread after the socket connection has been set
      // up
      // start the Connection thread; it receives responses for the calls in the calls table
      start();
      return;
    }
  } /* ... catch clause omitted in this excerpt ... */
}
/* Send the rpc request: the third write */
// Client.Connection.sendRpcRequest()
public void sendRpcRequest(final Call call)
    throws InterruptedException, IOException {
  if (shouldCloseConnection.get()) {
    return;
  }

  // Serialize the call to be sent. This is done from the actual
  // caller thread, rather than the sendParamsExecutor thread,
  // so that if the serialization throws an error, it is reported
  // properly. This also parallelizes the serialization.
  //
  // Format of a call on the wire:
  // 0) Length of rest below (1 + 2)
  // 1) RpcRequestHeader  - is serialized Delimited hence contains length
  // 2) RpcRequest
  //
  // Items '1' and '2' are prepared here. 
  final DataOutputBuffer d = new DataOutputBuffer();
  // call.rpcKind identifies the rpc engine
  RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
      call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
      clientId);
  header.writeDelimitedTo(d);
  call.rpcRequest.write(d);

  // serialize concurrent sends with the sendRpcRequestLock
  synchronized (sendRpcRequestLock) {
    // hand the actual write over to the sendParamsExecutor thread pool
    Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
      // writes the serialized call to the server
      @Override
      public void run() {
        try {
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }

            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " sending #" + call.id);

            // the serialized bytes and their length
            byte[] data = d.getData();
            int totalLength = d.getLength();
            out.writeInt(totalLength); // Total Length
            out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
            out.flush();
          }
        } catch (IOException e) {
          // exception at this point would leave the connection in an
          // unrecoverable state (eg half a call left on the wire).
          // So, close the connection, killing any outstanding calls
          markClosed(e);
        } finally {
          //the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });

    try {
      senderFuture.get();
    } /* ... catch clauses omitted in this excerpt ... */
  }
}
// Client.Connection.run()
public void run() {
  try {
    // wait for receive work: responses to read, or a reason to close
    while (waitForWork()) {//wait here for work - read or close connection
      receiveRpcResponse();
    }
  } /* ... catch clause omitted in this excerpt ... */

  close();
}

private void receiveRpcResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  // update the last activity time
  touch();

  try {
    // read the total length prefix
    int totalLen = in.readInt();
    /*
     * Protobuf type {@code hadoop.common.RpcResponseHeaderProto}
     *
     * <pre>
     **
     * Rpc Response Header
     * +------------------------------------------------------------------+
     * | Rpc total response length in bytes (4 bytes int)                 |
     * |  (sum of next two parts)                                         |
     * +------------------------------------------------------------------+
     * | RpcResponseHeaderProto - serialized delimited ie has len         |
     * +------------------------------------------------------------------+
     * | if request is successful:                                        |
     * |   - RpcResponse -  The actual rpc response  bytes follow         |
     * |     the response header                                          |
     * |     This response is serialized based on RpcKindProto            |
     * | if request fails :                                               |
     * |   The rpc response header contains the necessary info            |
     * +------------------------------------------------------------------+
     *
     * Note that rpc response header is also used when connection setup fails.
     * Ie the response looks like a rpc response with a fake callId.
     * </pre>
     */
    RpcResponseHeaderProto header =
        RpcResponseHeaderProto.parseDelimitedFrom(in);
    checkResponse(header);

    int headerLen = header.getSerializedSize();
    headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

    // the id of the call this response belongs to
    int callId = header.getCallId();
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + callId);

    // status of the rpc response
    RpcStatusProto status = header.getStatus();
    // dispatch on the returned status
    if (status == RpcStatusProto.SUCCESS) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in);                 // read value
      // remove the call from the calls table
      final Call call = calls.remove(callId);
      // set the return value, which completes the call
      call.setRpcResponse(value);

      // verify that length was correct
      // only for ProtobufEngine where len can be verified easily
      if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
        ProtobufRpcEngine.RpcWrapper resWrapper =
            (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
        if (totalLen != headerLen + resWrapper.getLength()) {
          throw new RpcClientException(
              "RPC response length mismatch on rpc success");
        }
      }
    } else { // Rpc Request failed
      // Verify that length was correct
      if (totalLen != headerLen) {
        throw new RpcClientException(
            "RPC response length mismatch on rpc error");
      }

      final String exceptionClassName = header.hasExceptionClassName() ?
          header.getExceptionClassName() :
            "ServerDidNotSetExceptionClassName";
      final String errorMsg = header.hasErrorMsg() ?
          header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
      final RpcErrorCodeProto erCode =
          (header.hasErrorDetail() ? header.getErrorDetail() : null);
      if (erCode == null) {
        LOG.warn("Detailed error code not set by server on rpc error");
      }
      RemoteException re =
          new RemoteException(exceptionClassName, errorMsg, erCode);
      if (status == RpcStatusProto.ERROR) {
        // ERROR: only this call fails
        final Call call = calls.remove(callId);
        call.setException(re);
      } else if (status == RpcStatusProto.FATAL) {
        // Close the connection
        markClosed(re);
      }
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
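The wire layout described in the comments above can be checked with a small, self-contained sketch. `ToyWireFormat` is hypothetical and uses neither Hadoop nor protobuf classes. It does three things:

- It builds the 7-byte connection header: "hrpc", then version, service class and auth protocol.
- It frames a call as a 4-byte total length followed by a varint-length-prefixed header and a body. This is the shape that `writeDelimitedTo` followed by `rpcRequest.write` produces in `sendRpcRequest()`.
- It repeats the length check from `receiveRpcResponse()`, where `headerLen` also counts the size of the varint prefix.

`varint32Size` reimplements the quantity `CodedOutputStream.computeRawVarint32Size` returns. The header and body bytes are placeholders rather than real protobuf messages. Version 9 comes from the code comment above; the other header values in the tests are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers mirroring the IPC client's framing; no Hadoop or
// protobuf classes are used, and header/body bytes are placeholders.
class ToyWireFormat {

  // Connection header: "hrpc" (4 bytes), then version, service class and
  // auth protocol (1 byte each).
  static byte[] connectionHeader(int version, int serviceClass, int authProtocol) {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    byte[] magic = "hrpc".getBytes(StandardCharsets.US_ASCII);
    buf.write(magic, 0, magic.length);
    buf.write(version);
    buf.write(serviceClass);
    buf.write(authProtocol);
    return buf.toByteArray();
  }

  // Number of bytes a base-128 varint needs for this value.
  static int varint32Size(int value) {
    int size = 1;
    while ((value & ~0x7F) != 0) {
      size++;
      value >>>= 7;
    }
    return size;
  }

  static void writeVarint32(DataOutputStream out, int value) throws IOException {
    while ((value & ~0x7F) != 0) {
      out.writeByte((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
      value >>>= 7;
    }
    out.writeByte(value);
  }

  static int readVarint32(DataInputStream in) throws IOException {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
      int b = in.readUnsignedByte();
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IOException("malformed varint32");
  }

  // Sender side: 4-byte total length, then the delimited header
  // (varint length + bytes), then the body.
  static byte[] frame(byte[] header, byte[] body) {
    try {
      ByteArrayOutputStream payload = new ByteArrayOutputStream();
      DataOutputStream p = new DataOutputStream(payload);
      writeVarint32(p, header.length);
      p.write(header);
      p.write(body);
      ByteArrayOutputStream wire = new ByteArrayOutputStream();
      DataOutputStream w = new DataOutputStream(wire);
      w.writeInt(payload.size()); // Total Length
      payload.writeTo(w);
      return wire.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Receiver side: the same check as receiveRpcResponse(), where headerLen
  // counts the header bytes plus the size of their varint length prefix.
  static boolean lengthConsistent(byte[] wire) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
      int totalLen = in.readInt();
      int rawHeaderLen = readVarint32(in);
      in.readFully(new byte[rawHeaderLen]); // a real client parses the header here
      int headerLen = rawHeaderLen + varint32Size(rawHeaderLen);
      int bodyLen = in.available();         // bytes actually left after the header
      return totalLen == headerLen + bodyLen;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For example, a 200-byte header needs a 2-byte varint prefix. With a 10-byte body, the 4-byte length field holds 212, and the whole frame is 216 bytes.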
