Flink重启策略

为什么需要设置重启策略?

当任务失败时,Flink需要重新启动失败的任务和其他受影响的任务,以将作业恢复到正常状态。

重新启动策略和故障转移策略用于控制任务重新启动。重新启动策略决定是否以及何时可以重新启动失败/受影响的任务。故障转移策略决定应重新启动哪些任务以恢复作业。

NOTE:重启策略需要配合Checkpoint启动,因为需要用到flink的内部State

使用RestartStrategy

配置文件配置

配置文件中是DataSet&DataStream通用的。

如果enableCheckpoint()没有设置,那么restart-strategy默认为:none

如果设置了enableCheckpoint(),那么restart-strategy为:fixed-delay且delay=1s

#这里有3种不同的重启策略,
restart-strategy: none, off, disable|fixeddelay, fixed-delay|failurerate, failure-rate

通过ExecutionConfig配置

//限定重启次数
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // 尝试重启的次数Time.of(10, TimeUnit.SECONDS) // 每次重启之间的时间间隔,即重启尝试时间
)//限定失败率//如果Duration被设为5分钟 = 300s,那么10s尝试重启一次,那么实际可重试30次//failure-rate = n/30 其中n为重试但失败的次数,如果达到一定的阈值,那么任务重启失败
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 5min内允许失败的最大次数,可以适当调整Time.of(5, TimeUnit.MINUTES), //用来衡量失败率的时间间隔Time.of(10, TimeUnit.SECONDS) //2个连续的重试尝试之间的时间间隔
))

重启策略 Restart strategy

fixed-delay

#假如 restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts=3 [default]
restart-strategy.fixed-delay.delay=2s [default]举个栗子:
===> 假如 delay=1s,attempts=1,那么重启的策略就为每2秒尝试重启一次,要么重启成功,要么失败进入下一次重启尝试,如果累计重试次数达到3次但是任然没有成功,那么这个task重启就算失败

failure-rate

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s举个栗子:
===> 假如failure-rate-interval=5min,max-failures-per-interval=3,delay=10,那么重启策略就是每10s尝试重启一次,如果连续重试失败次数超过3次,那么表示重启失败

non-restart

不启用重启策略

fallback-restart

Flink自动管理重启策略,如果用这个策略,那么默认就是使用fixed-dalay

失败策略Failover strategy

官网参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html

可以通过flink-conf.yaml来设置failover strategy

Restart All Failover Strategy

  • 这个策略是重启整个job中所有的task,从失败恢复到正常状态

Restart Pipelined Region Failover Strateg

  • 用来决定在region 失败策略中的region范围,这种策略比重启所有任务代价要小的多env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)
jobmanager.execution.failover-strategy value to config
Restart all 重启所有的任务 Full
Restart pipelined region 重启单个分区内的任务 Region

简单的实践Checkpoint代码

package com.shufang.state.chekpointimport com.shufang.broadcast.People
import com.shufang.entities.WorkPeople
import com.shufang.source.MyUDFPeopleSource
import org.apache.flink.api.common.ExecutionMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.co.{BroadcastProcessFunction, KeyedBroadcastProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject CheckPointDemo {def main(args: Array[String]): Unit = {//获取执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//用来决定在region (failover strategy)失败策略中的region范围env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)/*** --------------------------------------checkpoint的配置-----------------------------------------------*/env.enableCheckpointing(1000) //每1s checkpoint 一次env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //默认是EXACTLY_ONCEenv.getCheckpointConfig.setCheckpointInterval(1000) //每隔 1s进行一次checkpoint 的工作env.getCheckpointConfig.setCheckpointTimeout(6000) //如果checkpoint操作在6s之内没有完成,那么就discard终端该checkpoint操作//true:假如在checkpoint过程中产生了Error,那么Task直接显示失败//false:产生了error,Task继续运行,checkpoint会降级到之前那个状态env.getCheckpointConfig.setFailOnCheckpointingErrors(false) //默认为trueenv.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //在统一时间只能同时有1个checkpoint操作,其他的操作必须等当前操作执行完或者超时之后才能执行env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION) //清除或保留状态env.getCheckpointConfig.setMinPauseBetweenCheckpoints(0) //下一个checkpoint操作触发之前最小的阻塞时间,必须>=0/** --------------------------------------配置重启策略----------------------------------------------------* When a task failure happens, (当一个任务失败后)* Flink needs to restart the failed task and other affected tasks to recover the job to a normal state.* (Flink 需要重启失败的任务和其他受影响的task并恢复到一个正常的状态)* 重启配置与checkpoint设置有关:* 如果没有开启checkpoint,那么重启策略为:no restart!* 如果开启了checkpoint,那么重启策略默认为:fixed-delay strategy is used with Integer.MAX_VALUE** restart-strategy 可以在flink-conf.yaml中进行设置,也可以通过env.setRestartStrategy()设置*//*env.setRestartStrategy(RestartStrategies.failureRateRestart(10,Time.minutes(5),Time.seconds(10)))*///env.setRestartStrategy(new RestartStrategies.FallbackRestartStrategyConfiguration) //自动按照fixed-dalay重启策略/*env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(10,Time.minutes(5),Time.seconds(10)))*///env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration())//env.setRestartStrategy(new RestartStrategies.FixedDelayRestartStrategyConfiguration(5,Time.seconds(4)))//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.seconds(4)))val config = new RestartStrategies.FailureRateRestartStrategyConfiguration(3, Time.minutes(5), Time.seconds(10))env.setRestartStrategy(config)val ds: DataStream[WorkPeople] = env.addSource(new MyUDFPeopleSource)val ds1: DataStream[(Int, Char)] = env.fromElements((1, '男'), (2, '女'))val describer = new MapStateDescriptor[Int, Char]("genderInfo", classOf[Int], classOf[Char])val bcStream: BroadcastStream[(Int, Char)] = ds1.broadcast(describer)val resultStream: DataStream[People] = ds.connect(bcStream).process(new BroadcastProcessFunction[WorkPeople, (Int, Char), People] {override def processElement(value: WorkPeople,ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#ReadOnlyContext,out: Collector[People]): Unit = {val gender: Char = ctx.getBroadcastState(describer).get(value.genderCode).charValue()out.collect(People(value.id, value.name, gender, value.address, value.price))}override def processBroadcastElement(value: (Int, Char), ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#Context, out: Collector[People]): Unit = {ctx.getBroadcastState(describer).put(value._1, value._2)}})ds.print("before:")resultStream.print("after:")env.execute("checkpoint")}
}

Flink重启策略Restart-Strategy相关推荐

  1. Flink重启策略和故障恢复策略

    当Flink的task发生故障的时候,Flink需要重启出错的task以及其他受到影响的task,以使得作业恢复到正常状态.Flink通过重启策略和故障恢复策略来控制task的重启,重启策略决定是否可 ...

  2. 【Flink】源码-Flink重启策略-简介 Task恢复策略 重启策略监听器

    文章目录 1.概述 3.固定间隔 4.失败率 4.1 案例 5. 无重启策略 5.1 案例 6.实际代码演示 7. Task恢复策略 8.重启策略监听器 8.1 测试 M.参考 1.概述 ​ Flin ...

  3. Flink的重启策略

    Flink的重启策略 Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启.集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果Job提交的时候 ...

  4. 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等

    1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...

  5. 2021年大数据Flink(二十八):Flink 容错机制 自动重启策略和恢复

    目录 自动重启策略和恢复 重启策略配置方式 重启策略分类 代码演示 手动重启并恢复-了解 1.把程序打包 2.启动Flink集群(本地单机版,集群版都可以) 3.访问webUI 4.使用FlinkWe ...

  6. docker容器的重启策略:通过--restart来指定

    Docker容器的重启策略如下: 如:--restart=always no,默认策略,在容器退出时不重启容器 on-failure,在容器非正常退出时(退出状态非0),才会重启容器 on-failu ...

  7. docker学习笔记-为容器配置重启策略

    docker容器配置重启策略 docker restart policies docker容器的退出状态码 0 表示正常退出 非0 表示异常退出(退出状态码采用chroot标准) 125 Docker ...

  8. .net core i上 K8S(四).netcore程序的pod管理,重启策略与健康检查

    目录 1.pod管理 2.重启策略 3.健康检查 4.进入容器 正文 上一章我们已经通过yaml文件将.netcore程序跑起来了,但还有一下细节问题可以分享给大家. 1.pod管理 1.1创建pod ...

  9. Docker容器的重启策略

    1. Docker容器的重启策略 Docker容器的重启策略是面向生产环境的一个启动策略,在开发过程中可以忽略该策略. Docker容器的重启都是由Docker守护进程完成的,因此与守护进程息息相关. ...

  10. Docker学习之docker重启参数--restart=always的作用

    docker重启参数–restart=always的作用 -restart=always参数能够使我们在重启docker时,自动启动相关容器. Docker容器的重启策略如下: no,默认策略,在容器 ...

最新文章

  1. 一文详解PnP算法原理
  2. 安卓9去掉搜索栏_安卓福音,史上最强搞机工具箱,一键修手机
  3. 3.6 激活函数-深度学习-Stanford吴恩达教授
  4. appium---【Mac】Appium-Doctor提示WARN:“ opencv4nodejs cannot be found”解决方案
  5. html5app微信登陆,基于h5+的微信登陆,hbuilder打包
  6. c# 两个数的加减乘除
  7. Linux学习:第一章-Linux简介
  8. C语言程序设计第一次实验
  9. SOLA(苏拉)病毒
  10. 微信小程序 腾讯云ocr 身份证识别
  11. VB基础知识之Do...Loop循环
  12. 原理图学习(点读笔调试)
  13. 数据挖掘—Apriori算法
  14. 客户关系管理系统的三个不同阶段
  15. 计算机无法关闭密码保护,win7的密码保护共享关闭不了怎么办_解决win7的密码保护共享关闭不了的方法...
  16. excel打开html非常慢,excel打开速度很慢的解决方法
  17. 2022-2028全球与中国生物基聚氨酯(PU)市场现状及未来发展趋势
  18. MPLAB常见问题及解决方法
  19. Arduino压电震动传感器
  20. 13.DoS防御----BeEF浏览器渗透----暴力破解之美杜莎---DNS指南

热门文章

  1. DNS劫持和DNS污染的区别
  2. DNS污染是什么意思?DNS污染解决方案
  3. 计算机禁用网络后怎么打开,无线网关,教您笔记本无线网络禁用后怎么开启
  4. Redhat认证考试心得之一死记硬背篇
  5. np和tensor转换
  6. Android批量打包-如何一秒内打完几百个apk渠道包
  7. 日本語 IME输入法(Microsoft 输入法)切换问题
  8. 从哲学的视角看待项目生命周期——构建不一样的世界
  9. 怀旧小霸王游戏机网页源码
  10. 人类的心理行为模式(几个心理学实验)