2019独角兽企业重金招聘Python工程师标准>>>

Hadoop 中 RPC 机制详解之 Client 端

1. Server.Listener

RPC Client 端的 RPC 请求发送到 Server 端后, 首先由 Server.Listener 接收

Server.Listener 类继承自 Thread 类, 监听了 OP_READ 和 OP_ACCEPT 事件

Server.Listener 接收 RPC 请求, 在 Server.Listener.doRead() 方法中读取数据, 在 doRead() 方法中又调用了Server.Connection.readAndProcess() 方法,

最后会调用 Server.Connection.processRpcRequest() 方法, 源码如下:

private void processRpcRequest(RpcRequestHeaderProto header,DataInputStream dis) throws WrappedRpcServerException,InterruptedException {...Writable rpcRequest;// 从成员变量dis中反序列化出Client端发送来的RPC请求( WritableRpcEngine.Invocation对象 )try { //Read the rpc requestrpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);rpcRequest.readFields(dis);} catch (Throwable t) { // includes runtime exception from newInstance...}// 构造Server端Server.Call实例对象Call call = new Call(header.getCallId(), header.getRetryCount(),rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray());// 将Server.Call实例对象放入调用队列中callQueue.put(call);              // queue the call; maybe blocked hereincRpcCount();  // Increment the rpc count}

调用队列 callQueue 是 Server 的成员变量, Server.Listener 和 Server.Handler 是典型的生产者, 消费者模型,

Server.Listener( 生产者 )的doRead()方法最终调用Server.Connection.processRpcRequest() 方法,

而Server.Handler( 消费者 )处理RPC请求

2. Server.Handler 继承 Thread 类, 其主要工作是处理 callQueue 中的调用, 都在 run() 方法中完成. 在 run() 的主循环中, 每次处理一个从 callQueue 中出队的请求, Server.call() 是一个抽象方法, 实际是调用了 RPC.Server.call()方法, 最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用

/** Handles queued calls . */private class Handler extends Thread {...@Overridepublic void run() {...ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);while (running) {...final Call call = callQueue.take();    // 获取一个RPC调用请求...Writable value = null;value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {@Overridepublic Writable run() throws Exception {// 调用RPC.Server.call()方法// call.rpcKind : RPC调用请求的类型, 一般为Writable// call.connection.protocolName : RPC协议接口的类名// call.rpcRequest : Invocation实例对象, 包括方法名, 参数列表, 参数列表的Class对象数组// call.timestamp : 调用时间戳return call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);}});}...}
}

RPC.Server.call() 方法如下:

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,Writable rpcRequest, long receiveTime) throws Exception {return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,receiveTime);
}

最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用, 代码如下:

@Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server,String protocolName, Writable rpcRequest, long receivedTime)throws IOException, RPC.VersionMismatch {Invocation call = (Invocation)rpcRequest;    // 将RPC请求强制转成WritableRpcEngine.Invocation对象...long clientVersion = call.getProtocolVersion();final String protoName;ProtoClassProtoImpl protocolImpl;  // Server端RPC协议接口的实现类的实例对象...// Invoke the protocol methodtry {...// 获取RPC请求中调用的方法对象MethodMethod method = protocolImpl.protocolClass.getMethod(call.getMethodName(),call.getParameterClasses());method.setAccessible(true);...// 在Server端RPC协议接口的实现类的实例对象 protocolImpl 上调用具体的方法Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters());...// 调用正常结束, 返回调用结果return new ObjectWritable(method.getReturnType(), value);} catch (InvocationTargetException e) { // 调用出现异常, 用IOException包装异常, 最后抛出该异常Throwable target = e.getTargetException();if (target instanceof IOException) {throw (IOException)target;} else {IOException ioe = new IOException(target.toString());ioe.setStackTrace(target.getStackTrace());throw ioe;}} catch (Throwable e) {...}}
}

在 WritableRpcEngine.call() 方法中, 传入的 rpcRequest 会被强制转换成 WritableRpcEngine.Invocation 类型的对象 call , 并通过 call 这个对象包含的方法名(getMethodName()方法)和参数列表的 Class对象数组(getParameterClasses())获取 Method 对象, 最终通过 Method 对象的invoke() 方法, 调用实现类的实例对象 protocolImpl 上的方法, 完成 Hadoop 的远程过程调用

好了, 现在 Server 端的具体方法已经被调用了, 调用结果分两种情况:

1) 调用正常结束, 则将方法的返回值和调用结果封装成一个 ObjectWritable 类型的对象, 并返回

2) 调用出现异常, 抛出 IOException 类型的异常

3. Server.Responder

这个类的功能: 发送 Hadoop 远程过程调用的应答给 Client 端, Server.Responder 类继承自 Thread 类, 监听了 OP_WRITE 事件, 即通道可写.  具体细节写不下去了

总结:

Server.Responder 和 Server.Listener, Server.Handler 一起配合, 完成 Hadoop 中 RPC 的 Server 端处理:

Server.Listener 接收 Client 端的连接请求和请求数据; Server.Handler 完成实际的过程调用; Server.Responder 则进行应答发送

转载于:https://my.oschina.net/u/2503731/blog/670456

Hadoop中RPC机制详解之Server端相关推荐

  1. Hadoop之Shuffle机制详解

    Hadoop之Shuffle机制详解 目录 Shuffle机制 Partition分区 WritableComparable排序 Combiner合并 GroupingComparator分组(辅助排 ...

  2. Hadoop中RPC机制

    Hadoop中RPC机制 RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.Hadoo ...

  3. Java虚拟机中类加载机制详解

    Java虚拟机中类加载机制详解 1,什么是java类加载机制 **首先在java中,是通过编译来生成.class文件(可能在本地,或者网页下载),java的类加载机制就是 将这些.class文件加载到 ...

  4. Android中IPC机制详解

    本文部分内容参照<Android开发艺术探索> IPC是什么? IPC全称为Inter-Process Communication,译为"跨进程通信",在这里要着重提一 ...

  5. Java 中 Varargs 机制详解

    新的J2SE 1.5版本提供了"Varargs"这一机制.使用该机制可以定义能和多个实参相匹配的形参.从而可以用一种更简单的方式,来传递个数可变的实参.本文介绍这一机制的使用方法, ...

  6. oracle的mvcc解析,PostgreSQL原理:Oracle 和 MySQL 中MVCC机制详解

    MVCC,Multi-version Concurrency Control ,顾名思义指的是多版本并发控制.在介绍MVCC之前我们先来简单了解下事务的隔离级别: read uncommitted:脏 ...

  7. Hadoop之Yarn工作机制详解

    Hadoop之Yarn工作机制详解 目录 Yarn概述 Yarn基本架构 Yarn工作机制 作业提交全过程详解 1. Yarn概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于 ...

  8. Hadoop之NameNode和SecondaryNameNode工作机制详解

    Hadoop之NameNode和SecondaryNameNode工作机制详解 NN和2NN工作机制 NN和2NN工作机制详解 Fsimage和Edits解析 checkpoint时间设置 1. NN ...

  9. Session机制详解及分布式中Session共享解决方案

    Session机制详解及分布式中Session共享解决方案 参考文章: (1)Session机制详解及分布式中Session共享解决方案 (2)https://www.cnblogs.com/jing ...

最新文章

  1. 为了智能驾驶,李彦宏要改造城市道路了!
  2. SAP的会计凭证类别
  3. Oracle表的分区update卡着,分区表update global indexes引起表阻塞
  4. c语言输入整数要求输出字符,求C语言 将输入整数转换成字符串输出!
  5. 动态规划训练25 [Food Delivery ZOJ - 3469 ]好题
  6. 看!闲鱼在ServiceMesh的探索和实践
  7. windows2003 iis 配置 php
  8. js判断客户浏览器类型,版本
  9. linux内核 can总线,基于Linux的PC104总线与CAN总线通信设计
  10. 论文清单:一文梳理因果推理在自然语言处理中的应用
  11. hub设备_【小O新品】办公设备的小助手,ORICO奥睿科HUB集线器新品来袭
  12. 刷单会入刑了你知道吗?四招教你迅速识别刷单!
  13. 智能相机与工业相机_使用智能手机相机后如何移动到专用相机
  14. 流量变现的好方法都在这里了!
  15. 二叉树前序遍历执行过程
  16. GPS经纬度转化为百度地图/Google坐标及互转方案
  17. win10自带的 快速截图功能
  18. 解决报错Parameter 0 of constructor in XXX required a bean...elasticsearch 继承ElasticsearchConfiguration方法
  19. 小白学习图像处理3——图像旋转原理
  20. 中国硫酸铜杀菌剂市场趋势报告、技术动态创新及市场预测

热门文章

  1. todo已完成任务_重要主干街路已完成清雪任务
  2. linux禁用用户账号,技术|在 Linux 系统中禁用与解禁用户的账号
  3. 合并远程仓库到本地_使用命令行把你新建的项目上传到GitHub仓库中
  4. java的character用法_Java中Character类的使用方法
  5. 自己写的计算时间坐标的代码
  6. 2023年考研之路或将更难
  7. java架构师眼中的高并发架构
  8. Canvas 渐变特效
  9. python 标准化_数据标准化
  10. 基于Java线程池读取数据库中数据(学习+运用)