2019独角兽企业重金招聘Python工程师标准>>>

本期内容:

  1. 解密Spark Streaming Job架构和运行机制

  2. 解密Spark Streaming 容错架构和运行机制

  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。

  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。

本节课通过从job和容错的整体架构上来考察Spark Streaming的运行机制。

用之前已有的最简单的例子:

// Socket来源的单词计数
// YY课堂:每天20:00现场授课频道68917580
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()

跟踪源码可以发现:

在初始化 StreamingContext时,创建了如下几个对象:

// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)

而JobScheduler在初始化的时候,会初始化jobGenerator,且包含receiverTracker。

// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // 56

再看创建DStream的部分

// StreamingContext.scala line 327
def socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] = withNamedScope("socket text stream") {socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}// StreamingContext.scala line 345
def socketStream[T: ClassTag](hostname: String,port: Int,converter: (InputStream) => Iterator[T],storageLevel: StorageLevel): ReceiverInputDStream[T] = {new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}
// SocketInputDStream.scala line 33
private[streaming]
class SocketInputDStream[T: ClassTag](ssc_ : StreamingContext,host: String,port: Int,bytesToObjects: InputStream => Iterator[T],storageLevel: StorageLevel) extends ReceiverInputDStream[T](ssc_) {// 这个方法是关键def getReceiver(): Receiver[T] = {new SocketReceiver(host, port, bytesToObjects, storageLevel)}
}

再看 ssc.start

// StreamingContext.scala line 594
def start(): Unit = synchronized {state match {case INITIALIZED =>startSite.set(DStream.getCreationSite())StreamingContext.ACTIVATION_LOCK.synchronized {StreamingContext.assertNoOtherContextIsActive()try {validate()// Start the streaming scheduler in a new thread, so that thread local properties// like call sites and job groups can be reset without affecting those of the// current thread.ThreadUtils.runInNewThread("streaming-start") {sparkContext.setCallSite(startSite.get)sparkContext.clearJobGroup()sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")scheduler.start() // line 610}state = StreamingContextState.ACTIVE} catch {case NonFatal(e) =>logError("Error starting the context, marking it as stopped", e)scheduler.stop(false)state = StreamingContextState.STOPPEDthrow e}StreamingContext.setActiveContext(this)}shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)// Registering Streaming Metrics at the start of the StreamingContextassert(env.metricsSystem != null)env.metricsSystem.registerSource(streamingSource)uiTab.foreach(_.attach())logInfo("StreamingContext started")case ACTIVE =>logWarning("StreamingContext has already been started")case STOPPED =>throw new IllegalStateException("StreamingContext has already been stopped")}
}

第610行,调用了scheduler.start,scheduler就是之前初始化是产生的JobScheduler。

// JobScheduler.scala line 62
def start(): Unit = synchronized {if (eventLoop != null) return // scheduler has already been startedlogDebug("Starting JobScheduler")eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)}eventLoop.start()// attach rate controllers of input streams to receive batch completion updatesfor {inputDStream <- ssc.graph.getInputStreamsrateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)listenerBus.start(ssc.sparkContext)receiverTracker = new ReceiverTracker(ssc) // line 80inputInfoTracker = new InputInfoTracker(ssc)receiverTracker.start()jobGenerator.start()logInfo("Started JobScheduler")
}

请看80行,将receiverTracker初始化:

// ReceiverTracker.scala line 101
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {private val receiverInputStreams = ssc.graph.getReceiverInputStreams()private val receiverInputStreamIds = receiverInputStreams.map { _.id }private val receivedBlockTracker = new ReceivedBlockTracker(ssc.sparkContext.conf,ssc.sparkContext.hadoopConfiguration,receiverInputStreamIds,ssc.scheduler.clock,ssc.isCheckpointPresent,Option(ssc.checkpointDir))

调用receiverTracker.start和jobGenerator.star

// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {if (isTrackerStarted) {throw new SparkException("ReceiverTracker already started")}if (!receiverInputStreams.isEmpty) {endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))if (!skipReceiverLaunch) launchReceivers() // line 157logInfo("ReceiverTracker started")trackerState = Started}
}

launchReceivers()

// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {val receivers = receiverInputStreams.map(nis => {val rcvr = nis.getReceiver() // 这个就是SocketInputDStream.getReceiver(),本例中是SocketReceiver ,见SocketInputDStream.scala line 34rcvr.setReceiverId(nis.id)rcvr})runDummySparkJob()logInfo("Starting " + receivers.length + " receivers")endpoint.send(StartAllReceivers(receivers)) // line 423
}

看看StartAllReceivers是如何被消费的?

// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) // 尽量负载均匀for (receiver <- receivers) {val executors = scheduledLocations(receiver.streamId)updateReceiverScheduledExecutors(receiver.streamId, executors)receiverPreferredLocations(receiver.streamId) = receiver.preferredLocationstartReceiver(receiver, executors) // 启动接收器,不再进一步深究,有兴趣的可以继续查看源码}

再回到JobScheduler.scala line 83,jobGenerator.start

// JobGenerator.scala line 79
def start(): Unit = synchronized {if (eventLoop != null) return // generator has already been started// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.// See SPARK-10125checkpointWritereventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = {jobScheduler.reportError("Error in job generator", e)}}eventLoop.start()if (ssc.isCheckpointPresent) {restart()} else {startFirstTime()}
}

至此消息接收和Job生成器已启动。

在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法

  1.JobGenerator启动后会不断的根据batchDuration生成一个个的Job

  2.ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息

  每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行)。

  为什么使用线程池呢?

   1.作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;

  2.有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持。

  第二部分:从容错架构的角度透视Spark Streaming

  我们知道DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。所以从某种意义上而言,Spark Streaming的基于DStream的容错机制,实际上就是划分到每一次形成的RDD的容错机制,这也是Spark Streaming的高明之处。

  RDD作为 分布式弹性数据集,它的弹性主要体现在:

  1.自动的分配内存和硬盘,优先基于内存

  2.基于lineage容错机制

  3.task会指定次数的重试

  4.stage失败会自动重试

  5.checkpoint和persist 复用

  6.数据调度弹性:DAG,TASK和资源管理无关。

  7.数据分片的高度弹性

  基于RDD的特性,它的容错机制主要就是两种:一是checkpoint,二是基于lineage(血统)的容错。一般而言,spark选择血统容错,因为对于大规模的数据集,做检查点的成本很高。但是有的情况下,不如说lineage链条过于复杂和冗长,这时候就需要做checkpoint。

  考虑到RDD的依赖关系,每个stage内部都是窄依赖,此时一般基于lineage容错,方便高效。在stage之间,是宽依赖,产生了shuffle操作,这种情况下,做检查点则更好。总结来说,stage内部做lineage,stage之间做checkpoint。

后续的会有什么更深的内幕?且听下回分解。

转载于:https://my.oschina.net/corleone/blog/669520

通过案例对SparkStreaming透彻理解-3相关推荐

  1. 第3课:SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错...

    本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming容错架构和运行机制 理解SparkStreaming的Job的整个架构和运行机制对于精通Spar ...

  2. 透彻理解Spring事务设计思想之手写实现

    2019独角兽企业重金招聘Python工程师标准>>> 前言 事务,是描述一组操作的抽象,比如对数据库的一组操作,要么全部成功,要么全部失败.事务具有4个特性:Atomicity(原 ...

  3. 【手写系列】透彻理解MyBatis设计思想之手写实现

    前言 MyBatis,曾经给我的感觉是一个很神奇的东西,我们只需要按照规范写好XXXMapper.xml以及XXXMapper.java接口.要知道我们并没有提供XXXMapper.java的实现类, ...

  4. 【手写系列】透彻理解Spring事务设计思想之手写实现

    事务,是描述一组操作的抽象,比如对数据库的一组操作,要么全部成功,要么全部失败.事务具有4个特性:Atomicity(原子性),Consistency(一致性),Isolation(隔离性),Dura ...

  5. 一维数据高斯滤波器_透彻理解高斯混合模型

    高斯混合模型GMM是一个非常基础并且应用很广的模型.对于它的透彻理解非常重要. 本文从高斯分布开始逐步透彻讲解高斯混合模型 高斯分布 高斯分布有两个参数: μ = mean(数据的中心) σ2 =va ...

  6. rtosucos和linux区别,为什么我们需要uCos?带你透彻理解RTOS

    原标题:为什么我们需要uCos?带你透彻理解RTOS 与uCos见面还是大学的时候,老师让我为毕业设计选一个课题,要求有关嵌入式实时操作系统,于是开始在网上搜索,顺理成章的就发现了uCos,于是开始了 ...

  7. 如何透彻理解 Paxos 算法?

    我们主要讲解"如何透彻理解 Paxos 算法"? 文章目录 Quorum 机制 Quorum 定义 Quorum 的应用 Paxos 节点的角色和交互 Paxos 的节点角色 Pr ...

  8. 透彻理解BN(Batch Normalization)层

    什么是BN Batch Normalization是2015年论文<Batch Normalization: Accelerating Deep Network Training by Redu ...

  9. 透彻理解SLAM中的非线性最小二乘问题

    下面是我从yuque复制过来的,格式有些问题,需要的可以直接看我笔记原文: https://www.yuque.com/docs/share/61fb4428-e631-4b96-b531-b3b09 ...

最新文章

  1. leetcode-160-相交链表(simple)
  2. 重磅直播|嵌入式开发漫漫之路—从小白到技术骨干
  3. windows安装xampp时出现,unable to realloc xxxxxxxx bytes
  4. 可能是GitHub上最好用的文字语法校验工具
  5. 大并发服务器架构 大型网站架构演变
  6. mysql安装配置yum_在CentOS 7下使用yum配置MySQL源并安装MySQL
  7. ZZULIOJ 1121: 电梯
  8. MySQL(一)存储引擎
  9. 《你必须知道的.NET》第五章读书笔记
  10. 会php会javascript,javascript – 只会php和js但不会java,能做手机应用开发吗?
  11. Java版SLG游戏《竜退治2》
  12. Unity3D中粒子系统
  13. android swf 播放器 源码,Android 9.0 flash播放器播放swf源码讲解
  14. xshell如何登陆数据库_Xshell怎么连接数据库?
  15. mysql dateofweek_日历表-月的周数
  16. MySQL社区版下载地址
  17. 华为认证的考试费用和重认证
  18. Python词云图:指定形状、颜色和字体
  19. 看中国魅力女强人 访格力电器总裁董明珠
  20. Android监听蓝牙与设备连接状态、关闭和打开状态

热门文章

  1. Spring MVC中的视图解析ViewResolver
  2. 为什么喝酒有的人会脸红,有的人会脸发白
  3. 编写自己的Arduino库
  4. mysql序列号发生器
  5. 1126 求递推序列的第N项 (Fnb + mod + 思维)
  6. 二.Sql语言的分类及运算符
  7. iOS学习笔记11-多线程入门
  8. 谭浩强C语言程序设计 学习辅导练习题
  9. C语言拼接字符串 -- 使用strcat()函数
  10. Xcode8.0 去除控制台多余打印