Spark运行环境之SparkEnv和通信工具RpcEnv
Spark在运行时Driver端和Executor端需要互相通信,那么这种通信是如何进行的?
在SparkEnv
中有两个方法createDriverEnv
,createExecutorEnv
,分别用于创建Driver端和Executor端的SparkEnv
对象。
看一下SparkEnv
对象的结构,从下面的代码中可以看到SparkEnv
包含了Spark中很多重要组件,比如用于通信的RpcEnv
,用于序列化的SerializerManager
,还包括ShuffleManager
、BroadcastManager
、BlockManager
,MemoryManager
等用于管理Shuffle,broadcast,block和memory的组件。
class SparkEnv (val executorId: String,private[spark] val rpcEnv: RpcEnv,val serializer: Serializer,val closureSerializer: Serializer,val serializerManager: SerializerManager,val mapOutputTracker: MapOutputTracker,val shuffleManager: ShuffleManager,val broadcastManager: BroadcastManager,val blockManager: BlockManager,val securityManager: SecurityManager,val metricsSystem: MetricsSystem,val memoryManager: MemoryManager,val outputCommitCoordinator: OutputCommitCoordinator,val conf: SparkConf)
接下来的文章中主要从Driver端和Executor端两个角度的源码来分析SparkEnv
对象的生成过程,以及其中的RpcEnv
是如何实现Driver端和Executor端通信的。
在此之前,插入一个题外话,原生的Spark代码中,由于SparkEnv
的限制会使得同一JVM中无法共存多个SparkContext
,如果不解决SparkEnv
上的限制的话,会由于Driver端和Executor端Broadcast等组件的不匹配导致SparkContext
环境异常。原因如下所示,SparkEnv
的伴生对象中有一个SparkEnv
类型的对象,当Driver端或者Executor端构建SparkEnv
时,会SparkEnv.set
该对象,然后在多个地方直接通过SparkEnv.get
方法来获取该对象。如果存在多个SparkContext
,那么后面创建的SparkContext
触发的SparkEnv.set
操作会将之前的env
覆盖。当执行任务时就会出现SparkEnv
中各组件不匹配了。
object SparkEnv extends Logging {@volatile private var env: SparkEnv = _...def set(e: SparkEnv) {env = e}/*** Returns the SparkEnv.*/def get: SparkEnv = {env}...
}
一、SparkEnv的创建
1、调用栈分析
(1)Driver端
Driver端创建SparkEnv
对象是在SparkContext
中进行的,调用栈如下,
SparkContext#createSparkEnv
----> SparkEnv.createDriverEnv
--------> SparkEnv.create
(2)Executor端
Executor端创建SparkEnv
对象的过程是,
CoarseGrainedExecutorBackend#run
----> SparkEnv.createExecutorEnv
--------> SparkEnv.create
Executor启动过程
Spark在启动时,将Executor端的启动命令通过Yarn分发到各节点,然后在本地启动CoarseGrainedExecutorBackend
进程,这部分的逻辑可以参考,
CoarseGrainedExecutorBackend
的入口是其main
方法,使用方法如下Usage: CoarseGrainedExecutorBackend [options]Options are:--driver-url <driverUrl>--executor-id <executorId>--hostname <hostname>--cores <cores>--app-id <appid>--worker-url <workerUrl>--user-class-path <url>
启动命令在
ExecutorRunnable#prepareCommand
中生成,这个方法的调用是由ExecutorRunnable#startContainer
触发的,从方法名看,这个是启动Executor节点的地方。val commands = prefixEnv ++ Seq(YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java","-server") ++javaOpts ++Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend","--driver-url", masterAddress,"--executor-id", executorId,"--hostname", hostname,"--cores", executorCores.toString,"--app-id", appId) ++userClassPath ++Seq(s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
上面就是在各个节点上启动Executor服务的命令,可以在机器上看到以下形式的java进程
184629 CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.0.110.110:3990 --executor-id 93 --hostname hostname --cores 4 --app-id application_1530195947232_7897078 --user-class-path file:/data1/nodemanager/usercache/master/appcache/application_1530195947232_7897078/container_e33_1530195947232_7897078_01_000159/__app__.jar
再看一下Executor启动时是如何获取到Driver端的
SparkConf
配置的。SparkConf
对象在创建SparkEnv
时会用到。val executorConf = new SparkConfval port = executorConf.getInt("spark.executor.port", 0)// 创建一个RpcEnv,从Driver请求val fetcher = RpcEnv.create("driverPropsFetcher",hostname,port,executorConf,new SecurityManager(executorConf),clientMode = true)val driver = fetcher.setupEndpointRefByURI(driverUrl)val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))fetcher.shutdown()// 新建一个SparkConf对象,将Rpc获取的参数都赋给该对象val driverConf = new SparkConf()for ((key, value) <- props) {// this is required for SSL in standalone modeif (SparkConf.isExecutorStartupConf(key)) {driverConf.setIfMissing(key, value)} else {driverConf.set(key, value)}}
Executor启动时就会通过CoarseGrainedExecutorBackend#run
方法生成SparkEnv
对象。所以上面要先分析Executor的启动过程。
Driver端生成SparkEnv
的过程可以直接查看SparkContext
中的逻辑,这个过程比较简单。
2、SparkEnv#create
上面Driver端和Executor端的调用栈,最终都是进入到了同一个方法SparkEnv#create
方法中。只要搞清楚了这个方法的逻辑,也就知道了Driver端和Executor端是如何构建SparkEnv
对象的了。
源代码如下,只保留其中的关键逻辑。
/* Driver端调用该方法时传入的参数如下:* executorId: driver* bindAddress: spark.driver.bindAddress参数指定,默认与spark.driver.host参数相同,取driver主机名* advertiseAddress: spark.driver.host参数指定,默认取driver主机名* * Executor端调用该方法时传入的参数如下:* conf: driverConf 从driver端获取的SparkConf对象* executorId: Executor启动时的编号,例如--executor-id 93* bindAddress: Executor所在主机名,例如--hostname hostname* advertiseAddress: 和bindAddress相同*/private def create(conf: SparkConf,executorId: String,bindAddress: String,advertiseAddress: String,port: Int,isLocal: Boolean,numUsableCores: Int,ioEncryptionKey: Option[Array[Byte]],listenerBus: LiveListenerBus = null,mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {// 判断是不是driver端,driver端的识别符号是“driver”val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER// 根据是否driver,生成不同的systemName用于构建rpcEnv对象,driver端为"sparkDriver",executor端为"sparkExecutor"val systemName = if (isDriver) driverSystemName else executorSystemName// 创建RpcEnv对象,下一节中详细分析val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf, securityManager, clientMode = !isDriver)val serializer = ...val serializerManager = ...val closureSerializer = ...val broadcastManager = ...val mapOutputTracker = ...val shuffleManager = ...val useLegacyMemoryManager = ...val memoryManager: MemoryManager = ...val blockManagerPort = ...val blockTransferService = ...val blockManager = ...val metricsSystem = ...val outputCommitCoordinator = ...val outputCommitCoordinatorRef = ...val envInstance = new SparkEnv(executorId,rpcEnv,serializer,closureSerializer,serializerManager,mapOutputTracker,shuffleManager,broadcastManager,blockManager,securityManager,metricsSystem,memoryManager,outputCommitCoordinator,conf)envInstance}
二、RpcEnv测试
接下来首先做一个小测试,通过spark-core提供的功能,模拟测试Rpc通信的过程。
1、构建后端服务
首先需要有一个长时间运行的后端服务,服务端完整代码如下所示,首先通过RpcEnv.create
方法构造一个RpcEnv对象,然后通过RpcEnv.setupEndpoint
方法向该对象中set一个自定义的HelloworldEndpoint
,该类需要继承自RpcEndpoint
。在RpcEndpoint
中有一些方法可以覆盖实现,比如onStart
可以增加一些服务启动时的逻辑功能,onStop
可以增加一些服务停止时的功能,receiveAndReply
可以处理客户端发送过来的请求。
object HelloworldServer {def main(args: Array[String]) {// 初始化RpcEnv环境val conf = new SparkConfval rpcEnv: RpcEnv = RpcEnv.create("hello-server", "localhost", 52345, conf, new SecurityManager(conf))// 当前RpcEnv设置后端服务val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)rpcEnv.setupEndpoint("hello-service", helloEndpoint)// 等待客户端访问该后端服务rpcEnv.awaitTermination()}
}class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {override def onStart(): Unit = {println("start hello endpoint")}override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case SayHi(msg) => {println(s"receive $msg")context.reply(s"hi, $msg")}case SayBye(msg) => {println(s"receive $msg")context.reply(s"bye, $msg")}}override def onStop(): Unit = {println("stop hello endpoint")}
}case class SayHi(msg: String)
case class SayBye(msg: String)
运行效果如下图所示,
2、构建前端请求
后端服务稳定运行后,我们如何访问该服务,接下来看一下客户端的代码,
object HelloworldClient {def main(args: Array[String]) {// 初始化RpcEnv环境val conf = new SparkConf// 这里的rpc环境主机需要指定本机,端口号可以任意指定val rpcEnv = RpcEnv.create("hello-client", "localhost", 52346, conf, new SecurityManager(conf))// 根据Server端IP + Port获取后端服务的引用,得到的是RpcEndpointRef类型对象val endpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")// 1、客户端异步请求// 客户端通过RpcEndpointRef#ask方法异步访问服务端,服务端通过RpcEndpoint#receiveAndReply方法获取到该请求后处理val future = endpointRef.ask[String](SayBye("neo"))// 客户端请求成功/失败时的处理方法future.onComplete {case scala.util.Success(value) ⇒ println(s"Got the result = $value")case scala.util.Failure(e) => println(s"Got error: $e")}// 客户端等待超时时间Await.result(future, Duration("5s"))// 2、客户端同步请求val resp = endpointRef.askWithRetry[String](SayHi("hehe"))print(resp)}
}
运行效果如下,可以看到52436端口被用来启动了一个hello-client
服务,接下来就是通过获取的EndpointRef
连接到上面启动的Driver上。
代码的关键是RpcEnv.setupEndpointRef
,通过一个RpcAddress
类指定server服务的主机名和端口号,并且要指定访问server上的哪个服务,下面代码中的hello-service
必须与上面保持一致。如果写的不是hello-service
可以看到报错如下,客户端还是正常连接到了服务端的端口,但是无法在服务端找到spark://hello-x@localhost:52345
服务。
总结来说就是,服务端需要通过RpcEnv.setupEndpoint
设置一个RpcEndpoint
的具体实现类,该类有一些必须实现的方法处理客户端的请求。客户端通过RpcEnv.setupEndpointRef
的方式获取服务端RpcEndpoint
服务的引用,得到的RpcEndpointRef
对象有send
,ask
,askSync
等方法去访问服务。
三、RpcEnv分析
Spark中Driver端和Executor端通信主要通过RpcEnv
来实现。两端的RpcEnv
对象创建过程在SparkEnv#create
方法中已经看到过了。
有关Rpc的代码在org.apache.spark.rpc
包中,其中还有一个名为netty
的子package。下面过程中涉及到的类主要有。这些不同类型的对象主要可以分为三类,分别是
- 环境相关,主要包括
RpcEnv
,NettyRpcEnv
,RpcEnvConfig
,NettyRpcEnvFactory
, - Server相关,主要是
RpcEndpoint
,ThreadSafeRpcEndpoint
, - Client相关,代表
RpcEndpoint
的引用,比如RpcEndpointRef
,NettyRpcEndpointRef
1、RpcEnv生成调用栈
生成RpcEnv
对象的基本调用过程如下所示,最终是通过NettyRpcEnvFactory#create
方法得到了一个NettyRpcEnv
对象,NettyRpcEnv
继承自RpcEnv
类。
SparkEnv#create
----> RpcEnv#create
--------> NettyRpcEnvFactory#create
RpcEnv#create
在RpcEnv
中有两个create
方法,该方法的实现以及在SparkEnv
中的调用方式如下,
/**
* systemName: sparkDeiver/sparkExecutor
* bindAddress: Driver端IP地址,或者Executor端的IP地址
* advertiseAddress: Driver端IP地址,或者Executor端的IP地址
* port: Executor端为空,Driver端启动时的端口号
*/
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf, securityManager, clientMode = !isDriver)// 定义def create(name: String,host: String,port: Int,conf: SparkConf,securityManager: SecurityManager,clientMode: Boolean = false): RpcEnv = {create(name, host, host, port, conf, securityManager, 0, clientMode)}def create(name: String,bindAddress: String,advertiseAddress: String,port: Int,conf: SparkConf,securityManager: SecurityManager,numUsableCores: Int,clientMode: Boolean): RpcEnv = {val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,numUsableCores, clientMode)new NettyRpcEnvFactory().create(config)}
该方法执行完成后,会在Driver端和Executor各启动一个RpcEnv环境。接下来看怎么使用这个RpcEnv环境。
四、RpcEnv使用
RpcEnv
生成后,接下来主要在RpcEndpoint
和RpcEndpointRef
中使用。
1、在心跳中的使用
接下来以心跳为例,分析Spark中的RpcEnv通信过程。
(1)Driver端启动HeartbeatReceiver
服务定期接受Executor端请求
在构建SparkContext
对象时,其中有几行关于HeartbeatReceiver
的代码。实际上HeartbeatReceiver
是一个RpcEndpointRef
实现类。
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
...
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
...
env.rpcEnv.stop(_heartbeatReceiver)
通过主动调用RpcEnv.setupEndpoint
可以将一个RpcEndpoint
对象绑定到该RpcEnv
上。在这里,最终调用的是NettyRpcEnv.setupEndpoint
方法得到一个RpcEndpointRef
对象。
// NettyRpcEnv中override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {dispatcher.registerRpcEndpoint(name, endpoint)}
在Dispatcher
中,生成一个NettyRpcEndpointRef
对象并返回给调用方后,还会将该对象存入一个Map
中,待后面使用,该Map
的key是ndpointData
类型的,该类型有一个name
属性是在生成该RpcEndpoint
时指定的,在心跳这里name = HeartbeatReceiver
。
// Dispatcher中private val endpoints: ConcurrentMap[String, EndpointData] =new ConcurrentHashMap[String, EndpointData]private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]...def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {val addr = RpcEndpointAddress(nettyEnv.address, name)val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)synchronized {if (stopped) {throw new IllegalStateException("RpcEnv has been stopped")}if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")}val data = endpoints.get(name)endpointRefs.put(data.endpoint, data.ref)receivers.offer(data) // for the OnStart message}endpointRef}
在HeartbeatReceiver.onStart
方法中,启动了一个名为"heartbeat-receiver-event-loop-thread"的线程,以参数spark.network.timeoutInterval
设置的时间间隔定期的调用自己的ask
方法处理超时的节点。
(2)Executor端定期汇报心跳
(a)Executor发送心跳信息的完整过程
Executor上启动一个名为“driver-heartbeater”的线程,以参数spark.executor.heartbeatInterval
设置的时间间隔(默认为10s)定期通过Executor.reportHeartBeat
方法向Driver发送心跳Heartbeat
对象。整个过程如下所示,
private[spark] case class Heartbeat(executorId: String,accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updatesblockManagerId: BlockManagerId)/** 向Driver汇报心跳,心跳中包括active状态的task信息 **/private def reportHeartBeat(): Unit = {...val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)try {val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))if (response.reregisterBlockManager) {logInfo("Told to re-register on heartbeat")env.blockManager.reregister()}heartbeatFailures = 0} catch {...}}
上面主要调用了RpcEndpointRef.askWithRetry
方法,将由具体的RpcEndpoint.receiveAndReply
方法接收该请求并作出响应,在心跳这个示例中,是由HeartbeatReceiver.receiveAndReply
方法来处理请求的。
(b)Executor连接到Driver的HeartBeatReceiver
在reportHeartBeat()
方法中有主要用到了一个heartbeatReceiverRef
对象,该对象的生成如下,
private val heartbeatReceiverRef =RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
在RpcUtils.makeDeiverRef
方法中可以看到,最终也是类似于上面Server启动时注册那样,通过rpcEnv.setupEndpointRef
来获取一个RpcEndpointRef
对象。
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {val driverHost: String = conf.get("spark.driver.host", "localhost")val driverPort: Int = conf.getInt("spark.driver.port", 7077)Utils.checkHost(driverHost, "Expected hostname")rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)}
rpcEnv.setupEndpointRef
的调用栈如下,
RpcEnv.setupEndpointRef
--> RpcEnv.setupEndpointRefByURI
----> NettyRpcEnv.asyncSetupEndpointRefByURI
所以,在NettyRpcEnv.asyncSetupEndpointRefByURI
可以找到Executor获取RpcEndpointRef
的过程。
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {val addr = RpcEndpointAddress(uri)val endpointRef = new NettyRpcEndpointRef(conf, addr, this)val verifier = new NettyRpcEndpointRef(conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>if (find) {Future.successful(endpointRef)} else {Future.failed(new RpcEndpointNotFoundException(uri))}}(ThreadUtils.sameThread)}
Client端通过name, host, port三者组合连接到Server起的Rpc服务上。在前面的示例中,这三个参数组成的URI内容为spark://hello-server@localhost:52345
。
2、Executor和Driver通信
整个过程大致是这样的,在Driver端会启动一个CoarseSchedulerBackend.DriverEndpoint
,在Executor端会启动一个CoarseExecutorBackend
,这两者都是RpcEndpoint
的子类。
Driver端的DriverEndpoint
启动好后,就可以由DriverEndpoint.receiveAndReply
方法准备好了处理有关Executor启动、停止等的逻辑,并且由于在Executor启动时发送的信号中获得了Executor的Ref,可以在其他方法中直接调用比如LaunchTask
等动作,Driver通过这种方式向Executor发送各种指令。
Executor端通过org.apache.spark.deploy.yarn
包中的一些类触发了Executor启动命令后,会在本机启动CoarseExecutorBackend
,启动的第一时间就通过CoarseExecutorBackend.onStart
方法向Driver报告,这时候,该Executor的引用就已经被Driver记录了。后面,当接受Driver传递过来的一系列动作时,均由CoarseExecutorBackend.receive
方法进行处理,在这个方法中可以处理的信号类型有,RegisteredExecutor
,RegisterExecutorFailed
,StopExecutor
,LaunchTask
,KillTask
,Shutdown
。这些事件类型从字面意思就可以直接理解。
CoarseSchedulerBackend
和CoarseExecutorBackend
涉及到的事件信号类型都记录在CoarseGrainedClusterMessage
中。
(1)Driver端启动CoarseSchedulerBackend
服务
参考Spark Scheduler模块源码分析之TaskScheduler和SchedulerBackend,在YarnClientSchedulerBackend.start
方法中调用了CoarseSchedulerBackend.start
方法,然后接下来一系列调用栈如下所示,
CoarseSchedulerBackend.start
--> createDriverEndpointRef
----> createDriverEndpoint
------> DriverEndpoint.receiveAndReply
在DriverEndpoint.receiveAndReply
方法中,关于Executor的处理方法有三个,分别是RegisterExecutor
,StopExecutors
,RemoveExecutor
,在这个方法中会注册一个executorRef
,通过该对象向Executor发送信号。
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {// 注册Executorcase RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>// 如果是已经启动过的Executor,则向Executor发送由于ID重复导致注册失败的信息if (executorDataMap.contains(executorId)) {executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))context.reply(true)} else {// If the executor's rpc env is not listening for incoming connections, `hostPort`// will be null, and the client connection should be used to contact the executor.val executorAddress = if (executorRef.address != null) {executorRef.address} else {context.senderAddress}logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")addressToExecutorId(executorAddress) = executorIdtotalCoreCount.addAndGet(cores)totalRegisteredExecutors.addAndGet(1)val data = new ExecutorData(executorRef, executorRef.address, hostname,cores, cores, logUrls)// This must be synchronized because variables mutated// in this block are read when requesting executorsCoarseGrainedSchedulerBackend.this.synchronized {executorDataMap.put(executorId, data)if (currentExecutorIdCounter < executorId.toInt) {currentExecutorIdCounter = executorId.toInt}if (numPendingExecutors > 0) {numPendingExecutors -= 1logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")}}// 向Executor发送注册Executor成功的信息executorRef.send(RegisteredExecutor)// Note: some tests expect the reply to come after we put the executor in the mapcontext.reply(true)// 并且记入Listener中listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))makeOffers()}// Driver停止case StopDriver =>...// Executor全部停止case StopExecutors =>logInfo("Asking each executor to shut down")for ((_, executorData) <- executorDataMap) {executorData.executorEndpoint.send(StopExecutor)}context.reply(true)// 移除Executorcase RemoveExecutor(executorId, reason) =>// We will remove the executor's state and cannot restore it. However, the connection// between the driver and the executor may be still alive so that the executor won't exit// automatically, so try to tell the executor to stop itself. See SPARK-13519.executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))removeExecutor(executorId, reason)context.reply(true)case RetrieveSparkAppConfig =>...}
Driver端服务启动好之后,就可以针对不同的请求事件进行不同的动作了。比如启动Task的LaunchTask
动作。这里注意,待启动的Task并不是在这里随机分配给任意Executor执行的,而是在生成Task描述信息TaskDescription
时,就已经根据一定的策略以及当前Executors的现状分配好了。具体可以参考Spark Scheduler模块源码分析之TaskScheduler和SchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {val serializedTask = ser.serialize(task)// 首先确保Task相关信息序列化后的大小不超过 spark.rpc.message.maxSize MB,默认为128MB,超过该参数大小的Task无法分配执行if (serializedTask.limit >= maxRpcMessageSize) {scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +"spark.rpc.message.maxSize (%d bytes). Consider increasing " +"spark.rpc.message.maxSize or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {// 获取Executor的引用,在统计信息中减去即将分配的core数val executorData = executorDataMap(task.executorId)executorData.freeCores -= scheduler.CPUS_PER_TASKlogDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 向Executor发送LaunchTask事件executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}}
(2)Executor端启动CoarseGrainedExecutorBackend
服务
这是在Executor端启动的一个服务,长时间运行,可以接收和处理Driver端发送的请求。结合前面的Executor启动过程,当通过Yarn将启动CoarseGrainedExecutorBackend
进程发送到其他节点后的具体调用栈如下,
CoarseGrainedExecutorBackend.main
--> CoarseGrainedExecutorBackend.run
在run
方法中,首先会在本地创建一个SparkEnv
,然后在SparkEnv.rpcEnv
上注册一个CoarseGrainedExecutorBackend
服务,这个过程如下所示,
// 创建Executor端SparkEnv
val env = SparkEnv.createExecutorEnv(driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
// 启动后台服务。
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
在CoarseGrainedExecutorBackend.onStart
方法中,有一些启动Executor时就需要运行的逻辑,建立一个Driver端RpcEndpointRef
。
@volatile var driver: Option[RpcEndpointRef] = None
...
override def onStart() {logInfo("Connecting to driver: " + driverUrl)// 通过Driver端的host和port连接到DriverrpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>// This is a very fast action so we can use "ThreadUtils.sameThread"driver = Some(ref)ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))}(ThreadUtils.sameThread).onComplete {// This is a very fast action so we can use "ThreadUtils.sameThread"case Success(msg) =>// Always receive `true`. Just ignore itcase Failure(e) =>exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)}(ThreadUtils.sameThread)}
然后通过该Ref访问CoarseGrainedExecutorBackend.ask
方法发送一个RegisterExecutor
信号用于注册Executor,这里就会由Executor连接到Driver端,告诉Driver这里已经启动好了一个Executor实例,并且把自己的Ref也通过注册信号告诉Driver,这样Driver就可以通过知道Executor的引用发送各种动作指令了。
在CoarseGrainedExecutorBackend.receive
方法中,当接收到Driver端传来的各种请求时,Executor端会有不同的响应。在接收到RegisteredExecutor
对象时,会生成一个Executor
对象。
override def receive: PartialFunction[Any, Unit] = {// 启动Executorcase RegisteredExecutor =>logInfo("Successfully registered with driver")try {executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)} catch {case NonFatal(e) =>exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)}// Executor失败case RegisterExecutorFailed(message) =>...// 启动Taskcase LaunchTask(data) =>...// 杀死Taskcase KillTask(taskId, _, interruptThread) =>...// 停止Executorcase StopExecutor =>...// 停止case Shutdown =>...}
(3)Executor获取Task相关文件
Executor在通过CoarseGrainedExecutorBackend.receive
响应LaunchTask
事件时,将会进入Executor.launch
方法。在这个方法中,得到一个TaskRunner
对象。
def launchTask(context: ExecutorBackend,taskId: Long,attemptNumber: Int,taskName: String,serializedTask: ByteBuffer): Unit = {val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,serializedTask)runningTasks.put(taskId, tr)threadPool.execute(tr)}
TaskRunner
是Executor
的内部类,接下来进入TaskRunner.run
。在这个方法中,首先通过Task.deserializeWithDependencies
反序列化Task信息,获取依赖的File文件和Jar文件,然后调用updateDependencies(taskFiles, taskJars)
方法就可以在Executor端拉取文件。调用栈如下,
Executor.updateDependencies
--> Util.fetchFile
----> Util.doFetchFile
------> Util.downloadFile // 对于所有走NettyRpcEnv的master模式
在Util.downloadFile
方法中,接收到的是一个InputStream
对象,对应一个输出到本地的OutputStream
就可以将该文件下载到本地。
所以,重点看一下Util.doFetchFile
方法中的逻辑。
Util.doFetchFile
--> NettyRpcEnv.openChannel
看看在NettyRpcEnv.openChannel
中如何获取输入文件流,在方法的开头可以看到基本上是通过主机名+端口号+文件路径通过网络从Driver端直接拉取的。
override def openChannel(uri: String): ReadableByteChannel = {val parsedUri = new URI(uri)require(parsedUri.getHost() != null, "Host name must be defined.")require(parsedUri.getPort() > 0, "Port must be defined.")require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")val pipe = Pipe.open()val source = new FileDownloadChannel(pipe.source())try {val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())val callback = new FileDownloadCallback(pipe.sink(), source, client)client.stream(parsedUri.getPath(), callback)} catch {case e: Exception =>pipe.sink().close()source.close()throw e}source}
(4)Driver端通过RpcEnv发送Jar包和文件
在任务提交时,Jar文件通过参数spark.jars
设置,如果是On Yarn模式还可以通过spark.yarn.dist.jars
参数设置。File文件通过spark.files
参数设置。
SparkContext
启动好SparkEnv
后(该对象中包含前面生成的RpcEnv
对象)后的代码如下,下面代码在SparkContext
中。将jar或者文件添加到RpcEnv.fileServer
中。
val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
_conf.set("spark.repl.class.uri", replUri)def addJar(path: String) {...key = env.rpcEnv.fileServer.addJar(new File(path))...
}
...
def addFile(path: String, recursive:Boolean): Unit = {...env.rpcEnv.fileServer.addFile(new File(uri.getPath))...
}
接下来只需要看两点:如何将文件信息传递给executor,RpcEnv.fileServer
是什么
a) 如何将文件信息传递给Executor
上一步是通过反序列化Task信息,获取该Task需要的File文件和Jar文件,那么Task所需要的文件就是在序列化的时候就已经注册好了的。这段逻辑在Task.serializeWithDependencies
中,调用栈如下,
// 参考 https://blog.csdn.net/dabokele/article/details/51932102#t16
TaskSetManager.resourceOffer
--> Task.serializeWithDependencies
看一下Task.serializeWithDependencies
的调用过程,Files和Jars都是在SparkContext
中准备的。SparkContext
调用addJar
和adFile
方法后,会将jar信息和file信息记入addFiles
和addJars
对象中,这两个对象都是Map类型。key是RpcEnv.fileServer
中添加的文件路径,对于走Netty的,是以“spark://host:port/files/…”格式的一个文件路径。
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
b) RpcEnv.fileServer
是什么
在RpcEnv
中有一个fileServer
属性,该属性是RpcEnvFileServer
类型。在实际使用的NettyRpcEnv
中的fileServer
属性是NettyStreamManager
类型的。所以,Driver端通过SparkContext.addJar
和SparkContext.addFile
方法都间接的调用了NettyStreamManager.addJar
和NettyStreamManager.addFile
方法。可以看一下NettyStreamManager.addJar
的逻辑,
override def addFile(file: File): String = {val existingPath = files.putIfAbsent(file.getName, file)require(existingPath == null || existingPath == file,s"File ${file.getName} was already registered with a different path " +s"(old path = $existingPath, new path = $file")s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"}override def addJar(file: File): String = {val existingPath = jars.putIfAbsent(file.getName, file)require(existingPath == null || existingPath == file,s"File ${file.getName} was already registered with a different path " +s"(old path = $existingPath, new path = $file")s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"}override def addDirectory(baseUri: String, path: File): String = {val fixedBaseUri = validateDirectoryUri(baseUri)require(dirs.putIfAbsent(fixedBaseUri.stripPrefix("/"), path) == null,s"URI '$fixedBaseUri' already registered.")s"${rpcEnv.address.toSparkURL}$fixedBaseUri"}
其中rpcEnv.address
的逻辑如下,这里得到的是一个RpcAddress
对象,
override lazy val address: RpcAddress = {if (server != null) RpcAddress(host, server.getPort()) else null}
从这里可以看到,在NettyRpcEnv
中有一个NettyStreamManager
对象,该对象是RpcEnvFileServer
的子类。即在RpcEnv
中有一个RpcEnvFileServer
服务在运行,供Executor节点通过host+port+path的方式拉取文件。
Spark运行环境之SparkEnv和通信工具RpcEnv相关推荐
- Ubuntu搭建Spark运行环境
前言 因为之前研究的方向是分布式系统,重点放在了Hadoop分布式文件系统上.现如今,社会对机器学习的需求势如破竹.为了调整研究方向,而且不抛弃原本的研究成果,研究反向便从分布式系统转为分布式机器学习 ...
- 大数据容器化-基于Kubernetes(k8s)构建spark运行环境
Apache Spark 在大数据处理与分析领域,Apache Spark无疑占据着重要地位.它的特点是基于内存计算,支持各类资源管理平台,其中以YARN最为常见,同时又与Hadoop平台集成,在集群 ...
- 【CentOS】Spark 运行环境(Local、Standalone)
Spark 作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行, 在国内工作中主流的环境为 Yarn,不过逐渐容器式环境也慢慢流行起来.接下来,我们就分别看看不同环境下 Spark 的运 ...
- 三, Spark 四种运行环境配置总结
三, Spark 运行环境 Spark 的运行模式有 Local(也称单节点模式),Standalone(集群模式),Spark on Yarn(运行在Yarn上),Mesos以及K8s, Windo ...
- 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】
视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...
- spark开发环境搭建及部署
spark开发环境搭建 1.下载开发工具luna eclipse 或者 Intellij IDEA(官网下载的 scala for eclipse如果不能用可以使用 luna) 2.安装jdk1.7配 ...
- Spark2.1.0——运行环境准备
学习一个工具的最好途径,就是使用它.这就好比<极品飞车>玩得好的同学,未必真的会开车,要学习车的驾驶技能,就必须用手触摸方向盘.用脚感受刹车与油门的力道.在IT领域,在深入了解一个系统的原 ...
- 实现微信小程序编译和运行环境系列 (进阶篇)
动手实现微信小程序和小游戏编译打包和运行环境平台 (进阶篇) # 前言 距离上一篇初始篇过了一段时间,在初始篇里面主要分享了微信小程序工具 和微信小程序的文件组成,以及小程序架构的基本大纲和描述,看了 ...
- Puppet:维护运行环境一致性的利器
作者:焦振清 配置管理工具的定位 每次我提到配置管理工具,有些同学就会问类似的问题:容器化时代和Serverless时代,还需要配置管理工具吗?我们先不去讨论容器化之后是否需要配置管理工具,那什么时候 ...
最新文章
- 挖坑挖到cnblogs.com来...
- 网站锁定php文件命令,PHP文件锁定读写的一点注意_php
- 「神策 2021 数据驱动大会」主会场回顾
- Mysql UTF8 varchar与Oracle ZHS16GBK varchar2同长度下存汉字的差异
- 老spring3.2版本 redistemplate 报错_植物大战僵尸加强版本
- # android开发:4-1、Activity启动方式、生命周期、不同activity的数据传递
- TCP/IP系列——长连接与短连接的区别
- NOIP2016D2T2 蚯蚓
- 修改php.ini以达到 屏蔽错误信息
- dsoframer java_基于DsoFramer控件的Office编辑控件
- STM32CubeProgrammer STM32CubeIDE下载算法 外部存储QSPI Flash
- 制作U盘启动盘 优启通
- 百度云推送push的使用
- 解决IE兼容H5的问题
- 一个毕业设计 儿童趣味数学 app
- Teemo Attacking
- 旁门左道:让移动游戏在APP Store下载量暴涨的邪门功夫
- google使用方法及技巧
- FlexRay网络管理与测试
- 17年1月9日,小程序来了。深度解析2017微信公开课