
The previous post analyzed, from the source code, how the data received by the Receiver is handed over to the BlockManager, so the whole data-ingestion pipeline is now up and running. Let's return to the post that analyzes JobScheduler.

// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

The previous several posts all branched out from receiverTracker.start(). With that covered, we move on to the next step.

// JobScheduler.scala line 83
jobGenerator.start()

The instantiation of jobGenerator has already been analyzed earlier. Digging into the source of its start() method, we can see that it:

  1. Instantiates eventLoop. This eventLoop is not the same as the one in JobScheduler; it is parameterized with a different event type (JobGeneratorEvent instead of JobSchedulerEvent).
  2. Calls EventLoop.start.
  3. Since this is the first startup (no checkpoint present), calls startFirstTime.
// JobGenerator.scala line 78
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
// JobGenerator.scala line 189
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

startFirstTime then calls DStreamGraph.start, which:

  1. Initializes all outputStreams, setting their first execution time; the DStreams they depend on are set up as well.
  2. If a remember duration is configured, calls remember on all outputStreams; dependent DStreams are set as well.
  3. Validates before starting, mainly checking whether the checkpoint settings conflict and whether the various Durations are consistent.
  4. Starts all inputStreams. Scanning InputDStream and all of its subclasses in the current version (1.6.0), the start method does nothing. As covered in earlier posts, the inputStreams are already managed by ReceiverTracker.
// DStreamGraph.scala line 39
def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}

Up to this point only some simple initialization has been done; data processing has not actually started yet.

Back in JobGenerator, the recurring timer is now started:

// JobGenerator.scala line 193
timer.start(startTime.milliseconds)

The recurring timer is started. Does this look familiar? Haven't you seen this recurring timer somewhere before?

Right: in BlockGenerator.scala lines 105 and 109 there are two threads, one of which is a recurring timer that periodically moves data into the queue waiting to be pushed.

// RecurringTimer.scala line 59
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}

The actual logic is the function passed in at construction time: longTime => eventLoop.post(GenerateJobs(new Time(longTime))). Its input is a Long, and its body is eventLoop.post(GenerateJobs(new Time(longTime))).

// JobGenerator.scala line 58
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
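For readers less familiar with Scala function values, here is a minimal, self-contained sketch of the same pattern: a (Long) => Unit callback is stored at construction and invoked later, mirroring callback(nextTime) inside RecurringTimer. The names below are hypothetical, not Spark's.

// Hypothetical stand-in: stores a (Long) => Unit callback and calls it later.
class TinyTimer(callback: (Long) => Unit) {
  def fireAt(timeMs: Long): Unit = callback(timeMs) // invoke the stored function value
}

object TinyTimerDemo {
  def main(args: Array[String]): Unit = {
    // This lambda plays the role of longTime => eventLoop.post(GenerateJobs(new Time(longTime))).
    val timer = new TinyTimer(longTime => println(s"would post GenerateJobs($longTime)"))
    timer.fireAt(System.currentTimeMillis())
  }
}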

Inside RecurringTimer, as long as the timer has not been stopped, the loop keeps running:

  1. At construction, the function above is passed in; callback: (Long) => Unit corresponds to longTime => eventLoop.post(GenerateJobs(new Time(longTime))).
  2. When start is called, the thread starts and its run method invokes loop.
  3. loop calls triggerActionForNextInterval.
  4. triggerActionForNextInterval invokes the callback passed in at construction, i.e. longTime => eventLoop.post(GenerateJobs(new Time(longTime))) from above.
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // RecurringTimer.scala line 27
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  // RecurringTimer.scala line 56
  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  // RecurringTimer.scala line 92
  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // RecurringTimer.scala line 100
  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }

  // ... other code omitted
}

So a GenerateJobs event is posted on every interval; eventLoop.post puts the event into the eventQueue:

// EventLoop.scala line 102
def post(event: E): Unit = {
  eventQueue.put(event)
}

Meanwhile, another member of this EventLoop, eventThread, keeps taking events off the queue and invokes onReceive with each event as the argument. This onReceive was overridden at instantiation; a simplified sketch of the queue-draining pattern follows the code below.

// JobGenerator.scala line 86
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()
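For intuition only, here is a minimal sketch of the queue-draining pattern described above; it is not Spark's actual EventLoop source, just the general shape: a daemon thread blocks on a queue and dispatches each event to onReceive.

import java.util.concurrent.LinkedBlockingQueue
import scala.util.control.NonFatal

// Simplified sketch of an event loop: post() enqueues, a daemon thread dequeues and dispatches.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // blocks until an event is available
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocking take(); exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object SimpleEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new SimpleEventLoop[String]("demo-loop") {
      override protected def onReceive(event: String): Unit = println(s"got: $event")
      override protected def onError(e: Throwable): Unit = e.printStackTrace()
    }
    loop.start()
    loop.post("GenerateJobs(t0)")
    Thread.sleep(200) // give the daemon thread a moment to drain the queue
    loop.stop()
  }
}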

onReceive delegates to:

// JobGenerator.scala line 177
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    // other case classes
  }
}

A GenerateJobs event is matched and handled by generateJobs(time: Time), which:

  1. Fetches all the blocks the ReceiverTracker has collected for the current batch time and allocates them to the batch; if WAL is enabled, the write-ahead log is written.
  2. Has the DStreamGraph generate the jobs.
  3. Submits the JobSet.
  4. If checkpointing is configured, performs a checkpoint.
// JobGenerator.scala line 240
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

The code above is not particularly easy to read, so let's break it down. At first glance it looks like try {} catch { case ... }, but on closer inspection it is Try {} match {}.

Tracing the code, Try with a capital T is a companion object whose apply takes a by-name block and returns a Try instance. In scala.util.Try the code is as follows:

// scala.util.Try.scala line 155
object Try {
  /** Constructs a `Try` using the by-name parameter.  This
   * method will ensure any non-fatal exception is caught and a
   * `Failure` object is returned.
   */
  def apply[T](r: => T): Try[T] =
    try Success(r) catch {
      case NonFatal(e) => Failure(e)
    }
}

Try has two subclasses, Success and Failure, both of which are case classes.
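A tiny standalone example of the same Try {} match {} pattern used in generateJobs; this is plain Scala standard library, nothing Spark-specific:

import scala.util.{Try, Success, Failure}

object TryDemo {
  def main(args: Array[String]): Unit = {
    // The block is passed by name to Try.apply; any non-fatal exception becomes a Failure.
    Try {
      val parsed = "42".toInt
      parsed * 2
    } match {
      case Success(value) => println(s"computed: $value")
      case Failure(e)     => println(s"failed: ${e.getMessage}")
    }
  }
}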

Back at the call site: the last expression in the Try block is graph.generateJobs(time). Let's trace into it.

It returns, for every outputStream, the job produced by outputStream.generateJob(time):

// DStreamGraph.scala line 111
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

As established earlier, the outputStreams are in fact all ForEachDStream instances. ForEachDStream overrides generateJob:

  1. parent.getOrCompute(time) returns an Option[RDD].
  2. If an RDD is present, the result is Some(new Job(time, jobFunc)).
// ForEachDStream.scala line 46
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

So what is the parent of the ForEachDStream? Let's look at our example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // harvest data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce
    words.print() // print the results
    ssc.start() // start
    ssc.awaitTermination()
    ssc.stop(true)
  }
}

As described earlier, the DStream dependency chain in this example is SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream.

I scanned DStream and all of its subclasses: only DStream defines getOrCompute, and no subclass overrides it. So what actually runs here is the getOrCompute inherited by ShuffledDStream.

In the normal case the RDD for this batch does not exist yet, so the orElse block is executed:

// DStream.scala line 338
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)  // line 352
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
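The heart of getOrCompute is a memoization pattern: look the batch time up in a map and only compute (then cache) when the lookup misses. Here is a stripped-down sketch of just that pattern, with hypothetical names and no Spark types:

import scala.collection.mutable

// Minimal sketch of the get-or-compute-and-cache pattern used by DStream.getOrCompute.
class BatchCache[V](compute: Long => Option[V]) {
  private val generated = mutable.HashMap[Long, V]()

  def getOrCompute(time: Long): Option[V] = {
    generated.get(time).orElse {                     // hit: reuse the value generated earlier
      val computed = compute(time)                   // miss: compute for this batch time
      computed.foreach(v => generated.put(time, v))  // cache it for later lookups
      computed
    }
  }
}

object BatchCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = new BatchCache[String](time => Some(s"rdd-for-$time"))
    println(cache.getOrCompute(1000L)) // computed and cached
    println(cache.getOrCompute(1000L)) // served from the cache on the second lookup
  }
}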

ShuffledDStream.compute in turn calls parent.getOrCompute:

// ShuffledDStream.scala line 40
override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) => Some(rdd.combineByKey[C](
      createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}

MappedDStream's compute likewise calls its parent's getOrCompute (inherited from DStream), which in turn calls compute, and so the recursion continues:

// MappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}

FlatMappedDStream's compute does the same: it calls its parent's getOrCompute, which again ends up in compute.

// FlatMappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
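Note that the outer .map here is Option.map and the inner .flatMap/.map transform the RDD itself: the transformation is applied only if the parent actually produced an RDD for this batch. A tiny illustration of that distinction, using a plain Scala List as a stand-in for an RDD (an assumption for illustration only):

object OptionMapDemo {
  def main(args: Array[String]): Unit = {
    val someBatch: Option[List[String]] = Some(List("a b", "c"))
    val emptyBatch: Option[List[String]] = None

    // Outer .map is Option.map; inner .flatMap/.map transform the "RDD" (here a List).
    println(someBatch.map(_.flatMap(_.split(" "))).map(_.map(w => (w, 1))))
    // => Some(List((a,1), (b,1), (c,1)))
    println(emptyBatch.map(_.flatMap(_.split(" "))))
    // => None: no data for this batch, so nothing is transformed
  }
}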

This continues until the DStream is the SocketInputDStream, i.e. the input stream, whose compute is inherited from its parent class ReceiverInputDStream.

Ignore the logic in the if branch for now and go straight to the else block, which ends by calling createBlockRDD:

// ReceiverInputDStream.scala line 69
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
At line 127 below, new BlockRDD[T](ssc.sc, validBlockIds) instantiates the RDD:
// ReceiverInputDStream.scala line 94
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds) // line 127
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}

This BlockRDD is a subclass of Spark Core's RDD with no parent RDD dependencies (Nil). At this point the RDD has been instantiated.

// BlockRDD.scala line 30
private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil)

// RDD.scala line 74
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging

So, stripping away the Option wrapping, the RDD lineage that is ultimately reconstructed is:

new BlockRDD[T](ssc.sc, validBlockIds).flatMap(flatMapFunc).map(mapFunc).combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)

In this example that becomes:

new BlockRDD[String](ssc.sc, validBlockIds).flatMap(t => t.split(" ")).map(t => (t, 1)).combineByKey[Int](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true)

And the final print becomes the following job function:

() => foreachFunc(new BlockRDD[String](ssc.sc, validBlockIds).flatMap(t => t.split(" ")).map(t => (t, 1)).combineByKey[Int](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true), time)

where foreachFunc is the function defined at DStream.scala line 766.

At this point the RDD has been instantiated entirely through the DStream chain. Looking back, this is why a DStream can be understood as a template for RDDs.
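To make the "template" idea concrete, here is a Spark-Core-only sketch of what one batch of the word-count example computes, with an ordinary RDD[String] standing in for that batch's BlockRDD; this is an illustration under that assumption, not the code Spark generates:

import org.apache.spark.{SparkConf, SparkContext}

object OneBatchEquivalent {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("OneBatchEquivalent"))
    // Stand-in for the BlockRDD holding the lines received in one 5-second batch.
    val batchLines = sc.parallelize(Seq("spark streaming", "spark core"))

    // The same lineage the DStream chain rebuilds for every batch:
    // flatMap -> map -> reduceByKey (combineByKey under the hood), then the output action.
    val wordCounts = batchLines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.collect().foreach(println) // print() on the DStream takes a few records and prints them similarly

    sc.stop()
  }
}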

But we're not done yet: back at ForEachDStream.scala line 46, this function is passed as a constructor argument to a Job.
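In other words, what gets handed to the scheduler is just a named thunk: the whole RDD construction plus the output action, wrapped in a () => Unit. A simplified stand-in for that idea (a hypothetical class, not Spark's Job internals):

// Hypothetical stand-in for the idea of a streaming Job: a batch time plus a deferred body.
case class BatchJob(timeMs: Long, body: () => Unit) {
  def run(): Unit = body() // nothing happens until run() is called on the scheduler side
}

object BatchJobDemo {
  def main(args: Array[String]): Unit = {
    val job = BatchJob(System.currentTimeMillis(),
      () => println("build the RDD lineage and run foreachFunc here"))
    // Creating the job does not execute anything; the scheduler decides when to run it.
    job.run()
  }
}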

--------------------------------

Supplement (flowcharts from student blogs of the customized-version class; the figures are not reproduced here):

  1. A flowchart of the Job creation flow, slightly modified.
  2. A flowchart of how the RDD DAG is created by tracing back along the lineage from the OutputDStream.
  3. The same lineage back-trace flowchart for the RDD DAG in this example.

The next post will analyze Job submission from the source code. Stay tuned.

Reposted from: https://my.oschina.net/corleone/blog/672999
