Spark2.0.2源码分析——RPC 通信机制(消息处理)
RPC
是一种远程过程的调用,即两台节点之间的数据传输。
每个组件都有它自己的执行环境,RPC
的执行环境就是 RPCENV
,RPCENV
是 Spark 2.x.x
新增加的,用于替代之前版本的 akka
。
RPC
是从 SparkEnv
开始启动的:
一.SparkEnv 的创建
在每一个节点都会启动一个 SparkEnv
,而通过启动 SparkContext
就能启动 SparkEnv
:
try cache
语句块中有 _env = createSparkEnv(_conf, isLocal, listenerBus)
.
二.RpcEnv 的创建
在 SparkEnv#create
方法中:
val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager, clientMode = !isDriver)
在
NettyRpcEnvFactory#create
中会new
一个NettyRpcEnv
三.RpcEndPoint 的创建
作为 Rpc
请求的一种组件抽象,RpcEndPoint
代替了 akka
中的 actor
,里面封装了很多方法,比如:
RpcEndPoint
的继承关系图如下:
在 Rpc
框架中使用 TreadSafeRpcEndpoint
这个实现类,它的继承关系如下:
四.RpcEndPointRef 的创建
akka
框架中有两个组件:actor
,actorref
。
前面说到 RpcEndPoint
代替了 akka
中的 actor
,而 RpcEndPointRef
就代替了 actorref
。
RpcEndPointRef
可以理解为 RpcEndPoint
的引用,通过这个引用来向目标节点发送消息。
因此,每个节点都会有一个或多个 RpcEndPointRef
和 RpcEndPoint
RpcEndPointRef#send
: 发送单项异步消息,发送了就完了,不会有返回消息。
RpcEndPointRef#ask
:根据默认超时时间发送消息。
RpcEndPointRef#askWithRetry
:是一个发送请求并且在默认超时时间范围内等待响应的方法
五.什么是 InboxMessage
节点之间消息传递实际封装在 InboxMessage
中,其继承关系如下:
接收消息的时候根据消息种类调用 RpcEndPoint
中不同的方法进行处理。
六.Dispatcher 处理消息
作为一个消息调度器,Dispatcher
有效地提高 NettyRpcEnv
对消息处理的并发度,将消息发送到对应的 RpcEndPoint
。
那么,Dispatcher
怎么创建出来的呢?
NettyRpcEnv
#dispatcher
:
private val dispatcher: Dispatcher = new Dispatcher(this)
Dispatcher
中的组件:
RpcEndPoint
RpcEndPointRef
InboxMessage
Inbox
EndpointData
receivers
threadpool
所有的 InboxMessage
被封装在 Inbox
中,Inbox
#messages
:
protected val messages = new java.util.LinkedList[InboxMessage]()
每一个 RpcEnvPoint
负责一个 Inbox
注意 EndpointData
,它将三个组件封装成一个类:
其中,receivers
是用于存储 EndPointData
的阻塞队列,只有当 Inbox
有新 InboxMessage
时,才会将 EndPointData
放入此队列。
既然提到并发度,那就一定有线程池 ThreadPool
(默认数量是2):
Dispatcher
#MessageLoop
:
如上图所示,如果 EndPointData
是读完状态,还会将其放回去让其他 MessageLoops
看到:receivers.offer(PoisonPill)
。
接下来执行 data.inbox.process(Dispatcher.this)
Dispatcher
内存模型:
七.Inbox#prosess
def process(dispatcher: Dispatcher): Unit = {var message: InboxMessage = nullinbox.synchronized {
/*
private var enableConcurrent = false 是否允许多线程同时处理
private var numActiveThreads = 0 当前激活的线程个数
*/if (!enableConcurrent && numActiveThreads != 0) {return}
// 当前激活线程数为0时执行 message = messages.poll()
// 如果消息为空,则直接返回,否则当前激活线程数+1,表示
// 当前已经有线程执行 if (message != null) {numActiveThreads += 1} else {return}}
/*
判断消息类型,执行对应方法
*/ while (true) {safelyCall(endpoint) {message match {case RpcMessage(_sender, content, context) =>try {endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>throw new SparkException(s"Unsupported message $message from ${_sender}")})} catch {case NonFatal(e) =>context.sendFailure(e)// Throw the exception -- this exception will be caught by the safelyCall function.// The endpoint's onError function will be called.throw e}case OneWayMessage(_sender, content) =>endpoint.receive.applyOrElse[Any, Unit](content, { msg =>throw new SparkException(s"Unsupported message $message from ${_sender}")})case OnStart =>endpoint.onStart()if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {inbox.synchronized {if (!stopped) {enableConcurrent = true}}}case OnStop =>val activeThreads = inbox.synchronized { inbox.numActiveThreads }assert(activeThreads == 1,s"There should be only a single active thread but found $activeThreads threads.")dispatcher.removeRpcEndpointRef(endpoint)endpoint.onStop()assert(isEmpty, "OnStop should be the last message")case RemoteProcessConnected(remoteAddress) =>endpoint.onConnected(remoteAddress)case RemoteProcessDisconnected(remoteAddress) =>endpoint.onDisconnected(remoteAddress)case RemoteProcessConnectionError(cause, remoteAddress) =>endpoint.onNetworkError(cause, remoteAddress)}}inbox.synchronized {// "enableConcurrent" will be set to false after `onStop` is called, so we should check it// every time.if (!enableConcurrent && numActiveThreads != 1) {// If we are not the only one worker, exitnumActiveThreads -= 1return}message = messages.poll()if (message == null) {numActiveThreads -= 1return}}}}
tips
: while (true)
语句会一直死循环,以便随时接受 Message
。
总结:
- 文字描述
SparkContext
初始化的时候就能启动 SparkEnv
,接下来经过一系列的追溯封装,在 RpcEnv#create
方法中 new
一个 NettyRpcEnvFactory
并调用 create
方法,最终由 NettyRpcEnvFactory#create
返回一个我们想要的 env
: nettyEnv
。
作为 Rpc
请求的一种组件抽象,RpcEndPoint
代替了 akka
中的 actor
,里面封装了很多消息处理的方法,如 receive
和 receiveAndReply
;akka
框架中有两个组件:actor
,actorref
,前面说到 RpcEndPoint
代替了 akka
中的 actor
,而 RpcEndPointRef
就代替了 actorref
。RpcEndPointRef
可以理解为 RpcEndPoint
的引用,通过这个引用来向目标节点发送消息。因此,每个节点都会有一个或多个 RpcEndPointRef
和 RpcEndPoint
。这里注意的是,创建 RpcEndPoint
,随后通过它和 RpcEndpointAddress
来创建 RpcEndPointRef
的同时,将这两者注册到 EndPointData
中。
节点之间消息传递实际封装在 InboxMessage
中,所有的 InboxMessage
被封装在 Inbox
#messages
中,RpcEndPoint
接收消息的时候根据消息种类调用 RpcEndPoint
中不同的方法进行处理。
作为一个消息调度器,Dispatcher
有效地提高 NettyRpcEnv
对消息处理的并发度,将消息发送到对应的 RpcEndPoint
,Dispatcher
在 env
初始化的时候会被创建出来,每一个 RpcEndPoint
负责一个 Inbox
,注意 EndpointData
,它将三个组件(RpcEndPointRef
RpcEndPoint
Inbox
)封装成一个类,所以,每一个 RpcEndPoint
负责一个 Inbox
,其中,Dispatcher
#receivers
是用于存储 EndPointData
的阻塞队列,只有当 Inbox
有新 InboxMessage
时,才会将 EndPointData
放入此队列,然后被线程池处理。
Dispatcher
初始化便创建出来的线程池(默认数量是2):private val threadpool: ThreadPoolExecutor = {...}
,当上面的 receivers
队列中没内容时,会阻塞。当有 RpcEndpoint
相关请求(即 InboxMessage
)的时候就会立刻执行,这里处理 InboxMessage
本质上是调用相应 RpcEndpoint
的 inbox
去处理。在 Dispatcher
#MessageLoop
#run
中有一个 while(true)
语句,它会从 receivers
队列中拿取 EndPointData
数据,如果 EndPointData
是读完状态,还会将其放回去让其他 MessageLoops
看到,接下来执行 data.inbox.process(Dispatcher.this)
。
在 Inbox
#prosess
中有一个 while (true)
语句,它的作用是判断消息类型,执行对应方法,并且会一直死循环,以便随时接受 Message
。
- 实例理解
Spark2.0.2源码分析——Client 与 Master 之间创建 RPC 通信实例
Spark2.0.2源码分析——RPC 通信机制(消息处理)相关推荐
- Spark2.4.0 SparkEnv 源码分析
Spark2.4.0 SparkEnv 源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 ...
- Pushlet 2.0.3 源码分析
转载地址:http://blog.csdn.net/yxw246/article/details/2418255 Pushlet 2.0.3 源码分析 ----服务器端 1 总体架构 Pushlet从 ...
- 菜鸟读jQuery 2.0.3 源码分析系列(1)
原文链接在这里,作为一个菜鸟,我就一边读一边写 jQuery 2.0.3 源码分析系列 前面看着差不多了,看到下面一条(我是真菜鸟),推荐木有入门或者刚刚JS入门摸不着边的看看,大大们手下留情,想一起 ...
- Android 11.0 Settings源码分析 - 主界面加载
Android 11.0 Settings源码分析 - 主界面加载 本篇主要记录AndroidR Settings源码主界面加载流程,方便后续工作调试其流程. Settings代码路径: packag ...
- Android 8.0系统源码分析--Camera processCaptureResult结果回传源码分析
相机,从上到下概览一下,真是太大了,上面的APP->Framework->CameraServer->CameraHAL,HAL进程中Pipeline.接各种算法的Node.再往下的 ...
- photoshop-v.1.0.1源码分析第三篇–FilterInterface.p
photoshop-v.1.0.1源码分析第三篇–FilterInterface.p 总体预览 一.源码预览 二.语法解释 三.结构预览 四:语句分析 五:思维导图 六:疑留问题 一.源码预览 {Ph ...
- Dubbo系列(二)源码分析之SPI机制
Dubbo系列(二)源码分析之SPI机制 在阅读Dubbo源码时,常常看到 ExtensionLoader.getExtensionLoader(*.class).getAdaptiveExtensi ...
- Memcached源码分析 - 内存存储机制Slabs(5)
Memcached源码分析 - 网络模型(1) Memcached源码分析 - 命令解析(2) Memcached源码分析 - 数据存储(3) Memcached源码分析 - 增删改查操作(4) Me ...
- jQuery 2.0.3 源码分析 Deferred(最细的实现剖析,带图)
Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html ******************构建Deferred对象时候的流程图* ...
最新文章
- 《OpenCV3编程入门》学习笔记5 Core组件进阶(一)访问图像中的像素
- (0048)iOS开发之内存管理探究
- springmvc面试题2021
- webrender 查看是否开启_想要体验极致顺滑的网页加载体验?手动开启Firefox WebRender渲染引擎...
- CoreAnimation编程指南(八)事务
- 双水泵轮换工作原理图_一用一备式冷凝水泵应急电源的设计与实现
- 10g中如何修改数据库字符集-2
- Android多个音频源采集,android音频采集
- 基于jQ+CSS3页面滚动内容元素动画特效
- lisp钢管_技术专栏集合管道模式(上)
- 【Thinking In Java】笔记之二 控制执行流程
- word文档中实现目录索引中标题加粗,前导符和页码不加粗
- 操作系统课堂笔记七-交换技术
- 新零售 —— 智慧门店原理详解
- unity 所有版本下载地址
- web安全入门之SQL注入-时间型盲注
- CHINAPLAS国际橡塑展落户深圳,扬帆启航踏新程
- 安卓手机变成横屏_安卓平板进化停滞?华为平板用增长证明这纯属偏见
- 怎么人像抠图?这几种抠图方法一看就会
- FPGA学习书籍汇总【持续更新】
热门文章
- Dubbo面试题及答案(2022最新版)
- debian11安装kde桌面环境
- 单片机 N76E003 EC12 编码器
- js页面跳转和打开新页面
- 计算机动画顺序在哪里设置,Keynote动画顺序怎么设置
- ajax beforesend xhr对象,jQuery中Ajax事件beforesend及各参数含义(示例代码)
- win10写java工具_jdk环境变量一键配置工具(Win10可用)
- NODE.JS如何开发短信接口以及demo
- #科技 #资讯 #生活 RTX 4080显卡在欧跌破建议零售价, 小鹏回应自研电池, Meta因泄漏超5亿用户资料被罚款 ,黑猫投诉开启2022“你我同心 反诈同行”系列活动,这就是今天的其它大新闻
- linux brk函数,Linux sbrk/brk函数使用整理