spark eventLoop模型
Sprak中,线程之前的交互采用eventLoop模型。
当JobGenerate中的clock达到触发新一次job生成的时间后,并不会直接驱动graph去生成job,而是通过往eventLoop中发送一个JobGenerate事件以触发job生成事件的产生。
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()private val stopped = new AtomicBoolean(false)// Exposed for testing.
private[spark] val eventThread = new Thread(name) {setDaemon(true)override def run(): Unit = {try {while (!stopped.get) {val event = eventQueue.take()try {onReceive(event)} catch {case NonFatal(e) =>try {onError(e)} catch {case NonFatal(e) => logError("Unexpected error in " + name, e)}}}} catch {case ie: InterruptedException => // exit even if eventQueue is not emptycase NonFatal(e) => logError("Unexpected error in " + name, e)}}}
eventLoop维护了一个队列用来存放事件,由于队列的先进先出特性,导致可以按照时间顺序对相关事件进行处理,一个eventLoop也只维护了一个eventThread,将会不断循环往上文所述的队列尝试拉取事件,通过onReceive()方法,这里如果onReceive()方法的事件处理为同步处理,如果阻塞将会导致下一个事件的处理延时。
eventLoop除了在JobGenerate中用来触发job的生成外,还在JobScheduler中用来向消息总线汇报一个任务的启动与完成。
在JobScheduler中,所有job在准备启动前,都会被封装成一个JobHandler,在这个JobHandler的run()方法中,实现了job启动的生命周期行为,并在这个方法中每个job的行为都会通过eventLoop向消息总线报告其行为。
var _eventLoop = eventLoop
if (_eventLoop != null) {_eventLoop.post(JobStarted(job, clock.getTimeMillis()))// Disable checks for existing output directories in jobs launched by the streaming// scheduler, since we may need to write output to an existing directory during checkpoint// recovery; see SPARK-4835 for more details.SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {job.run()}_eventLoop = eventLoopif (_eventLoop != null) {_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))}
类比job,task的生命周期与消息总线的汇报也是通过eventLoop中的,由DAGScheduler实现。DAGScheduler中,job转换为stage这一最重要的步骤,也是通过eventLoop来投递JobSubmitted事件进行通知传递的。
spark eventLoop模型相关推荐
- Spark编程模型几大要素
不多说,直接上干货! Spark编程模型几大要素 Driver Program 输入-Transformation-Action 缓存 共享变量 转载于:https://www.cnblogs.com ...
- Spark入门实战系列--3.Spark编程模型(上)--编程模型及SparkShell实战
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark编程模型 1.1 术语定义 l应用程序(Application): 基于Spa ...
- Spark 内存模型
文章目录 环境参数 Executor 内存划分 堆内内存(On-Heap Memory) 堆外内存(Off-Heap Memory) 动态调节机制 Task 能申请到的内存 新博客地址 环境参数 sp ...
- Spark编程模型(之莎士比亚文集词频统计实现)
Spark编程模型之莎士比亚文集词频统计 前段时间因为学校的云计算比赛我无意间接触到了Spark云计算框架,从此对其一发不可收拾,无论从其执行效率还有他的其他方面的架构都感觉到无比强大,作为一个云计算 ...
- Spark详解(三):Spark编程模型(RDD概述)
1. RDD概述 RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行 ...
- Spark 编程模型(上)
Spark的编程模型 核心概念(注意对比MR里的概念来学习) Spark Application的组成 Spark Application基本概念 Spark Application编程模型 回顾sc ...
- [Kafka与Spark集成系列三] Spark编程模型
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- 第14课:Spark 分布式模型训练及调优(实战)
上节课已经为大家介绍了 Apache Spark 项目的基本情况,以及分布式深度神经网络的解决方案.这节课我们将给出一个 Deeplearning4j+Spark 的建模实例,包括从配置 Maven ...
- 《循序渐进学Spark 》Spark 编程模型
本节书摘来自华章出版社<循序渐进学Spark >一书中的第1章,第3节,作者 小象学院 杨 磊,更多章节内容可以访问云栖社区"华章计算机"公众号查看. Spark机制原 ...
最新文章
- mysqlselectdb php_PHP MySQL Select(数据库查询)
- 这几个 Python 的小技巧,你会么?
- 全球最厉害的14位程序员!
- java明星养成游戏_#IT明星不是梦#Java14不得不知的5个新功能
- Zuul:Cookie和动态路由
- Android的intent之间复杂参数的传递
- 博客园博客PDF生成器
- win7个人计算机的ip地址,win7计算机ip地址查询_win7本机ip地址查询
- 手机里tencent文件夹能删吗_手机内存卡里的tencent这个文件夹可以删吗?
- 2011年国庆老家记录
- 网页设计html5实训心得,网页设计实习心得
- 港科报道 | 8位校友入选香港25青年科创先锋人物
- qomo linux最新版本,Qomo Linux下一个版本将推驱动中心
- MyCms 活码二维码(动态二维码)源码版介绍
- 基于LabVIEW的WIFI通信人机交互界面设计
- sun oracle服务器登管理口,Sun Solaris系统管理命令对照简介
- DFS(深度优先遍历)
- 【Linux】系统管理命令
- 傲慢与偏见之 - 谷歌中国逆袭史
- MES系统质量管理怎么做?
热门文章
- Python学习笔记之if语句(一)
- Android之使用SoundPool播放一小段音频,实现猜歌的功能
- Linux发行版CentOS下Docker的安装和卸载
- Javascript——声明提升(函数、变量提升)
- 什么是JDK,什么是JRE?JDK的安装和环境变量的配置
- html中a标签根据一个参数跳转不同的页面
- testlink匹配mysql8_如何安装Testlink
- python视频人脸检测_Python基于OpenCV实现视频的人脸检测
- android注册广播两种方式,Android 注册广播的两种方式对比
- python中字典和集合的使用