本期内容:

  • 解密Spark Streaming Job架构和运行机制

  • 解密Spark Streaming容错架构和运行机制

理解SparkStreaming的Job的整个架构和运行机制对于精通SparkStreaming是至关重要的。我们知道对于一般的Spark应用程序来说,是RDD的action操作触发了Job的运行。那对于SparkStreaming来说,Job是怎么样运行的呢?我们在编写SparkStreaming程序的时候,设置了BatchDuration,Job每隔BatchDuration时间会自动触发,这个功能肯定是SparkStreaming框架提供了一个定时器,时间一到就将编写的程序提交给Spark,并以Spark job的方式运行。

这里面涉及到两个Job的概念:

  1. 每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行),为什么使用线程池呢?

    a),作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;

    b),有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持;

  2. 上面Job提交的Spark Job本身。单从这个时刻来看,此次的Job和Spark core中的Job没有任何的区别。

下面我们看看job运行的过程:

1.首先实例化SparkConf,设置运行期参数。

val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")

2.实例化StreamingContext,设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口。

val ssc = new StreamingContext(conf,Seconds(20))

3.在实例化StreamingContext的过程中,实例化JobScheduler和JobGenerator 。

StreamingContext.scala的第183行

private[streaming] val scheduler = new JobScheduler(this)

JobScheduler.scala的第50行

private val jobGenerator = new JobGenerator(this)

4.StreamingContext调用start方法。

def start(): Unit = synchronized {state match {case INITIALIZED =>startSite.set(DStream.getCreationSite())StreamingContext.ACTIVATION_LOCK.synchronized {StreamingContext.assertNoOtherContextIsActive()try {validate()// Start the streaming scheduler in a new thread, so that thread local properties// like call sites and job groups can be reset without affecting those of the// current thread.ThreadUtils.runInNewThread("streaming-start") {sparkContext.setCallSite(startSite.get)sparkContext.clearJobGroup()sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")scheduler.start()}state = StreamingContextState.ACTIVE} catch {case NonFatal(e) =>logError("Error starting the context, marking it as stopped", e)scheduler.stop(false)state = StreamingContextState.STOPPEDthrow e}StreamingContext.setActiveContext(this)}shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)// Registering Streaming Metrics at the start of the StreamingContextassert(env.metricsSystem != null)env.metricsSystem.registerSource(streamingSource)uiTab.foreach(_.attach())logInfo("StreamingContext started")case ACTIVE =>logWarning("StreamingContext has already been started")case STOPPED =>throw new IllegalStateException("StreamingContext has already been stopped")}
}

5.在StreamingContext.start()内部启动JobScheduler的Start方法。

scheduler.start()

在JobScheduler.start()内部实例化EventLoop,并执行EventLoop.start()进行消息循环。

在JobScheduler.start()内部构造ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:

def start(): Unit = synchronized {if (eventLoop != null) return // scheduler has already been startedlogDebug("Starting JobScheduler")eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)}eventLoop.start()// attach rate controllers of input streams to receive batch completion updatesfor {inputDStream <- ssc.graph.getInputStreamsrateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)listenerBus.start(ssc.sparkContext)receiverTracker = new ReceiverTracker(ssc)inputInfoTracker = new InputInfoTracker(ssc)receiverTracker.start()jobGenerator.start()logInfo("Started JobScheduler")
}

6.JobGenerator启动后会不断的根据batchDuration生成一个个的Job

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {// Set the SparkEnv in this thread, so that job generation code can access the environment// Example: BlockRDDs are created in this thread, and it needs to access BlockManager// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.SparkEnv.set(ssc.env)Try {jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batchgraph.generateJobs(time) // generate jobs using allocated block} match {case Success(jobs) =>val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))case Failure(e) =>jobScheduler.reportError("Error generating jobs for time " + time, e)}eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

7.ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息。

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {if (isTrackerStarted) {throw new SparkException("ReceiverTracker already started")}if (!receiverInputStreams.isEmpty) {endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))if (!skipReceiverLaunch) launchReceivers()logInfo("ReceiverTracker started")trackerState = Started}
}

二. Spark Streaming容错机制:

 我们知道DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。所以从某种意义上而言,Spark Streaming的基于DStream的容错机制,实际上就是划分到每一次形成的RDD的容错机制,这也是Spark Streaming的高明之处。

Spark Streaming的容错要考虑两个方面:

  1. Driver运行失败时的恢复

    使用Checkpoint,记录Driver运行时的状态,失败后可以读取Checkpoint并恢复Driver状态。

  2. 具体的每次Job运行失败时的恢复

    要考虑到Receiver的失败恢复,也要考虑到RDD计算失败的恢复。Receiver可以采用写wal日志的方式。RDD的容错是spark core天生提供的,基于RDD的特性,它的容错机制主要就是两种:

  01. 基于checkpoint;

在stage之间,是宽依赖,产生了shuffle操作,lineage链条过于复杂和冗长,这时候就需要做checkpoint。

  02. 基于lineage(血统)的容错:

  一般而言,spark选择血统容错,因为对于大规模的数据集,做检查点的成本很高。考虑到RDD的依赖关系,每个stage内部都是窄依赖,此时一般基于lineage容错,方便高效。

  总结: stage内部做lineage,stage之间做checkpoint。

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

转载于:https://blog.51cto.com/lqding/1769936

第3课:SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错...相关推荐

  1. 通过案例对 spark streaming 透彻理解三板斧之三:spark streaming运行机制与架构

    本期内容: 1. Spark Streaming Job架构与运行机制 2. Spark Streaming 容错架构与运行机制 事实上时间是不存在的,是由人的感官系统感觉时间的存在而已,是一种虚幻的 ...

  2. 第43课: Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

    第43课: Spark 1.6 RPC内幕解密:运行机制.源码详解.Netty与Akka等 Spark 1.6推出了以RpcEnv.RPCEndpoint.RPCEndpointRef为核心的新型架构 ...

  3. 通过案例对SparkStreaming透彻理解-3

    2019独角兽企业重金招聘Python工程师标准>>> 本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming 容错架构和运行机制 ...

  4. 通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验

    本期内容 : spark streaming另类在线实验 瞬间理解spark streaming本质 一.  我们最开始将从Spark Streaming入手 为何从Spark Streaming切入 ...

  5. 理解JAVA与C的运行机制

    1.java的运行机制 java的编译过程,将java的源程序(扩展名为.java的文件),由java编译程序将java的字节码文件(.class文件)在jvm上运行,机器码有cpu运行, jvm编译 ...

  6. 万字长文,带你彻底理解EF Core 5的运行机制,让你成为团队中的EF Core专家

    目录 1.将EF的ToTraceString移植为EF Core的ToQueryString 2.从EF Core记录详细信息 2.1. 简单的日志记录 2.2.响应EF Core 事件 2.3.使用 ...

  7. 深入理解GPU硬件架构及运行机制

    目录 一.导言 1.1 为何要了解GPU? 1.2 内容要点 1.3 带着问题阅读 二.GPU概述 2.1 GPU是什么? 2.2 GPU历史 2.2.1 NV GPU发展史 2.2.2 NV GPU ...

  8. 透彻理解位图与矢量图的本质区别(小包子觉得讲的很清晰)

    其实每个人都能轻松而透彻地理解位图与矢量图的本质区别 位图与矢量图的区别(为什么要再进行解释)      播放录像时按空格键暂停/继续播放 (关于位图与矢量图的区别,各种教材和网上解释的有很多,但是本 ...

  9. 透彻理解Spring事务设计思想之手写实现

    2019独角兽企业重金招聘Python工程师标准>>> 前言 事务,是描述一组操作的抽象,比如对数据库的一组操作,要么全部成功,要么全部失败.事务具有4个特性:Atomicity(原 ...

最新文章

  1. spring-boot学习资料
  2. 活动目录(Active Directory)安装
  3. [译]Selenium Python文档:一、安装
  4. 数据结构与算法(十二):八大经典排序算法再回顾
  5. Web架构师成长之路
  6. Apache RocketMQ部署文档
  7. 4--RESTful应用程序
  8. Win 2012 OS 安装.Net Framework 3.5
  9. 数据库练习(1)——建立数据库
  10. 你所不知道的 AI 进展
  11. 宇枫资本理财中要避免这些
  12. 马哥教育的python课程到底好不好_马哥教育的网络授课怎么样?
  13. android打开dialog黑色背景
  14. 计算机游戏化教学案例,[幼儿园课程游戏化教学实践]幼儿园课程游戏化案例
  15. 使用腾讯云服务器搭建个人网盘
  16. React 问题总结
  17. 次时代终端工具:WindTerm(含下载)
  18. Arduino 开发ESP8266(ESP12F)模块
  19. 近红外光谱特征选择、特征提取区别及稀疏表示
  20. app自动化测试之Appium问题分析及定位

热门文章

  1. java+yeild+sleep_Java并发编程--yield sleep和wait的区别
  2. iphone相册怎么加密_iphone相册加密码锁,保护隐私
  3. Java项目:健身俱乐部管理系统(java+SSM+Mysql+Jsp)
  4. python3安装setuptools步骤_setuptools、pip的安装
  5. 【java】快速复制数组方法arraycopy的使用
  6. Java中的拆箱与装箱
  7. wowpve服务器优势,PVE服务器法师对本职业的一点看法
  8. Vue父组件网络请求回数据后再给子组件传值demo示例
  9. iOS UICollectionView实现瀑布流(3)
  10. [二]Java虚拟机 jvm内存结构 运行时数据内存 class文件与jvm内存结构的映射 jvm数据类型 虚拟机栈 方法区 堆 含义...