1、概述

在Spark中很多地方都涉及网络通信,比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。Spark 2.0 之后,master 和worker 之间完全不使用akka 通信,改用netty实现。因为使用Akka要求message发送端和接收端有相同的版本,为了避免Akka造成的版本问题,给用户的应用更大灵活性,决定使用更通用的RPC实现。

spark 基于netty新的rpc框架借鉴了Akka的中的设计,它是基于Actor模型,如下图所示:

Spark通讯框架中各个组件(Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:

Endpoint(Client/Master/Worker)有1个InBox和N个OutBox(N>=1,N取决于当前Endpoint与多少其他的Endpoint进行通信,一个与其通讯的其他Endpoint对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入OutBox并被发送到其他Endpoint的InBox中。

2、Spark通信架构

(1)RpcEndpoint:RPC端点,Spark针对每个节点(Client/Master/Worker)都称之为一个Rpc端点,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher;

(2)RpcEnv:RPC上下文环境,每个RPC端点运行时依赖的上下文环境称为RpcEnv;

(3)Dispatcher:消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;

(4)Inbox:指令消息收件箱,一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;

(5)RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

(6)OutBox:指令消息发件箱,对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

(7)RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。

(8)TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;

(9)TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱;

3、相关源码阅读

3.1、RpcEnv

RpcEnv是Rpc的环境(相当于Actor中的ActorSystem),所有的RPCEndPoint都需要注册给RpcEnv实例对象(注册的时候会指定注册的名称,这样客户端就可以通过名称查询到RPCEndPoint的RPCEndPointRef引用,进而进行通信(客户端通过操作RPCEndPointRef要给RpcEndPoint发信息,怎么发要RpcEnv去管理,RpcEnv在具体的实例看见发的信息,因为有Ref肯定有路由,就路由到远程的具体的RpcEndPoint实体内部的receive方法中)),如果不注册的话收不到消息。所有的RpcEndPoint其实都是属于RpcEnv的,只有属于他客户端发消息的时候才能把信息路由给RpcEndPoint。

也就是RpcEnv 是一个RPC 环境。 RpcEndpoint需要使用RpcEnv的名称来注册自己,以接收消息。RpcEnv将处理从RpcEndpointRef或远程节点发送的消息,并将它们传递到相应的RpcEndpoint。

RpcEnv是个抽象类,作为Rpc通信肯定要传入SparkConf,因为是分布式的,在spark2.x版本中,使用的具体的实现类是NettyRpcEnv。RpcEnv的结构如下:

在RpcEnv的伴生对象中,重要的功能是创建一个RpcEnv的实例,这个实例就是用于管理endpoint。

在 RpcEnv中,有一个重要且常用的方法setupEndpoint方法。该方法就是利用上面创建的RpcEnv实例,来初始化(注册)一个Endpoint,并返回该endpoint的RpcEndpointRef(代理对象)。当我们调用RpcEnv中的setupEndpoint来注册一个endpoint到rpcEnv的时候,在NettyRpcEnv内部,会将该endpoint的名称与其本省的映射关系,rpcEndpoint与rpcEndpointRef之间映射关系保存在dispatcher对应的成员变量中。我们拿到Endpoint的代理对象后就能向该endpoint发送消息

还有一个setupEndpointRef 方法来获取到指定endpoint的引用对象

下面看看master端的代码,master在启动的时候在其伴生对象中会有一个rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)方法创建一个RpcEnv的实例,这个实例就是用于管理endpoint。然后masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))获得一个具体一个具体的endpoint的实例

3.2、RpcEndpointRef

RpcEndpointRef是RpcEndpoint的一个引用,简称代理。RpcEndpointRef是线程安全的。

要想向一个RpcEndpoint发送消息,必须先持有其Ref代理,通过该代理才能发送消息。RpcEnv结构如下

常用的发送消息的方法是send,ask,askWithRetry,他们之间的区别:send发送的消息,没有返回值,用receive接收即可;ask和askWithRetry,发送的消息有返回值,用receiveAndReply来接收。

下面看看worker端的代码:worker获取到master的ref引用对象,然后发送注册消息

3.3、在standalone模式中worker向master注册案例

(1)在worker启动的时候有onStart方法,这里面调用了registerWithMaster,这里面用了tryRegisterAllMaster方法在具体注册的时候向所有的master提交,是用线程池的中一个线程来提交。然后就获得了masterEndpoint。获得了masterEndpoint之后,将其作为参数传入registerWithMaster方法。然后通过ask发送消息。

(2)当调用ask将消息发送出去。其实是调用NettyRpcEndpointRef中ask,在方法中当前发送地址(nettyEnv.address),目标的master地址(this)和发送的消息message被封装成了RequestMessage消息。

(3)在NettyRpcEnv.ask中如果是远程rpc调用的话,最终ask将调用postToOutbox函数,并且此时消息会被序列化成Byte流。实现如下:

(4)在postToOutbox函数中,消息将经过OutboxMessage中的sendWith方法(client.sendRpc(content)),最终通过TransportClient的sendRpc方法(client.sendRpc(content)),而在TransportClient中将消息进一步封装,然后发送给master。

(5)在master端TransportRequestHandler的handle方法中,由于信息在worker端被分装成了RpcRequest,所以在该handle方法中,将调用processRpcRequest进行处理。

(6)processRpcRequest函数将调用rpcHandler的实现类NettyRpcHandler中的receive方法。在该方法中,首先通过internalRecieve将消息解包成RequestMessage。然后该消息通过dispatcher的分发给对应的endpoint

(7)在Dispatcher的postMessage方法中,可以看到,首先根据对应的endpoint的EndpointData信息(主要是该endpoint及其应用以及其信箱(inbox)),然后将消息塞到给endpoint(此例中的master)的信箱中,最后将消息塞到recievers的阻塞队列中。

(8)在Dispatcher中有一个线程池threadpool在MessageLoop类的run方法中,将receivers中的对象取出来,交由信箱的process方法去处理。如果没有收到任何消息,将会阻塞在take处

(9)在inbox的proces方法中,首先取出消息,然后根据消息的类型,最终将调用endpoint的receiver方法进行处理(也就是master中的receive方法)。至此,整个一次rpc调用的流程结束。

总结:①当调用ask将消息发送出去。其实是调用NettyRpcEndpointRef中的ask等方法,并将消息封装②NettyRpcEndpointRef中的ask方法调用了NettyRpcEnv.ask如果是远程rpc调用的话,最终ask将调用postToOutbox函数,并且此时消息会被序列化成Byte流。③在postToOutbox函数中调用OutboxMessage中的sendWith方法中调用TransportClient的sendRpc方法,在TransportClient中将消息进一步封装,然后发送给master④在master端TransportRequestHandler的handle方法中进行消息类型判断,调用processRpcRequest函数⑤processRpcRequest函数将调用rpcHandler的实现类NettyRpcHandler中的receive方法,然后该消息通过dispatcher的分发给对应的endpoint⑥在Dispatcher的postMessage方法中,可以看到,首先根据对应的endpoint的EndpointData信息放到inbox信箱中,最后将消息塞到recievers的阻塞队列中⑦在Dispatcher中有一个线程池threadpool在MessageLoop类的run方法中,将receivers中的对象取出来,交由信箱的process方法去处理。如果没有收到任何消息,将会阻塞在take处⑧在inbox的proces方法中,首先取出消息,然后根据消息的类型,最终将调用endpoint的receiver或receiveAndReply方法进行处理(也就是master中的receive方法)。

Spark2.x RPC解析相关推荐

  1. Hadoop RPC解析

    RPC(Remote Procedure Call)即是远程过程调用,简单的理解就是调用远程计算机上的服务,就像调用本地服务一样.而不需要了解底层网络技术的协议.RPC采用的是C/S模式:请求部分是一 ...

  2. 怎么改PHP_PHP实现RPC(简版)

    概述 RPC这个东西是什么? 第一次听说他, 还要在它的前边加个G, 当时我以为GRPC是一项技术, 后来才知道, 并不是这样. GRPC只是RPC的谷歌实现. 谷歌搜了一下, RPC就是一种: 远程 ...

  3. PHP实现RPC(简版)

    概述 RPC这个东西是什么? 第一次听说他, 还要在它的前边加个G, 当时我以为GRPC是一项技术, 后来才知道, 并不是这样. GRPC只是RPC的谷歌实现. 谷歌搜了一下, RPC就是一种: 远程 ...

  4. TDS协议解析(转载)

    最近在做TDS协议解析,但国内很少有TDS的资料,特此转载从国外一个网站弄来的TDS资料,不是特别全,可能也有些乱(比如今天做的RPC包的解析,看了好久才看明白,有机会的话我把RPC解析贴出来,RPC ...

  5. Flink rpc实践

    Flink的RPC服务是基于Akka Remote实现的 一个简单的Akka Remoting ActorSystem的配置如下(基于akka 2.6.0版本): 服务端配置及代码:applicati ...

  6. micro api入门

    上一节学习了go-micro的入门. 这一节我们来熟悉micro的工具集里的api用法. micro api网关 Micro的api就是api网关 API参考了API网关模式为服务提供了一个单一的公共 ...

  7. Spark2.1.0之内置RPC框架

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/80799622 在Spark中很多地方都涉及网络通 ...

  8. 100 行代码透彻解析 RPC 原理

    欢迎关注方志朋的博客,回复"666"获面试宝典 文章来源:https://sourl.cn/HpZHvy 引 言 本文主要论述的是"RPC 实现原理",那么首先 ...

  9. RPC 的概念模型与实现解析

    今天分布式应用.云计算.微服务大行其道,作为其技术基石之一的 RPC 你了解多少?一篇 RPC 的技术总结文章,数了下 5k+ 字,略长,可能也不适合休闲的碎片化时间阅读,可以先收藏抽空再细读:) 全 ...

最新文章

  1. 利用SAP FR高效预测客户需求
  2. Latex appendix 生成附录A和B
  3. HDU 5752.Sqrt Bo
  4. Java多线程基本概念
  5. js中占位符总结积累
  6. [代码发布]中文文字转换组件 1.0,支持VB/ASP编程
  7. mount: RPC: Unable to receive; errno = Connection refused 的解决方法
  8. 微猫恋爱撩妹术V2 4.1.0-多开版
  9. 上海富勒wms_【3PL | 多家三方物流应用富勒WMS,仓配一体助力供应链升级】
  10. 微信消息模板换行符转义问题处理
  11. 《实战java程序设计---上》
  12. 新视野大学英语读写2 78单元翻译
  13. PTA数据结构7.1给定一个初始为空的栈和一系列压栈、弹栈操作,请编写程序输出每次弹栈的元素。栈的元素值均为整数。
  14. StikyNotes便签软件
  15. java的jdk安装教程附百度网盘链接环境配置遇到的各种问题版本选择
  16. python重装之前要卸载吗_关于fedroa下安全地卸载和重装python
  17. 关于本人的网络地址请移步简书
  18. word 向程序发送命令时出现错误
  19. C++学习日记#1.1——四元一次方程组进行Sor松弛迭代法求解(现已推向n元一次方程组进行Sor松弛迭代法求解)
  20. IOS 将百度网盘中的文件直接发到微信而不是通过小程序或是网盘链接

热门文章

  1. mysql基础14(关于mysql数据库在没有主键情况下去除重复数据办法)
  2. Python_迭代器和生成器的复习_38
  3. Python 37 进程池与线程池 、 协程
  4. Mybatis中trim的使用
  5. 程序员经常说的「设计模式」到底是什么?
  6. 民企信息化建设个人经历(四)
  7. window.cookie
  8. 含有5亿个整数的大文件,如果排序?
  9. java并发编程实战阅读总结(b)
  10. 秀操作 | 函数宏的三种封装方式