Hadoop源码分析16: IPC流程(11) 整体流程
1.发送请求:query.getCPUStatus("Intel");
2.传送到:RPCInvoker.invoke(Object proxy, Method method, Object[]args)
其中method:publicabstract org.hadoopinternal.ipc.copy.CPUStatusorg.hadoopinternal.ipc.copy.Query.getCPUStatus(java.lang.String)
args:[Intel]
3.调用: ObjectWritable value = (ObjectWritable)
client.call(newRPCInvocation(method,args), remoteId);
4.构建 RPCInvocation对象
publicRPCInvocation(Methodmethod, Object[] parameters) {
this.methodName=method.getName();
this.parameterClasses=method.getParameterTypes();
this.parameters=parameters;
}
其中
methodName:getCPUStatus
parameterClasses:[classjava.lang.String]
parameters:[Intel]
5.构建ClientCall对象
publicWritablecall(Writable param, ClientConnectionIdremoteId)
throwsInterruptedException,IOException {
ClientCall call = newClientCall(param,this);
ClientConnection connection = getConnection(remoteId,call);
connection.sendParam(call);
......
}
其中:
protectedClientCall(Writableparam, Client client) {
this.client=client;
this.param=param;
synchronized(client){
this.id=client.counter++;
}
}
其中:
client:org.hadoopinternal.ipc.copy.Client@5fae6db3
param: RPCInvocation对象
id:2
6.调用connection.sendParam(call):
publicvoidsendParam(ClientCallcall) {
if(shouldCloseConnection.get()){
return;
}
DataOutputBuffer d=null;
try{
synchronized(this.out){
d = newDataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[]data = d.getData();
intdataLength =d.getLength();
out.writeInt(dataLength); //first putthe data length
out.write(data,0, dataLength);//write thedata
out.flush();
}
} catch(IOExceptione) {
markClosed(e);
} finally{
//the bufferis just an in-memory buffer, but it is still polite to
// closeearly
IOUtils.closeStream(d);
}
}
先和Call的ID和RPCInvocation对象加在一起,先发送长度,最后发送ID+RPCInvocation对象
7.服务器端接收
读取长度
count= server.channelRead(channel,dataLengthBuffer);
dataLength=dataLengthBuffer.getInt(); (18)
读取数据
data=ByteBuffer.allocate(dataLength);
count= server.channelRead(channel,data);
processData(data.array());
privatevoidprocessData(byte[]buf) throwsIOException, InterruptedException{
DataInputStream dis = newDataInputStream(newByteArrayInputStream(buf));
intid =dis.readInt(); // try toread an id
Writable param =ReflectionUtils.newInstance(server.paramClass,server.conf);//read param
param.readFields(dis);
ServerCall call = newServerCall(id,param, this);
server.callQueue.put(call);// queue thecall; maybe blocked here
rpcCount++;// Incrementthe rpc count
}
8.构建ServerCall对象:
publicServerCall(intid, Writableparam, ServerConnection connection) {
this.id=id;
this.param=param;
this.connection=connection;
this.timestamp=System.currentTimeMillis();
this.response=null;
}
id:2
param:RPCInvocation对象
connection:新构建的ServerConnection对象
timestamp:当前时间戳
response:回应结果
9. 拿出ServerCall,调用相应方法
finalServerCallcall = server.callQueue.take();
value= server.call(call.connection.protocol,call.param,call.timestamp);
10.反射调用真正方法
将param还原成RPCInvocation对象,拿出MethodName、ParameterClasses、Parameters调用
真正方法:
publicWritablecall(Class<?> protocol, Writableparam, longreceivedTime)
throwsIOException{
try{
RPCInvocation call = (RPCInvocation)param;
Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());
method.setAccessible(true);
Object value = method.invoke(instance,call.getParameters());
returnnewObjectWritable(method.getReturnType(),value);
} catch(InvocationTargetExceptione) { }
}
根据method的ReturnType和结果构建返回值。
11.将结果(rv)写入ServerCall的Response中
voidsetupResponse(ByteArrayOutputStreamresponse, ServerCall call,
Status status, Writable rv, String errorClass, Stringerror)
throwsIOException{
response.reset();
DataOutputStream out = newDataOutputStream(response);
out.writeInt(call.id);// write callid
out.writeInt(status.state);// writestatus
if(status ==Status.SUCCESS){
rv.write(out);
} else{
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
先写ServerCall的Id,再写状态Status,再写结果(rv)
12.response发送到客户端
intnumBytes= server.channelWrite(channel,call.response);
13.客户端接收
privatevoidreceiveResponse(){
if(shouldCloseConnection.get()){
return;
}
lastActivity.set(System.currentTimeMillis());
try{
intid =in.readInt(); // try toread an id
ClientCall call = calls.get(id);
intstate= in.readInt(); // read callstatus
if(state ==Status.SUCCESS.state){
Writable value =ReflectionUtils.newInstance(client.valueClass,client.conf);
value.readFields(in); // readvalue
call.setValue(value);
calls.remove(id);
} elseif(state ==Status.ERROR.state){
call.setException(new IOException(WritableUtils.readString(in)));
calls.remove(id);
} elseif(state ==Status.FATAL.state){
markClosed(new IOException(WritableUtils.readString(in)));
}
} catch(IOExceptione) {
markClosed(e);
}
}
先读ID:2
再读state:0
再读返回结果value:OW[class=classorg.hadoopinternal.ipc.copy.CPUStatus,value=CPU: Intel Create atFri Mar 14 10:45:56 CST 2014]
其中:
client.valueClass:classorg.apache.hadoop.io.ObjectWritable
转载于:https://www.cnblogs.com/leeeee/p/7276516.html
Hadoop源码分析16: IPC流程(11) 整体流程相关推荐
- Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用
1. RPCClientCache 中的 clients publicclass RPCClientCache { private Map<SocketFactory,Client> cl ...
- 【SemiDrive源码分析】【X9芯片启动流程】11 - freertos_safetyos目录Cortex-R5 DIL2.bin 引导程序源代码分析
[SemiDrive源码分析][X9芯片启动流程]11 - freertos_safetyos目录Cortex-R5 DIL2.bin 引导程序源代码分析 一.freertos_safetyos目录结 ...
- Android 系统(78)---《android framework常用api源码分析》之 app应用安装流程
<android framework常用api源码分析>之 app应用安装流程 <android framework常用api源码分析>android生态在中国已经发展非常庞大 ...
- Hadoop源码分析(12)
Hadoop源码分析(12) 1. journalnode客户端 在文档(11)中分析了初始化editlog的方法.在初始化之前其会根据集 群的配置状态选择不同的方式来进行初始化.在HA状态下,其 ...
- 【SemiDrive源码分析】【X9芯片启动流程】30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一)
[SemiDrive源码分析][X9芯片启动流程]30 - AP1 Android Kernel 启动流程 start_kernel 函数详细分析(一) 一.Android Kernel 启动流程分析 ...
- Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用
Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程 第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...
- Hadoop源码分析(25)
Hadoop源码分析(25) ZKFC源码分析 从文档(4)到文档(24),详细分析了namenode的启动流程.最后namenode会以standby模式启动.但在standby模式下的name ...
- 【SemiDrive源码分析】【X9芯片启动流程】12 - freertos_safetyos目录Cortex-R5 DIL2.bin 之 sdm_display_init 显示初始化源码分析
[SemiDrive源码分析][X9芯片启动流程]12 - freertos_safetyos目录Cortex-R5 DIL2.bin 之 sdm_display_init 显示初始化源码分析 一.s ...
- 【SemiDrive源码分析】【X9芯片启动流程】08 - X9平台 lk 目录源码分析 之 目录介绍
[SemiDrive源码分析][X9芯片启动流程]08 - X9平台 lk 目录源码分析 之 目录介绍 一./rtos/lk/ 目录结构分析 1.1 /rtos/lk_boot/ 目录结构分析 1.2 ...
最新文章
- mysql下载吧_Mysql安装图文教程
- mysql表创建在哪_mysql创建表命令是哪句
- java中文乱码decode_java中文乱码
- Effect of Switchovers, Failovers, and Control File Creation on Backups
- tesklink 管理员项目角色被修改后的解决方法
- ORA-39095: Dump file space has been exhausted: Unable to allocate 8192 bytes
- 自定义Android带图片的按钮
- 学不会的JAVA,消不了的忧愁! 1
- 淘宝网的软件质量属性分析
- 高德地图获取坐标距离_【转】根据高德地图得出的坐标算出两点之间的距离
- GNU cflow实现调用关系分析
- python下载大文件mp4_python 实现视频流下载保存MP4的方法
- .NET 指南:构造器的设计
- Golang语言 零基础入门教程
- OpenPose 基本理念
- QTcpSocket实现客户端
- github已有项目上添加并更新
- 四色定理java_四色定理中公理的证明
- 运行python文件、电脑突然黑屏_电脑运行中总是突然黑屏怎么办?
- androidP 对反射的限制之黑名单机制