本文目标:搞明白每个批次作业运行完毕后,是如何进行清理工作的。
  回到提交作业的地方,即JobGeneratorgenerateJobs这里,沿着这条线顺藤摸瓜找到清理任务的入口,可以看到任务生成成功后会提交任务运行,摸瓜路线:JobGenerator.generateJobs() --> jobScheduler.submitJobSet() --> JobHandler.run() --> eventLoop发送一个JobCompleted事件 --> jobScheduler.handleJobCompletion()顺藤摸瓜到这里之后,就找到了任务运行结束后的处理逻辑,对这个方法有必要简单分析一下,看注释:

private def handleJobCompletion(job: Job, completedTime: Long) {val jobSet = jobSets.get(job.time)//获取当前批次锁产生的所有job集合jobSet.handleJobCompletion(job)//job处理完成,将当前任务从incompleteJobs中去除job.setEndTime(completedTime)listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))//通知UI显示logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)if (jobSet.hasCompleted) {//判断当前批次的所有job是否均处理完毕jobSets.remove(jobSet.time) //当前批次所有任务处理完毕,移除当前批次的job集合jobGenerator.onBatchCompletion(jobSet.time)//进入批处理结束后收尾工作,入口!!!!logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(jobSet.totalDelay / 1000.0, jobSet.time.toString,jobSet.processingDelay / 1000.0))listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))}//如果任意一个job报错,直接报告异常,否则直接passjob.result match {case Failure(e) =>reportError("Error running job " + job, e)case _ =>}}//任务完成,从incompleteJobs删除对应jobdef handleJobCompletion(job: Job) {incompleteJobs -= jobif (hasCompleted) processingEndTime = System.currentTimeMillis()}def hasCompleted: Boolean = incompleteJobs.isEmpty

  注释列出了三个方法的代码,handleJobCompletionhasCompleted是辅助内容,很显然入口就在jobGenerator.onBatchCompletion(jobSet.time)这里,继续追踪发现路线:eventLoop.post(ClearMetadata(time)) --> jobGenerator.clearMetadata(),至此,clearMetadata就是我们要看的清理工作的核心方法,代码和注释如下:

private def clearMetadata(time: Time) {// 清理DStream中产生的RDD数据ssc.graph.clearMetadata(time)// If checkpointing is enabled, then checkpoint,// else mark batch to be fully processed// 如果设置了checkpoint,发布checkpoint消息进行checkpointif (shouldCheckpoint) {eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))} else {// If checkpointing is not enabled, then delete metadata information about// received blocks (block data not saved in any case). Otherwise, wait for// checkpointing of this batch to complete.val maxRememberDuration = graph.getMaxInputStreamRememberDuration()// 清理receiverTrack记录的block信息jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)//清理receiver数据源的信息jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)//更新最新处理完毕的批次时间markBatchFullyProcessed(time)}}

  关于这里的清理而言,之前的文章已经介绍过,每个批次数据流是由inputStreamoutputStream这种,中间会经历一系列的处理在每个DStream中生成RDDssc.graph.clearMetadata(time)这一步的工作就是把这些处理过程中产生的RDD数据给清理,清理的规则主要就是依据应用中设置的rememberDuration时间是多久,存活时间超过这个的都会被从每个DStreamgeneratedRDD中清理掉,具体清理路线就是从outputStream反向链式对依赖的DStream中数据进行清理。
  对于jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)而言,它的内部主要就是删除ReceiverTracker中自带的ReceiverBlockTracker中记录的关于Block的信息;同时,如果设置开启了WAL,则还会把内容写入到WAL中。不过这里有一点可以了解一下,先看看cleanupOldBlocksAndBatches的代码:

def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {// Clean up old block and batch metadatareceivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)// Signal the receivers to delete old block dataif (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {logInfo(s"Cleanup old received batch data: $cleanupThreshTime")synchronized {if (isTrackerStarted) {endpoint.send(CleanupOldBlocks(cleanupThreshTime))}}}}

  这段代码中,if条件是如果配置了在Receiver中允许对每个Block数据块写WAL,那么在任务运行完毕后需要删除对应Receiver中保存的数据块;否则,保留Receiver端的数据块不予删除。这样做很显然是为了支持容错,因为如果在ReceiverBlock的生成写了WAL,那么在出现异常进行恢复的时候,可以直接通过WAL日志来恢复数据块用于计算,所以压根不需要再保留计算过后的数据块,虽然保留数据块能省下恢复数据块的时间,但是属于空间换时间,而且不删除的话那写WAL日志就没有意义了;反之,如果没启动Receiver端写WAL的配置,那么就必须保留之前的Block,不然恢复的时候就会存在数据缺失。针对这块配置官网有直接的解释如下:

Since Spark 1.2, we have introduced write-ahead logs for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write-ahead log in the configuration checkpoint directory. This prevents data loss on driver recovery, thus ensuring zero data loss (discussed in detail in the Fault-tolerance Semantics section). This can be enabled by setting the configuration parameter spark.streaming.receiver.writeAheadLog.enable to true. However, these stronger semantics may come at the cost of the receiving throughput of individual receivers. This can be corrected by running more receivers in parallel to increase aggregate throughput. Additionally, it is recommended that the replication of the received data within Spark be disabled when the write-ahead log is enabled as the log is already stored in a replicated storage system. This can be done by setting the storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER.

  这段话就说明了如果启动写WAL,那么单个Receiver的吞吐量就是有所下降,原因应该是由于之前如果不写WAL的时候,只需要生成这个Block,然后通过异步的方式执行Block的复制,但是现在如果需要执行写WAL之后,得先写WAL成功后,再生成Block,时间多了写WAL的时间,自然每次耗时是要多一些的,解决办法就是多设置一些Receiver
  继续回归正轨,上面说完了清除完DStreamGraph中所有DStream中数据后,如果不需要checkpoint情况下的清理工作。接下来继续进入JobGenerator.clearMetadata中需要进行checkpoint部分的逻辑,这里主要是发布一个DoCheckpoint事件,对应方法代码如下:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {// 设置了需要checkpoint,并且当前批次的时间也符合设置的checkpoint时间间隔logInfo("Checkpointing graph for time " + time)// 更新graph中所有的DStream对应的需要checkpoint的内容。ssc.graph.updateCheckpointData(time)// 将spark streaming自身的运行状态进行checkpoint保存checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)} else if (clearCheckpointDataLater) {markBatchFullyProcessed(time)}}

  至此,主要的清理工作和后续的checkpoint备份工作已经完成,当然这里对于checkpoint的具体内容这一部分还可以继续深入了解,不过关于checkpoint的这部分内容展开来研究内容也会更多,后续我们可以找机会专门写关于checkpoint的内容。

Spark Streaming每个批次完毕后的清道夫工作分析相关推荐

  1. Cris 的 Spark Streaming 笔记

    一.Spark Streaming 概述 1.1 Spark Streaming是什么 Spark Streaming用于流式数据的处理.Spark Streaming支持的数据输入源很多,例如:Ka ...

  2. 通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验

    本期内容 : spark streaming另类在线实验 瞬间理解spark streaming本质 一.  我们最开始将从Spark Streaming入手 为何从Spark Streaming切入 ...

  3. grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...

    随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...

  4. Spark Streaming与流处理

    Spark Streaming与流处理 一.流处理 1.1 静态数据处理 在流处理之前,数据通常存储在数据库,文件系统或其他形式的存储系统中.应用程序根据需要查询数据或计算数据.这就是传统的静态数据处 ...

  5. Spark Streaming 实时计算在甜橙金融监控系统中的应用、性能优化、任务监控

    1 写在前面 目前公司对实时性计算的需要及应用越来越多,本文选取了其中之一的 Spark Streaming 来介绍如何实现高吞吐量并具备容错机制的实时流应用.在甜橙金融监控系统项目中,需要对每天亿万 ...

  6. 基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现

    概述 大数据时代,随着数据量不断增长,存储与计算集群的规模也逐渐扩大,几百上千台的云计算环境已不鲜见.现在的集群所需要解决的问题不仅仅是高性能.高可靠性.高可扩展性,还需要面对易维护性以及数据平台内部 ...

  7. spark streaming 的 Job创建、调度、提交

    2019独角兽企业重金招聘Python工程师标准>>> 上文已经从源码分析了Receiver接收的数据交由BlockManager管理,整个数据接收流都已经运转起来了,那么让我们回到 ...

  8. 大数据技术之Spark Streaming概述

    前言 数据处理延迟的长短 实时数据处理:毫秒级别 离线数据处理:小时 or 天 数据处理的方式 流式(streaming)数据处理 批量(batch)数据处理 spark Streaming也是基于s ...

  9. Spark Streaming 编程指南[中英对照]

    2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...

最新文章

  1. python基础学习笔记第一天
  2. Altium designer中提示some net were not able to be matched问题解决办法
  3. js实现贪吃蛇小游戏
  4. 树链剖分+线段树 单点修改 区间求和 模板
  5. 使用Azure Pipelines从GitHub发布NuGet包
  6. 面试不谈钱,难道要我跟你谈恋爱?真会扯
  7. 【Linux】Linux 标准目录结构
  8. SLAM之g2o安装
  9. 【编程好习惯】永远将头文件作为定义和引用的桥梁
  10. Capture One Pro 22 for Mac(RAW图像处理软件)中文版
  11. str_pos php,关于php中str_replace替换漏洞的分析
  12. SSM框架01--springmvc
  13. Node.js 教程第十三篇——WebSocket
  14. 生产系统仿真软件,实现数字化工厂的利器!
  15. mongodb的初步使用
  16. SEM和TEM的相同点和不同点
  17. 【cocos creator与C++知识分享】 二.anysdk打包微信登录、微信分享
  18. 一个实验了解什么是ISIS
  19. 沈阳市计算机学校1996届,生命科学学院1996届应用生物班校友回母校举行毕业20周年联谊会...
  20. 适合新手的MySQL的基本操作第三期——存储过程篇

热门文章

  1. android开机调用搜狗输入法
  2. 欧姆龙NJ/NXPLC 全ST程序案例
  3. 前端知识体系思维导图
  4. 基于java企业门户网站设计与实现
  5. 2020iPadAir(第四代)对比iPadPro(第二代)
  6. LC-3 指令集注释规范
  7. 养生指南 3 : 人的健康离不开两大要素 : 足够的气血 / 畅通的经络
  8. Win32 Disk Imager Error 5: Access is Denied 解决方案
  9. Git入门介绍-1-简单介绍
  10. Vulnhub靶场题解