1.发送请求:query.getCPUStatus("Intel");

2.传送到:RPCInvoker.invoke(Object proxy, Method method, Object[]args)

其中method:publicabstract org.hadoopinternal.ipc.copy.CPUStatusorg.hadoopinternal.ipc.copy.Query.getCPUStatus(java.lang.String)

args:[Intel]

3.调用:     ObjectWritable value = (ObjectWritable)

client.call(newRPCInvocation(method,args), remoteId);

4.构建 RPCInvocation对象

publicRPCInvocation(Methodmethod, Object[] parameters) {

this.methodName=method.getName();

this.parameterClasses=method.getParameterTypes();

this.parameters=parameters;

}

其中

methodName:getCPUStatus

parameterClasses:[classjava.lang.String]

parameters:[Intel]

5.构建ClientCall对象

publicWritablecall(Writable param, ClientConnectionIdremoteId)

throwsInterruptedException,IOException {

ClientCall call = newClientCall(param,this);

ClientConnection connection = getConnection(remoteId,call);

connection.sendParam(call);

......

}

其中:

protectedClientCall(Writableparam, Client client) {

this.client=client;

this.param=param;

synchronized(client){

this.id=client.counter++;

}

}

其中:

client:org.hadoopinternal.ipc.copy.Client@5fae6db3

param: RPCInvocation对象

id:2

6.调用connection.sendParam(call): 

publicvoidsendParam(ClientCallcall) {

if(shouldCloseConnection.get()){

return;

}

DataOutputBuffer d=null;

try{

synchronized(this.out){

d = newDataOutputBuffer();

d.writeInt(call.id);

call.param.write(d);

byte[]data = d.getData();

intdataLength =d.getLength();

out.writeInt(dataLength);     //first putthe data length

out.write(data,0, dataLength);//write thedata

out.flush();

}

} catch(IOExceptione) {

markClosed(e);

} finally{

//the bufferis just an in-memory buffer, but it is still polite to

// closeearly

IOUtils.closeStream(d);

}

}

先和Call的ID和RPCInvocation对象加在一起,先发送长度,最后发送ID+RPCInvocation对象

7.服务器端接收

读取长度

count= server.channelRead(channel,dataLengthBuffer);

dataLength=dataLengthBuffer.getInt(); (18)

读取数据

data=ByteBuffer.allocate(dataLength);

count= server.channelRead(channel,data);

processData(data.array());

privatevoidprocessData(byte[]buf) throwsIOException, InterruptedException{

DataInputStream dis = newDataInputStream(newByteArrayInputStream(buf));

intid =dis.readInt(); // try toread an id

Writable param =ReflectionUtils.newInstance(server.paramClass,server.conf);//read param

param.readFields(dis);

ServerCall call = newServerCall(id,param, this);

server.callQueue.put(call);// queue thecall; maybe blocked here

rpcCount++;// Incrementthe rpc count

}

8.构建ServerCall对象:

publicServerCall(intid, Writableparam, ServerConnection connection) {

this.id=id;

this.param=param;

this.connection=connection;

this.timestamp=System.currentTimeMillis();

this.response=null;

}

id:2

param:RPCInvocation对象

connection:新构建的ServerConnection对象

timestamp:当前时间戳

response:回应结果

9. 拿出ServerCall,调用相应方法 

finalServerCallcall = server.callQueue.take();

value= server.call(call.connection.protocol,call.param,call.timestamp);

10.反射调用真正方法

将param还原成RPCInvocation对象,拿出MethodName、ParameterClasses、Parameters调用

真正方法:

publicWritablecall(Class<?> protocol, Writableparam, longreceivedTime)

throwsIOException{

try{

RPCInvocation call = (RPCInvocation)param;

Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());

method.setAccessible(true);

Object value = method.invoke(instance,call.getParameters());

returnnewObjectWritable(method.getReturnType(),value);

} catch(InvocationTargetExceptione) { }

}

根据method的ReturnType和结果构建返回值。

11.将结果(rv)写入ServerCall的Response中 

voidsetupResponse(ByteArrayOutputStreamresponse, ServerCall call,

Status status, Writable rv, String errorClass, Stringerror)

throwsIOException{

response.reset();

DataOutputStream out = newDataOutputStream(response);

out.writeInt(call.id);// write callid

out.writeInt(status.state);// writestatus

if(status ==Status.SUCCESS){

rv.write(out);

} else{

WritableUtils.writeString(out, errorClass);

WritableUtils.writeString(out, error);

}

call.setResponse(ByteBuffer.wrap(response.toByteArray()));

}

先写ServerCall的Id,再写状态Status,再写结果(rv)

12.response发送到客户端

intnumBytes= server.channelWrite(channel,call.response);

13.客户端接收

privatevoidreceiveResponse(){

if(shouldCloseConnection.get()){

return;

}

lastActivity.set(System.currentTimeMillis());

try{

intid =in.readInt();                   // try toread an id

ClientCall call = calls.get(id);

intstate= in.readInt();    // read callstatus

if(state ==Status.SUCCESS.state){

Writable value =ReflectionUtils.newInstance(client.valueClass,client.conf);

value.readFields(in);                // readvalue

call.setValue(value);

calls.remove(id);

} elseif(state ==Status.ERROR.state){

call.setException(new IOException(WritableUtils.readString(in)));

calls.remove(id);

} elseif(state ==Status.FATAL.state){

markClosed(new IOException(WritableUtils.readString(in)));

}

} catch(IOExceptione) {

markClosed(e);

}

}

先读ID:2

再读state:0

再读返回结果value:OW[class=classorg.hadoopinternal.ipc.copy.CPUStatus,value=CPU: Intel Create atFri Mar 14 10:45:56 CST 2014]

其中:

client.valueClass:classorg.apache.hadoop.io.ObjectWritable

转载于:https://www.cnblogs.com/leeeee/p/7276516.html

Hadoop源码分析16: IPC流程(11) 整体流程相关推荐

  1. Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用

    1. RPCClientCache 中的 clients publicclass RPCClientCache { private Map<SocketFactory,Client> cl ...

  2. 【SemiDrive源码分析】【X9芯片启动流程】11 - freertos_safetyos目录Cortex-R5 DIL2.bin 引导程序源代码分析

    [SemiDrive源码分析][X9芯片启动流程]11 - freertos_safetyos目录Cortex-R5 DIL2.bin 引导程序源代码分析 一.freertos_safetyos目录结 ...

  3. Android 系统(78)---《android framework常用api源码分析》之 app应用安装流程

    <android framework常用api源码分析>之 app应用安装流程 <android framework常用api源码分析>android生态在中国已经发展非常庞大 ...

  4. Hadoop源码分析(12)

    Hadoop源码分析(12) 1. journalnode客户端   在文档(11)中分析了初始化editlog的方法.在初始化之前其会根据集 群的配置状态选择不同的方式来进行初始化.在HA状态下,其 ...

  5. 【SemiDrive源码分析】【X9芯片启动流程】30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一)

    [SemiDrive源码分析][X9芯片启动流程]30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一) 一.Android Kernel 启动流程分析 ...

  6. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

  7. Hadoop源码分析(25)

    Hadoop源码分析(25) ZKFC源码分析   从文档(4)到文档(24),详细分析了namenode的启动流程.最后namenode会以standby模式启动.但在standby模式下的name ...

  8. 【SemiDrive源码分析】【X9芯片启动流程】12 - freertos_safetyos目录Cortex-R5 DIL2.bin 之 sdm_display_init 显示初始化源码分析

    [SemiDrive源码分析][X9芯片启动流程]12 - freertos_safetyos目录Cortex-R5 DIL2.bin 之 sdm_display_init 显示初始化源码分析 一.s ...

  9. 【SemiDrive源码分析】【X9芯片启动流程】08 - X9平台 lk 目录源码分析 之 目录介绍

    [SemiDrive源码分析][X9芯片启动流程]08 - X9平台 lk 目录源码分析 之 目录介绍 一./rtos/lk/ 目录结构分析 1.1 /rtos/lk_boot/ 目录结构分析 1.2 ...

最新文章

  1. mysql下载吧_Mysql安装图文教程
  2. mysql表创建在哪_mysql创建表命令是哪句
  3. java中文乱码decode_java中文乱码
  4. Effect of Switchovers, Failovers, and Control File Creation on Backups
  5. tesklink 管理员项目角色被修改后的解决方法
  6. ORA-39095: Dump file space has been exhausted: Unable to allocate 8192 bytes
  7. 自定义Android带图片的按钮
  8. 学不会的JAVA,消不了的忧愁! 1
  9. 淘宝网的软件质量属性分析
  10. 高德地图获取坐标距离_【转】根据高德地图得出的坐标算出两点之间的距离
  11. GNU cflow实现调用关系分析
  12. python下载大文件mp4_python 实现视频流下载保存MP4的方法
  13. .NET 指南:构造器的设计
  14. Golang语言 零基础入门教程
  15. OpenPose 基本理念
  16. QTcpSocket实现客户端
  17. github已有项目上添加并更新
  18. 四色定理java_四色定理中公理的证明
  19. 运行python文件、电脑突然黑屏_电脑运行中总是突然黑屏怎么办?
  20. androidP 对反射的限制之黑名单机制

热门文章

  1. Java基础学习记录
  2. VS2010与SVN
  3. 《Pro ASP.NET MVC 3 Framework》学习笔记之十八【URL和Routing】
  4. Unity 导出Supermap exe
  5. 怎么成为开源贡献者_为什么要成为开源的支持者
  6. GlobalSight在翻译社区中与开源大放异彩
  7. 开源项目面试重要吗_开源是最重要项目的骨干
  8. thinkphp 事件
  9. 过滤器实栗 登录检测
  10. css布局模型详细介绍