SparkRPC源码分析之RPC管道与消息类型
我们前面看过了netty基础知识扫盲,那我们应该明白,ChannelHandler这个组件内为channel的各种事件提供了处理逻辑,也就是主要业务逻辑写在该组建内。Spark的RPC也不会例外,因此我们看一下Spark的Handler怎么调用的。在TransPortClientFactory初始化客户端之前有一条代码为TransportChannelHandler clientHandler = context.initializePipeline(ch);这里的context定义的地方为private final TransportContext context;也就时我们接下来看TransoprtContext类的方法,代码如下

public TransportChannelHandler initializePipeline(SocketChannel channel) {
return initializePipeline(channel, rpcHandler);
}
1
2
3
可以看到这里的initializePipeline调用了另一个initializePipeline方法,它的代码如下

public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
这里面和我们前面netty基础知识扫盲里面做的内容很类似,就是给pipeline启动添加了一些Handler处理逻辑
通过addLast添加的Handler会被依次执行顺序或倒序,那么我们就来依次看一些他的Handler都干了什么。

addLast(“encoder”, ENCODER)
服务器端用来编码服务器到客户端响应的编码器。通过调用消息的encode()方法对其进行编码。对于非数据消息,将添加一个ByteBuf到“out”,其中包含总帧长度、消息类型和消息本身。在ChunkFetchSuccess的情况下,我们还将与数据对应的ManagedBuffer添加到“Out”,以便启用零拷贝传输。一般会出现的消息类型如下

0 ChunkFetchRequest;
1 ChunkFetchSuccess;
2 ChunkFetchFailure;
3 RpcRequest;
4 RpcResponse;
5 RpcFailure;
6 StreamRequest;
7 StreamResponse;
8 StreamFailure;
9 OneWayMessage;
1 UploadStream;
-1 User
1
2
3
4
5
6
7
8
9
10
11
12
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
- 一种允许截取原始数据的定制帧解码器。
- 类似于Netty的帧解码器(具有符合此库需要的硬编码参数)但是不同的是它在封装成帧之前允许去装拦截器直接去读取数据
- 与Netty的帧解码器不同,每个帧在解码后立即被发送给子处理程序,而不是尽可能多地放入当前缓冲区一次性发出去。这允许子处理程序在需要时安装拦截器。
- 如果安装了拦截器,则停止封装成帧,数据将直接输入拦截器,当拦截器指示它不需要读取任何更多数据时,封装恢复

.addLast(“decoder”, DECODER)
客户端用来解码服务器到客户端响应的解码器。消息类型和加密端一样不再重复写了

.addLast(“idleStateHandler”, new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle【空闲】 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互,确保TCP连接有效

.addLast(“handler”, channelHandler);

channelHandler的创建代码如下

TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
1
createChannelHandler代码如下

private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
1
2
3
4
5
6
7
8
值得注意的一点,我们可以看到这里面有客户端的初始化new TransportClient(channel, responseHandler);也许大家会有疑惑,我们前面才看了代码TransportClientFactory中有初始化TransportClient的代码,怎么这里也有呢?

这里分析一下TransportClientFactory中创建TransportClient时的情况,可以看到代码如下

final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
final AtomicReference<Channel> channelRef = new AtomicReference<>();
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});
…… 省略掉一部分代码
TransportClient client = clientRef.get();
…… 省略掉一部分代码
return client;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
可以看到这里的客户端与其说时创建不如说是获取,从clientHandler中获取,这么看来,客户端真正的创建的地方是在new关键字出现的地方,也就是这里,而TransportClientFactory中的创建不过是从这边取到的而已。

接着看TransportChannelHandler这个类到底为何方神圣?

从类图上可以看出来,这个类实现了ChannelInboundHandler接口,那么这个接口是干什么的呢?

ChannelInboundHandler是一个netty的组件,它是一个常用的Handler。这个Handler的作用就是处理接收到数据时的事件,我们的业务逻辑一般就是写在这个Handler里面。

这个TransportChannelHandler的处理业务逻辑是什么呢?看下面代码可知它重写了channelRead方法

@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else if (request instanceof ResponseMessage) {
responseHandler.handle((ResponseMessage) request);
} else {
ctx.fireChannelRead(request);
}
}
1
2
3
4
5
6
7
8
9
10
这里它主要判断请求是什么类型的数据,根据类型交给TransportResponseHandler或者TransportRequestHandler的对象去处理。

这里可以看出无论是TransportRequestHandler还是TransportResponseHandler都是继承于MessageHandler抽象类。
那么我们就来看一下MessageHandler,看一下他的方法发现上面调用的handle方法都是来自于重写该类的方法.

public abstract class MessageHandler<T extends Message> {
//处理单个消息的接收。
public abstract void handle(T message) throws Exception;
//当MessageHandler所在的通道处于活动状态时调用
public abstract void channelActive();
//在通道上捕获异常时调用
public abstract void exceptionCaught(Throwable cause);
//当MessageHandler所在的通道处于非活动状态时调用
public abstract void channelInactive();
}
1
2
3
4
5
6
7
8
9
10
那么我们先看TransportRequestHandler重写的handle方法

@Override
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
processFetchRequest((ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else if (request instanceof UploadStream) {
processStreamUpload((UploadStream) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
可以看出,在这里用了if-else逻辑判断消息的类型,然后再交给相应的方法去处理。那么一共有多少消息呢?它都可以处理什么消息呢?请看类图

那么现在再看一下TransportResponseHandler它复写的handle方法逻辑如下

@Override
public void handle(ResponseMessage message) throws Exception {
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
resp.streamChunkId, getRemoteAddress(channel));
resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
resp.body().release();
}
} else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
"Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
}
} else if (message instanceof RpcResponse) {
RpcResponse resp = (RpcResponse) message;
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
try {
listener.onSuccess(resp.body().nioByteBuffer());
} finally {
resp.body().release();
}
}
} else if (message instanceof RpcFailure) {
RpcFailure resp = (RpcFailure) message;
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingRpcs.remove(resp.requestId);
listener.onFailure(new RuntimeException(resp.errorString));
}
} else if (message instanceof StreamResponse) {
StreamResponse resp = (StreamResponse) message;
Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
StreamCallback callback = entry.getValue();
if (resp.byteCount > 0) {
StreamInterceptor<ResponseMessage> interceptor = new StreamInterceptor<>(
this, resp.streamId, resp.byteCount, callback);
try {
TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
frameDecoder.setInterceptor(interceptor);
streamActive = true;
} catch (Exception e) {
logger.error("Error installing stream handler.", e);
deactivateStream();
}
} else {
try {
callback.onComplete(resp.streamId);
} catch (Exception e) {
logger.warn("Error in stream handler onComplete().", e);
}
}
} else {
logger.error("Could not find callback for StreamResponse.");
}
} else if (message instanceof StreamFailure) {
StreamFailure resp = (StreamFailure) message;
Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
StreamCallback callback = entry.getValue();
try {
callback.onFailure(resp.streamId, new RuntimeException(resp.error));
} catch (IOException ioe) {
logger.warn("Error in stream failure handler.", ioe);
}
} else {
logger.warn("Stream failure with unknown callback: {}", resp.error);
}
} else {
throw new IllegalStateException("Unknown response type: " + message.type());
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
详细代码暂且不管,大体可以看出也是使用if-else逻辑判断消息的类型,然后分别进行处理。那么我么来看一些这里的消息。
---------------------

转载于:https://www.cnblogs.com/hyhy904/p/11007502.html

SparkRPC源码分析之RPC管道与消息类型相关推荐

  1. Hadoop2源码分析-RPC机制初识

    1.概述 上一篇博客,讲述Hadoop V2的序列化机制,这为我们学习Hadoop V2的RPC机制奠定了基础.RPC的内容涵盖的信息有点多,包含Hadoop的序列化机制,RPC,代理,NIO等.若对 ...

  2. 深入Preact源码分析(五)非组件类型的diff解析

    非组件节点的diff分析 diff的流程,我们从简单到复杂进行分析 通过前面几篇文章的源码阅读,我们也大概清楚了diff函数参数的定义和component各参数的作用 /*** @param dom ...

  3. Apache Storm 实时流处理系统ACK机制以及源码分析

    1.ACK机制简介 Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理.完全处理的意思是该MessageId绑定的源Tuple以及由该源Tupl ...

  4. MyBatis 源码分析 - SQL 的执行过程

    本文速览 本篇文章较为详细的介绍了 MyBatis 执行 SQL 的过程.该过程本身比较复杂,牵涉到的技术点比较多.包括但不限于 Mapper 接口代理类的生成.接口方法的解析.SQL 语句的解析.运 ...

  5. List接口的常用方法以及ArrayList/LinkedList源码分析

    1.List接口的常用方法 ArrayList list = new ArrayList();list.add(123);list.add(456);list.add("AA"); ...

  6. Android 系统(177)---Android消息机制分析:Handler、Looper、MessageQueue源码分析

    Android消息机制分析:Handler.Looper.MessageQueue源码分析 1.前言 关于Handler消息机制的博客实际上是非常多的了. 之前也是看别人的博客过来的,但是过了一段时间 ...

  7. RabbitMQ自动扩展消费者源码分析

    1 前言 在 RabbitMQ异常监控及动态控制队列消费的解决方案 中,提供了一种在线动态修改消费者数量的方法,但使用该方法需要及时的监控队列消息的堆积情况,不能做到自动扩展(增加或减少)消费者数量, ...

  8. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  9. spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析

    spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析 TransportContext 首先官方文档对Transpor ...

最新文章

  1. php swoole编译,编译安装swoole1.7.9,PHP版本5.6.5
  2. 不同平台上安装python是一样的吗_python3 在不同操作系统安装第三方库方法
  3. ruby 嵌套函数_Ruby嵌套直到循环带有示例
  4. 与机器学习算法有关的数据结构
  5. 数据结构学习笔记:利用栈实现进制转换
  6. FTRL在线学习算法的前世今生-从SGD到TG再到FOBOS与RDA
  7. ai/ml_本月有关AI / ML的令人印象深刻的中等文章
  8. 转载:用图片搜索图片的几个好网站
  9. 正确理解三极管的放大区、饱和区、截止区
  10. 【软件安装】MacBook 安装 MATLAB 2020a
  11. 微软消息队列-MSMQ
  12. CSS3 自定义动画(animation)
  13. istqb纸质证书_关于istqb证书有效期的阿里云论坛用户知识和技术交流
  14. linux调度不执行,linux crond.d定时调度执行一段时间后不执行
  15. 2021-12-23 网工基础(十四) 链路聚合的两种模式、堆叠、集群、IP路由基础
  16. EasyX教程(一)
  17. 2014阿里巴巴秋季校园招聘-软件研发工程师笔试题/面试问题收集
  18. pat 1124 Raffle for Weibo Followers(20 分)
  19. Python Django实现MySQL百万、千万级的数据量下载:解决memoryerror、nginx time out
  20. 单播 、多播(组播)、广播

热门文章

  1. Vue-cli代理解决跨域问题
  2. 从Openvswitch代码看网络包的旅程
  3. python reduce()函数
  4. SQL Server 求结果
  5. APUE 学习笔记(一) Unix基础知识
  6. 必须去收藏14个响应式布局的前端开发框架
  7. hdu 1022 Train Problem I(栈)
  8. PGA Usage Larger than PGA_AGGREGATE_TARGET setting?
  9. python3 deque(双向队列)
  10. Mongodb百亿级数据添加,修改,删除,查询等性能测试【四】