RpcEndpoint

文档对RpcEndpoint的解释:
An end point for the RPC that defines what functions to trigger given a message. It is guaranteed that onStart, receive and onStop will be called in sequence. The life-cycle of an endpoint is: constructor -> onStart -> receive* -> onStop Note: receive can be called concurrently. If you want receive to be thread-safe, please use ThreadSafeRpcEndpoint If any error is thrown from one of RpcEndpoint methods except onError, onError will be invoked with the cause. If onError throws an error, RpcEnv will ignore it.

其子类继承关系如下:

其下面还有一个抽象子接口:ThreadSafeRpcEndpoint

文档对ThreadSafeRpcEndpoint的解释如下:
需要RpcEnv线程安全地向其发送消息的trait。线程安全意味着在通过相同的ThreadSafeRpcEndpoint处理一条消息完成后再处理下一个消息。换句话说,在处理下一条消息时,可以看到对ThreadSafeRpcEndpoint的内部字段的更改,并且ThreadSafeRpcEndpoint中的字段不需要是volatile或等效的。但是,不能保证同一个线程将为不同的消息执行相同的ThreadSafeRpcEndpoint。
即顺序处理消息,不能同时并发处理。traint RpcEndpoint的方法如下:

对其变量和方法解释如下:

1. rpcEnv:RpcEndpoint 注册的那个 RpcEnv 对象

2. self : RpcEndpoint 对应的 RpcEndpointRef。onStart 方法被调用的时候,RpcEndpointRef有效,onStop 调用后,self会是null,注意由于在onStart之前,RpcEndpoint 还没有被注册,还没有有效的RpcEndpointRef,所以不要在onStart之前调用 self 方法

3. receive :处理从RpcEndpointRef.send 或 RpcCallContext.reply 过来的消息,如果接收到一个未匹配的消息,会抛出 SparkException 并且发送给onError 方法

4. receiveAndReply:处理从RpcEndpointRef.ask发过来的消息,如果接收到一个未匹配的消息,会抛出 SparkException 并且发送给onError 方法

5. onError: 在消息处理过程中,如果有异常都会调用此方法

6. onConnected:当remoteAddress 连接上当前节点时被调用

7. onDisconnected: 当当前节点丢失掉 remoteAddress 后被调用

8. onNetworkError:当连接当前节点和remoteAddress时,有网络错误发生时被调用

9. onStart:在RpcEndpoint开始处理其他消息之前被调用

10. onStop:当RpcEndpoint停止时被调用,self 将会是null,不能用于发送消息

11. stop: 停止RpcEndpoint

RpcEndPointRef

RpcEndPointRef:远程的RpcEndpoint引用,RpcEndpointRef是线程安全的。

有一个跟RpcEndPoint 很像的类 -- RpcEndPointRef。先来看 RpcEndpointRef抽象类。下面我们重点来看一下它内部构造。

首先看它的继承结构:

它的父类是 RpcEndpointRef。先来剖析它的内部变量和方法的解释:

有三个成员变量:

1. maxRetries: 最大尝试连接次数。可以通过 spark.rpc.numRetries 参数来指定,默认是 3 次。 该变量暂时没有使用。

2. retryWaitMs:每次尝试连接最大等待毫秒值。可以通过 spark.rpc.retry.wait 参数来指定,默认是 3s。该变量暂时没有使用。

3. defaultAskTimeout: spark 默认 ask 请求操作超时时间。 可以通过 spark.rpc.askTimeout 或 spark.network.timeout参数来指定,默认是120s。

成员方法:

1. address : 抽象方法,返回 RpcEndpointRef的RpcAddress

2. name:抽象方法,返回 endpoint 的name

3. send: 抽象方法,Sends a one-way asynchronous message. Fire-and-forget semantics. 发送单向的异步消息,满足 即发即忘 语义。

4. ask:抽象方法。发送消息到相应的 RpcEndpoint.receiveAndReply , 并返回 Future 以在默认超时内接收返回值。它有两个重载方法:其中没有RpcTimeOut 的ask方法添加一个 defaultAskTimeout 参数继续调用 有RpcTimeOut 的ask方法。

5. askSync:调用抽象方法ask。跟ask类似,有两个重载方法:其中没有RpcTimeOut 的askSync方法添加一个 defaultAskTimeout 参数继续调用 有RpcTimeOut 的askSync方法。有RpcTimeOut 的askSync方法 会调用 ask 方法生成一个Future 对象,然后等待任务执行完毕后返回。
注意,这里面其实就涉及到了模板方法模式。ask跟askSync都是设定好了,ask 要返回一个Future 对象,askSync则是 调用 ask 返回的Future 对象,然后等待 future 的 result 方法返回。

下面看RpcEndpointRef 的唯一实现类 - NettyRpcEndpointRef

RpcEndpointRef的NettyRpcEnv版本。此类的行为取决于它的创建位置。在“拥有”RpcEndpoint的节点上,它是RpcEndpointAddress实例的简单包装器。在接收序列化版本引用的其他计算机上,行为会发生变化。实例将跟踪发送引用的TransportClient,以便通过客户端连接发送到端点的消息,而不需要打开新连接。此ref的RpcAddress可以为null;这意味着ref只能通过客户端连接使用,因为托管端点的进程不会侦听传入连接。不应与第三方共享这些引用,因为它们将无法向端点发送消息。

先来看 成员变量:

1. conf : 是一个SparkConf 实例

2. endpointAddress:是一个RpcEndpointAddress 实例,主要包含了 RpcAddress (host和port) 和 rpc endpoint name的信息

3. nettyEnv:是一个NettyRpcEnv实例

4. client: 是一个TransportClient实例,这个client 是不参与序列化的。

成员方法:

1. 实现并重写了继承自超类的ask方法, 如下:

2. 实现并重写了继承自超类的send方法,如下:

3. 关于序列化和反序列化的两个方法:writeObject(序列化方法)和 readObject(反序列化方法),如下:

RequestMessage

顺便,我们来看RequestMessage对象,代码如下:

RequestMessage里面的消息是sender 发给 receiver 的,RequestMessage主要负责sender RpcAddress, receiver RpcAddress,receiver rpcendpoint name以及 消息 content 的序列化。

总结: 本文主要剖析了 RpcEndpoint和RpcEntpointRef两个类,顺便,也介绍了支持序列化的 RequestMessage 类。

注:到目前为止,Spark RPC组件还没有全部剖析完毕,预计还有三到四篇文章才能完全剖析完, be patient ?

转载于:https://www.cnblogs.com/johnny666888/p/11135539.html

spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析相关推荐

  1. spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析

    spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析 TransportContext 首先官方文档对Transpor ...

  2. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  3. Spark源码分析之九:内存管理模型

    Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...

  4. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  5. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

  6. spark 源码分析之十九 -- DAG的生成和Stage的划分

    上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...

  7. Spark 源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...

  8. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  9. Spark源码分析 – DAGScheduler

    DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...

最新文章

  1. 计算机视觉 | 优秀实用的OpenCV开源项目汇总
  2. Resharper的使用
  3. 「DB」数据库事务的隔离级别
  4. C++ Fermat‘s little theorem费马小定理寻找模逆实现算法(附完整源码)
  5. Oracle几种查找和删除重复记录的方法总结
  6. .Net微服务实战之技术选型篇
  7. hibernate开发优缺点
  8. html position的学习
  9. SharePoint 2010开发实例精选——通过客户端对象模型删除页面上的Web部件
  10. java 网络字节序转主机字节序_C语言高级编程——网络编程技术
  11. 如何在Mac电脑上更改地区或国家位置设定?
  12. python获取a股报表数据_python获取A股基础数据
  13. 学习人工智能深度学习需要掌握的python语法糖
  14. pojo类继承pojo类_如何编写更好的POJO服务
  15. 天正电气图例_天正电气CAD教程之符号篇 - CAD自学网
  16. html+css基础入门学习教程之HTML 样式
  17. 苹果手机使用技巧汇总,手把手教你如何快速使用苹果手机
  18. 解决office的PPT和WPS的PPT不兼容的问题
  19. rk3588 与 rk3399 差异比较
  20. Mendix开发介绍实用篇(一)

热门文章

  1. 集成ShareSDK,分享成功后QQ和空间回调不执行的可能原因
  2. php 时间倒计时代码 个人写法 有好的想法的欢迎贴出来分享
  3. hdu 5591 ZYB's Game 博弈论
  4. html 加载后删除,document.write()应该在我的页面加载后删除所有现有的html?
  5. C++报错无效的预处理命令include_无废话--Mac OS, VS Code 搭建c/c++基本开发环境
  6. Centos在虚拟机内可以ping通,在ssh内无法ping通外网
  7. linux查看服务依赖关系,服务管理(1)
  8. 福建省高等学校非计算机考试大纲,福建省高等院校学生计算机一级考试大纲
  9. java socket中属性详解_前端开发:关于Vue组件中的data属性值是函数而不是对象的详解...
  10. ANSYS报错Accelerations are exceeding internal limit解决方法