目录

1、概述

2、LaunchDriver

3、LaunchDriver

4、总结


1、概述

worker肯定是实现RPC通信的,否则别人没法给你发消息。他继承的是ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint是线程安全的,意味着处理一条消息完成后再处理下一个消息。换句话说,在处理下一条消息时,可以看到对ThreadSafeRpcEndpoint的内部字段的更改,并且ThreadSafeRpcEndpoint中的字段不需要是volatile或等效的。但是,不能保证同一个线程将为不同的消息执行相同的ThreadSafeRpcEndpoint。即顺序处理消息,不能同时并发处理。

Worker本身在实际运行的时候是作为一个进程,他会接收master的指令,有几个非常重要的指令,如LaunchExecutor,LaunchDriver等。这两个指令是Schedule进行资源调度(Master的schedule方法中)的时候发送的。

2、LaunchDriver

Worker在收到Driver发送的LaunchDriver类型的信息后。

(1)首先首先打印一个日志,master传进来的时候肯定会告诉你driverId的,然后new 一个DriverRunner,然后把这个实例通过driverId交给一个HashMap数据结构val drivers = new HashMap[String, DriverRunner]。key就是driverId,value就是DriverRunner。这个数据结构非常重要,因为我们在worker上可能启动很多Executor,需要根据ExecutorId管理具体的DriverRunner,DriverRunner内部通过线程的方式启动了另外一个进程,所以可以简单理解DriverRunner是Driver所在进程中本身的process,这个就是一个代理模式。

(2)管理Driver的执行,包括在Driver失败的时候自动重启,要是在Standalone的模式下。失败是否重试是看DriverDescription中的supervise是否为true,如果指定了这个参数为true,driver在失败的时候worker会负责启动这个Driver

(3)构建好DriverRunner实例,并且已经将其加入到drivers中后,调用DriverRunner的start方法。在start方法中通过一个线程启动Driver,并管理Driver,线程运行的时候run方法会被执行。

在run方法中有个prepareAndRunDriver用于准备Driver需要的jar并运行Driver

(4)在prepareAndRunDriver方法中,会创建工作目录,下载jar包到本地,并封装好Driver的启动Command,通过buildProcessBuilder来启动Driver。

driverDesc.command这个指定他启动的时候应该运行什么类,就是类的入口。driverDesc是Master远程发送过来的,为CoarseGrainedExecutorBackend

进入到runDriver方法中,有个initialize方法,里面重定向输出和error,将stout和stderr重定向到baseDir下,这样就可以通过log看一下曾经执行的情况。

然后执行runCommandWithRetry,在参数中会构造ProcessBuilderLike。ProcessBuilderLike在apply的时候就new ProcessBuilderLike,在这里面processBuilder.start()

在runCommandWithRetry方法中,会一直循环,孵化出一个进程,有个方法这个是阻塞的,言外之意就是当他回复过来的时候估计就有问题了,那就判断一下

(5)在prepareAndRunDriver方法中启动Driver之后,如果运行出状况了,出状况后会给自己发一个消息

(6)在这里不同的情况打印log日志,最关键的是sendToMaster(driverStateChanged)发送给master。发送的类型是DriverStateChanged

(7)到Master方法中。master收到这个消息后就把他remove掉,是从自己的内存数据结构中remove,同时把这个driver曾经占用的数据,还有持久化的都remove,然后再次发消息给worker确认下,因为发生了资源的变动再次进行schedule

(8)回到Worker中,start之后记录耗了多少内存CPU

3、LaunchDriver

Worker在收到Driver发送的LaunchExecutor类型的信息后首先new 一个ExecutorRunner,然后start。过程与启动Driver类似,就不细说。

在ExecutorRunner的start方法中会通过一个线程启动Executor,并管理Executor,线程运行的时候run方法会被执行。

fetchAndRunExecutor方法中类似driver中创建该executor工作目录,下载运行的jar,开启执行application的进程executor。并向worker发送ExecutorStateChanged的事件通知,

worker先向自己发送ExecutorStateChanged的消息

在start方法之后,记录耗了多少内存CPU,然后向master发送接收到的ExecutorStateChanged的事件通知

4、总结

driver进程就是executor进程;ExecutorBackend是进程名称,standalone模式下是CoarseGrainedExecutorBackend。在CoarseGrainedExecutorBackend中有我们的Executor对象本身,Executor和ExecutorBackend是一对一的关系,就是一个ExecutorBackend进程里面有一个Executor,在Executor内部是通过线程池并发处理的方式来处理Spark提交过来的Task。

注意:Executor启动之后要向我们的driver注册。

Spark Worker源码相关推荐

  1. spark最新源码下载并导入到开发环境下助推高质量代码(Scala IDEA for Eclipse和IntelliJ IDEA皆适用)(以spark2.2.0源码包为例)(图文详解)...

    不多说,直接上干货! 前言   其实啊,无论你是初学者还是具备了有一定spark编程经验,都需要对spark源码足够重视起来. 本人,肺腑之己见,想要成为大数据的大牛和顶尖专家,多结合源码和操练编程. ...

  2. spark的源码编译

    编译spark源码并导入到IDEA中 一. 目的 1. 根据需要自定义编译spark相应的模块 2. 修改spark源码并重新编译spark 二. 环境需求 1. 操作系统为CentOS6.x 64b ...

  3. spark.mllib源码阅读:GradientBoostedTrees

    Gradient-Boosted Trees(GBT或者GBDT) 和 RandomForests 都属于集成学习的范畴,相比于单个模型有限的表达能力,组合多个base model后表达能力更加丰富. ...

  4. SparkSQL 之 Shuffle Join 内核原理及应用深度剖析-Spark商业源码实战

    本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客.版权声明:禁止转载,欢迎学习.QQ邮箱 ...

  5. spark word2vec 源码详细解析

    spark word2vec 源码详细解析 简单介绍spark word2vec skip-gram 层次softmax版本的源码解析 word2vec 的原理 只需要看层次哈弗曼树skip-gram ...

  6. 第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

    第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密 /* 王家林老师授课http://weibo.com/ilovepai ...

  7. Spark Shuffle源码分析系列之PartitionedPairBufferPartitionedAppendOnlyMap

    概述 SortShuffleWriter使用ExternalSorter进行ShuffleMapTask数据内存以及落盘操作,ExternalSorter中使用内存进行数据的缓存过程中根据是否需要ma ...

  8. 3supervisor启动worker源码分析-worker.clj

    supervisor启动worker源码分析-worker.clj supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见 ...

  9. 线程池解析(三)——Worker源码解析

    相关文章 线程池解析(一)--概念总结 线程池解析(二)--execute.addWorker源码解析 线程池解析(三)--Worker源码解析 线程池解析(四)--submit源码解析(Runnab ...

最新文章

  1. Joiner的简单了解
  2. 皮一皮:论北方有多冷...
  3. rgb颜色判断语句_首款RGB智能指纹挂锁,既能报警还能减压,简直无锁不能
  4. GraphQL是什么“渣渣“?它想干掉RESTful
  5. c# datetime._C#| 带示例的DateTime.DayOfWeek属性
  6. java怎么让1的数据2可以拥有,【如何让代码变“高级”(二)】-这样操作值得一波666(Java Stream)(这么有趣)...
  7. python基础七--集合
  8. 【手写数字识别】基于matlab GUI RBM神经网络手写数字识别【含Matlab源码 1109期】
  9. 服务器装系统后安装驱动失败,windows10系统下驱动人生安装驱动失败如何解决
  10. 苹果id登陆不上去怎么回事_创建苹果id是出现请联络iTunes支持人员完成交易是怎么回事?...
  11. cell flash cache 的使用
  12. 双系统时间不一致问题
  13. Blogspot.com再次解封
  14. 天然“降脂药”,帮你“吃掉”血脂!
  15. BUPT-CSAPP 期末复习书后参考题节选及评注
  16. 开卷有益:架构整洁之道
  17. 高等学校数字校园建设解决方案
  18. c语言佮保留俩位小数,阅读语言论文,关于性阅读障碍儿童词汇识别中字形信息作用相关参考文献资料-免费论文范文...
  19. Spring Boot 之 spring.factories的用法
  20. python期末考试重点_python语言基础与应用期末考试OJ

热门文章

  1. 学习《深度学习入门:基于Python的理论与实现》高清中文版PDF+源代码
  2. PHP页面显示中文字符出现乱码
  3. SQLlite 分页
  4. 浅析Java内存模型
  5. 阿里巴巴后台的使用体验
  6. tcp窗口滑动以及拥塞控制
  7. 从单片机转到嵌入式Linux的跨度大吗?
  8. Linux kernel同步机制
  9. 本地html页面传递表单值,js实现两个页面表单传值并接收
  10. mysql配置环境变量(win 10)_mysql配置环境变量(win 10)