对于Spark Streaming程序而言,一旦运行起来后,就会不断的从数据流中消费数据,按照Batch间隔生成BatchRDD进行处理,即处于7*24小时运行的状态,除非我们主动将其停止或者遇到异常退出。所谓Graceful Shutdown,即优雅的将Spark Streaming程序停止,本文将重点探讨三点(写作背景:Spark 2.2):

  • 为什么需要Graceful Shutdown?
  • 如何触发Graceful Shutdown?
  • Graceful Shutdown过程是怎样的?

为什么需要Graceful Shutdown

  考虑使用Graceful Shutdown的前提是,业务对数据的准确性要求很高,不允许数据丢失。如果这个前提不成立,其实可以不使用Graceful Shutdown,直接采用"yarn -kill application_id"即可。
  Spark Streaming是基于micro-batch机制工作的,程序在运行中,由Receiver负责从Stream中不断读取数据(比如1秒读取一次),当Batch Interval到达时,会将收下来的数据组合成一个新的BatchRDD来处理。在这个过程中,如果程序出现异常退出,可能会导致正在处理的BatchRDD中的数据或者已经接收下来但是还没有生成BatchRDD的数据丢失。为了避免数据丢失,Spark Streaming引入了Checkpoint和WAL机制,将程序运行的上下文信息和接收的数据持久化到磁盘,从而可以在异常退出后能恢复到上次继续处理。

  Checkpoint机制保证了数据不丢失,但是为程序更新带来了隐患。因为保存下来的数据中包含了当前程序运行的上下文信息,将程序停止、更换新的代码、再重新启动起来时,轻则更新的代码没有生效,重则程序报错无法运行。因此,更新Spark Streaming程序代码时,必须将Checkpoint清除掉,可是这样又引入了数据丢失的问题。
  Graceful Shutdown便是为解决这样的问题而生。通过Graceful Shutdown,首先将Reciever关闭,不再接收新数据,然后将已经收下来的数据都处理完,最后再退出。这样一来,Checkpoint就可以被安全删除了。

如何触发Graceful Shutdown

  关于如何触发Graceful Shutdown,Spark官方文档并没有给出具体的方法。从代码来看,是通过Hadoop的ShutdownHook来实现的,StreamingContext在初始化时会注册一个Hook函数。因此,理论上一切可以触发Shutdown Hook的方法都可以触发Spark Streaming的Graceful Shutdown。

  logDebug("Adding shutdown hook") // force eager creation of loggershutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)private def stopOnShutdown(): Unit = {val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")// Do not stop SparkContext, let its own shutdown hook stop itstop(stopSparkContext = false, stopGracefully = stopGracefully)}

  笔者推荐的方法是:发送"kill -SIGTERM pid"信号给Driver进程,在YARN Cluster模式下,也就等同于ApplicationMaster进程。这种做法对代码本身没有任何侵入,具体方法是:

  • 设定Spark配置项: spark.streaming.stopGracefullyOnShutdown = true
  • 发送指令:kill -SIGTERM ApplicationMaster_PID

  自动化脚本可以参考:git: spark_streaming_graceful_shutdown。因为笔者所在产品的Spark业务都是部署在AWS EMR服务上面,所以主要针对如何对EMR上面某个Cluster里面的某个Spark Streaming Application进行Graceful Shutdown而言,基本思路是如下。

Graceful Shutdown 过程

  了解了什么情况下用、怎么用Graceful Shutdown就基本可以应对大多数的场景了,但是人类的好奇心岂会停止?用过Graceful Shutdown的人都会发现,它需要经历2~3 Batch的间隔才会完全停下来,为什么需要这么久?中间在做什么?Graceful Shutdown真的可以做到数据不丢失吗?以笔者的业务为例,就出现了数据丢失,当然缘由是我们对Spark Streaming源码的一处定制改动引发的。
  要搞清楚个中缘由,阅读源码自然是最佳途径,从上面提到的stopOnShutdown入手,一路看下去即可。这里笔者将尽可能跳过源码,以两张图的形式来简单介绍,这两张图也是笔者在公司内部分享时使用的。
  第一种图,是Graceful Shutdown被触发后,Driver中的Spark日志信息(这里的Batch Interval是5分钟)。从图中可以看到,整个过程基本可以分为四个步骤:

  • 在09:03:33时发送SIGTERM信号,StreamingContext进入Graceful Shutdown阶段;
  • 发送信号给所有的Receiver,通知Receiver退出,Receiver退出的具体工作由Receiver所在的Executor来完成;
  • 停止JobGenerator,这个过程从09:04:20持续到09:15:01才结束,包含了三个Batch时间(09:05:00、09:10:00、09:15:00),确保接收的数据被处理完并不再产生新的Job;
  • 按顺序停止其他各个资源

  可以看到,"停止JobGenerator"是整个过程的重点,第二张图主要针对这点。从图中可以看到:

  • 由于Spark Streaming是按照Batch Interval来运转的,已经接收下来但是还没有分配到BatchRDD中的数据,需要等到一个新的Batch到来才会被分配。因此这里可能需要花费1个Batch间隔的时间,之所以说是"可能",是因为如果刚好没有这种"已接收但是没有分配"的数据,就直接过了这关了。
  • 停止JobGeneration Timer只是将stopped这个变量置为true而已,图中的代码为Timer里面的Loop代码,可以看到是通过双重确定来保证没有新的Job产生和处理,这个过程最多需要经历2个Batch的时间才过关。

后话

  最后来说说Graceful Shutdown的缺陷,目前来看有两点:

  • 整个过程需要2~3个Batch 间隔的时间,具体时间取决于Batch Interval值的设定,不管怎么说,都会出现一段时间的Downtime,能不能接受这样的行为就取决于业务了。
  • 如笔者上述所言,在我们对Spark Streaming源码做了一处定制改动后,发现Graceful Shutdown变得不再可靠(后续会撰文来分享),同样的情形会不会出现在读者的业务中呢?

(全文完,本文地址:https://blog.csdn.net/zwgdft/article/details/85849153 )
(版权声明:本人拒绝不规范转载,所有转载需征得本人同意,并且不得更改文字与图片内容。大家相互尊重,谢谢!)

Bruce
2019/01/06 晚


更新于2019/02/15:有新坑,请留意

  笔者撰写这篇文章时,使用的是Spark 2.2.0 + Hadoop 2.7.3,后来在升级时发现了另一个坑:在Hadoop 2.8.0之后,Hadoop对ShutHookManager进行了一个调整(HADOOP-12950),会对所有Shutdown Hook函数默认设置10秒超时时间,超过就会强制停止。这个调整会导致Spark的Graceful Shutdown功能受到影响(SPARK-25020),会在Log中看到如下信息。目前社区已在Hadoop 2.9.2中修复该问题(HADOOP-15679),修复方法为增加了一个配置项:hadoop.service.shutdown.timeout,默认值为30秒。
  针对这个坑,笔者建议跳过Hadoop 2.7.3与2.9.2之间的版本,并在新版本中设置hadoop.service.shutdown.timeout >= 4 * BatchInterval。

19/02/15 02:04:49 ERROR Utils: Uncaught exception in thread pool-4-thread-1
java.lang.InterruptedException: sleep interruptedat java.lang.Thread.sleep(Native Method)at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:132)at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:123)at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:681)at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:680)at org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:714)at org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:599)at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)at scala.util.Try$.apply(Try.scala:192)at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

详解Spark Streaming的Graceful Shutdown相关推荐

  1. 详解 Spark RDD 的转换操作与行动操作

    前言 本期继续讲解 Spark 核心 RDD 编程部分,内容比较干货也比较长,建议大家先收藏. 学习目标 RDD 的创建 RDD 的转换操作 RDD 的行动操作 惰性求值 1. RDD 的创建 Spa ...

  2. linux搭建spark集群,详解Spark+Zookeeper搭建高可用Spark集群

    Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎:现在形成一个高速发展应用广泛的生态系统. Spark三种分布式部署方式比较 目前Apache Spark支持三种分布式部署方式, ...

  3. 详解Spark运行模式(local+standalone+yarn)

    一.简述 Spark 有多种运行模式: 1.可以运行在一台机器上,称为 Local(本地)运行模式. 2.可以使用 Spark 自带的资源调度系统,称为 Standalone 模式. 3.可以使用 Y ...

  4. 详解spark任务提交至yarn的集群和客户端模式

  5. Apache Spark 内存管理详解

    原文出处: IBM developerWorks Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色.理解 Spark 内存管理的基本原理,有助于更好地开发 ...

  6. Spark 内存管理详解(下):内存管理

    本文转自:Spark内存管理详解(下)--内存管理 本文最初由IBM developerWorks中国网站发表,其链接为Apache Spark内存管理详解 在这里,正文内容分为上下两篇来阐述,这是下 ...

  7. 一万五千字详解HTTP协议

    点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 本篇文章篇幅比较长,先来个思维导图预览一下. 一.概述 1.计算机网络体系结构分层 2.TCP/I ...

  8. Spark2.1 内存管理详解

    本文中阐述的原理基于 Spark 2.1 版本,阅读本文需要读者有一定的 Spark 和 Java 基础,了解 RDD.Shuffle.JVM 等相关概念. 在执行 Spark 的应用程序时,Spar ...

  9. Spark Streaming 源码详解

    原地址 本系列内容适用范围:* 2015.12.05 update, Spark 1.6 全系列 √ (1.6.0-preview,尚未正式发布) * 2015.11.09 update, Spark ...

  10. Spark详解(十三):Spark Streaming 运行架构分析

    1. 运行架构 SparkStreaming的主要功能包括流处理引擎的流数据接收与存储以及批处理作业的生成与管理,而Spark核心负责处理Spark Streaming发送过来的作用.Spark St ...

最新文章

  1. OpenCV 【十五】绘直线/椭圆/矩形/圆及其填充
  2. 禁止页面复制功能 js禁止复制 禁用页面右键菜单
  3. CentOS6.5 缺少 libstdc++.so.6(GLIBCXX_3.4.15)
  4. 十强决赛即将拉开帷幕!TECHSPARK星星之火IT创新大赛诚邀您观赛
  5. vue watch 修改滚动条_Vue.js 中滚动条始终定位在底部的方法
  6. 计算机编程语言排行榜—TIOBE世界编程语言排行榜(2021年08月份最新版)
  7. Python名片管理系统
  8. 蘑菇街交易平台 数据库架构演进历程
  9. 微信朋友验证消息是什么来源_微信好友来源怎么改?来源朋友验证消息是什么意思?...
  10. 名帖17 吴让之 篆书《吴让之篆书墨迹》
  11. shared_ptr的引用计数原理
  12. matlab火箭模型,基于Matlab/Simulink的新型火箭建模与仿真平台搭建
  13. 华为南研所提前批面试(2015年8月22日)
  14. VLD的安装使用及其问题
  15. 【FPGA教程案例78】通信案例4——基于FPGA的RLS自适应滤波算法实现
  16. Magik Eye将在2019年嵌入式视觉峰会上宣布突破性的3D感应技术Invertible Light™
  17. 申请GOOGLE的企业邮局
  18. 2019年终总结核医学相关研究分享
  19. 批量的十六进制转字符串(ASCII码) shell脚本
  20. 物联网智能停车解决方案

热门文章

  1. CTSC2018 APIO2018 颓废 + 打铁记
  2. android 钢琴识别音阶对错_练习音阶琶音的方法——你确定自己没弄错?
  3. Vue中使用地图平台MapboxGL
  4. CentOS 7设置获取动态及静态IP地址方法
  5. 项目生命周期和产品生命周期的不同
  6. base64图片上传解析不了问题
  7. 如何将word文件的大小进行压缩?
  8. 房东家的网线不用账号和密码就能上网怎么设置路由器
  9. linux 蓝牙 iphone,Linux On iPhone 7 现在可运行 Wayland
  10. tersorrt安装_pytorch/mxnet模型tensorrt部署