RPC 是一种远程过程的调用,即两台节点之间的数据传输。
每个组件都有它自己的执行环境,RPC 的执行环境就是 RPCENVRPCENVSpark 2.x.x 新增加的,用于替代之前版本的 akka
RPC 是从 SparkEnv 开始启动的:

一.SparkEnv 的创建

在每一个节点都会启动一个 SparkEnv ,而通过启动 SparkContext 就能启动 SparkEnv:

try cache 语句块中有 _env = createSparkEnv(_conf, isLocal, listenerBus).



二.RpcEnv 的创建

SparkEnv#create 方法中:

val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager, clientMode = !isDriver)

NettyRpcEnvFactory#create 中会 new 一个 NettyRpcEnv

三.RpcEndPoint 的创建

作为 Rpc 请求的一种组件抽象,RpcEndPoint 代替了 akka 中的 actor,里面封装了很多方法,比如:

RpcEndPoint 的继承关系图如下:

Rpc 框架中使用 TreadSafeRpcEndpoint 这个实现类,它的继承关系如下:

四.RpcEndPointRef 的创建

akka 框架中有两个组件:actoractorref
前面说到 RpcEndPoint 代替了 akka 中的 actor ,而 RpcEndPointRef 就代替了 actorref
RpcEndPointRef 可以理解为 RpcEndPoint 的引用,通过这个引用来向目标节点发送消息。

因此,每个节点都会有一个或多个 RpcEndPointRefRpcEndPoint


RpcEndPointRef#send : 发送单项异步消息,发送了就完了,不会有返回消息。
RpcEndPointRef#ask:根据默认超时时间发送消息。
RpcEndPointRef#askWithRetry:是一个发送请求并且在默认超时时间范围内等待响应的方法

五.什么是 InboxMessage

节点之间消息传递实际封装在 InboxMessage 中,其继承关系如下:

接收消息的时候根据消息种类调用 RpcEndPoint 中不同的方法进行处理。

六.Dispatcher 处理消息

作为一个消息调度器,Dispatcher 有效地提高 NettyRpcEnv 对消息处理的并发度,将消息发送到对应的 RpcEndPoint

那么,Dispatcher 怎么创建出来的呢?
NettyRpcEnv#dispatcher:
private val dispatcher: Dispatcher = new Dispatcher(this)

Dispatcher 中的组件:

  • RpcEndPoint
  • RpcEndPointRef
  • InboxMessage
  • Inbox
  • EndpointData
  • receivers
  • threadpool

所有的 InboxMessage 被封装在 Inbox 中,Inbox#messages
protected val messages = new java.util.LinkedList[InboxMessage]()

每一个 RpcEnvPoint 负责一个 Inbox

注意 EndpointData ,它将三个组件封装成一个类:


其中,receivers 是用于存储 EndPointData 的阻塞队列,只有当 Inbox 有新 InboxMessage 时,才会将 EndPointData 放入此队列。

既然提到并发度,那就一定有线程池 ThreadPool(默认数量是2):

Dispatcher#MessageLoop:

如上图所示,如果 EndPointData 是读完状态,还会将其放回去让其他 MessageLoops 看到:receivers.offer(PoisonPill)

接下来执行 data.inbox.process(Dispatcher.this)

Dispatcher 内存模型:

七.Inbox#prosess

def process(dispatcher: Dispatcher): Unit = {var message: InboxMessage = nullinbox.synchronized {
/*
private var enableConcurrent = false 是否允许多线程同时处理
private var numActiveThreads = 0 当前激活的线程个数
*/if (!enableConcurrent && numActiveThreads != 0) {return}
// 当前激活线程数为0时执行      message = messages.poll()
// 如果消息为空,则直接返回,否则当前激活线程数+1,表示
// 当前已经有线程执行      if (message != null) {numActiveThreads += 1} else {return}}
/*
判断消息类型,执行对应方法
*/    while (true) {safelyCall(endpoint) {message match {case RpcMessage(_sender, content, context) =>try {endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>throw new SparkException(s"Unsupported message $message from ${_sender}")})} catch {case NonFatal(e) =>context.sendFailure(e)// Throw the exception -- this exception will be caught by the safelyCall function.// The endpoint's onError function will be called.throw e}case OneWayMessage(_sender, content) =>endpoint.receive.applyOrElse[Any, Unit](content, { msg =>throw new SparkException(s"Unsupported message $message from ${_sender}")})case OnStart =>endpoint.onStart()if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {inbox.synchronized {if (!stopped) {enableConcurrent = true}}}case OnStop =>val activeThreads = inbox.synchronized { inbox.numActiveThreads }assert(activeThreads == 1,s"There should be only a single active thread but found $activeThreads threads.")dispatcher.removeRpcEndpointRef(endpoint)endpoint.onStop()assert(isEmpty, "OnStop should be the last message")case RemoteProcessConnected(remoteAddress) =>endpoint.onConnected(remoteAddress)case RemoteProcessDisconnected(remoteAddress) =>endpoint.onDisconnected(remoteAddress)case RemoteProcessConnectionError(cause, remoteAddress) =>endpoint.onNetworkError(cause, remoteAddress)}}inbox.synchronized {// "enableConcurrent" will be set to false after `onStop` is called, so we should check it// every time.if (!enableConcurrent && numActiveThreads != 1) {// If we are not the only one worker, exitnumActiveThreads -= 1return}message = messages.poll()if (message == null) {numActiveThreads -= 1return}}}}

tips: while (true) 语句会一直死循环,以便随时接受 Message

总结:

  • 文字描述

SparkContext 初始化的时候就能启动 SparkEnv,接下来经过一系列的追溯封装,在 RpcEnv#create 方法中 new 一个 NettyRpcEnvFactory 并调用 create 方法,最终由 NettyRpcEnvFactory#create 返回一个我们想要的 env : nettyEnv

作为 Rpc 请求的一种组件抽象,RpcEndPoint 代替了 akka 中的 actor,里面封装了很多消息处理的方法,如 receivereceiveAndReplyakka 框架中有两个组件:actoractorref,前面说到 RpcEndPoint 代替了 akka 中的 actor ,而 RpcEndPointRef 就代替了 actorrefRpcEndPointRef 可以理解为 RpcEndPoint 的引用,通过这个引用来向目标节点发送消息。因此,每个节点都会有一个或多个 RpcEndPointRefRpcEndPoint。这里注意的是,创建 RpcEndPoint ,随后通过它和 RpcEndpointAddress 来创建 RpcEndPointRef 的同时,将这两者注册到 EndPointData 中。

节点之间消息传递实际封装在 InboxMessage 中,所有的 InboxMessage 被封装在 Inbox#messages 中,RpcEndPoint 接收消息的时候根据消息种类调用 RpcEndPoint 中不同的方法进行处理。

作为一个消息调度器,Dispatcher 有效地提高 NettyRpcEnv 对消息处理的并发度,将消息发送到对应的 RpcEndPointDispatcherenv 初始化的时候会被创建出来,每一个 RpcEndPoint 负责一个 Inbox ,注意 EndpointData ,它将三个组件(RpcEndPointRef RpcEndPoint Inbox)封装成一个类,所以,每一个 RpcEndPoint 负责一个 Inbox ,其中,Dispatcher#receivers 是用于存储 EndPointData 的阻塞队列,只有当 Inbox 有新 InboxMessage 时,才会将 EndPointData 放入此队列,然后被线程池处理。

Dispatcher 初始化便创建出来的线程池(默认数量是2):private val threadpool: ThreadPoolExecutor = {...},当上面的 receivers 队列中没内容时,会阻塞。当有 RpcEndpoint 相关请求(即 InboxMessage )的时候就会立刻执行,这里处理 InboxMessage 本质上是调用相应 RpcEndpointinbox 去处理。在 Dispatcher#MessageLoop#run 中有一个 while(true) 语句,它会从 receivers 队列中拿取 EndPointData 数据,如果 EndPointData 是读完状态,还会将其放回去让其他 MessageLoops 看到,接下来执行 data.inbox.process(Dispatcher.this)

Inbox#prosess 中有一个 while (true) 语句,它的作用是判断消息类型,执行对应方法,并且会一直死循环,以便随时接受 Message

  • 实例理解
    Spark2.0.2源码分析——Client 与 Master 之间创建 RPC 通信实例

Spark2.0.2源码分析——RPC 通信机制(消息处理)相关推荐

  1. Spark2.4.0 SparkEnv 源码分析

    Spark2.4.0 SparkEnv 源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 ...

  2. Pushlet 2.0.3 源码分析

    转载地址:http://blog.csdn.net/yxw246/article/details/2418255 Pushlet 2.0.3 源码分析 ----服务器端 1 总体架构 Pushlet从 ...

  3. 菜鸟读jQuery 2.0.3 源码分析系列(1)

    原文链接在这里,作为一个菜鸟,我就一边读一边写 jQuery 2.0.3 源码分析系列 前面看着差不多了,看到下面一条(我是真菜鸟),推荐木有入门或者刚刚JS入门摸不着边的看看,大大们手下留情,想一起 ...

  4. Android 11.0 Settings源码分析 - 主界面加载

    Android 11.0 Settings源码分析 - 主界面加载 本篇主要记录AndroidR Settings源码主界面加载流程,方便后续工作调试其流程. Settings代码路径: packag ...

  5. Android 8.0系统源码分析--Camera processCaptureResult结果回传源码分析

    相机,从上到下概览一下,真是太大了,上面的APP->Framework->CameraServer->CameraHAL,HAL进程中Pipeline.接各种算法的Node.再往下的 ...

  6. photoshop-v.1.0.1源码分析第三篇–FilterInterface.p

    photoshop-v.1.0.1源码分析第三篇–FilterInterface.p 总体预览 一.源码预览 二.语法解释 三.结构预览 四:语句分析 五:思维导图 六:疑留问题 一.源码预览 {Ph ...

  7. Dubbo系列(二)源码分析之SPI机制

    Dubbo系列(二)源码分析之SPI机制 在阅读Dubbo源码时,常常看到 ExtensionLoader.getExtensionLoader(*.class).getAdaptiveExtensi ...

  8. Memcached源码分析 - 内存存储机制Slabs(5)

    Memcached源码分析 - 网络模型(1) Memcached源码分析 - 命令解析(2) Memcached源码分析 - 数据存储(3) Memcached源码分析 - 增删改查操作(4) Me ...

  9. jQuery 2.0.3 源码分析 Deferred(最细的实现剖析,带图)

    Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html ******************构建Deferred对象时候的流程图* ...

最新文章

  1. 《OpenCV3编程入门》学习笔记5 Core组件进阶(一)访问图像中的像素
  2. (0048)iOS开发之内存管理探究
  3. springmvc面试题2021
  4. webrender 查看是否开启_想要体验极致顺滑的网页加载体验?手动开启Firefox WebRender渲染引擎...
  5. CoreAnimation编程指南(八)事务
  6. 双水泵轮换工作原理图_一用一备式冷凝水泵应急电源的设计与实现
  7. 10g中如何修改数据库字符集-2
  8. Android多个音频源采集,android音频采集
  9. 基于jQ+CSS3页面滚动内容元素动画特效
  10. lisp钢管_技术专栏集合管道模式(上)
  11. 【Thinking In Java】笔记之二 控制执行流程
  12. word文档中实现目录索引中标题加粗,前导符和页码不加粗
  13. 操作系统课堂笔记七-交换技术
  14. 新零售 —— 智慧门店原理详解
  15. unity 所有版本下载地址
  16. web安全入门之SQL注入-时间型盲注
  17. CHINAPLAS国际橡塑展落户深圳,扬帆启航踏新程
  18. 安卓手机变成横屏_安卓平板进化停滞?华为平板用增长证明这纯属偏见
  19. 怎么人像抠图?这几种抠图方法一看就会
  20. FPGA学习书籍汇总【持续更新】

热门文章

  1. Dubbo面试题及答案(2022最新版)
  2. debian11安装kde桌面环境
  3. 单片机 N76E003 EC12 编码器
  4. js页面跳转和打开新页面
  5. 计算机动画顺序在哪里设置,Keynote动画顺序怎么设置
  6. ajax beforesend xhr对象,jQuery中Ajax事件beforesend及各参数含义(示例代码)
  7. win10写java工具_jdk环境变量一键配置工具(Win10可用)
  8. NODE.JS如何开发短信接口以及demo
  9. #科技 #资讯 #生活 RTX 4080显卡在欧跌破建议零售价, 小鹏回应自研电池, Meta因泄漏超5亿用户资料被罚款 ,黑猫投诉开启2022“你我同心 反诈同行”系列活动,这就是今天的其它大新闻
  10. linux brk函数,Linux sbrk/brk函数使用整理