目录

自动重启策略和恢复

重启策略配置方式

重启策略分类

代码演示

手动重启并恢复-了解

1.把程序打包

2.启动Flink集群(本地单机版,集群版都可以)

3.访问webUI

4.使用FlinkWebUI提交

5.取消任务

6.重新启动任务并指定从哪恢复

7.关闭/取消任务

8.关闭集群


自动重启策略和恢复

重启策略配置方式

  • 配置文件中

在flink-conf.yml中可以进行配置,示例如下:

restart-strategy: fixed-delayrestart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s
  • 代码中

还可以在代码中针对该任务进行配置,示例如下:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔))

重启策略分类

  • 默认重启策略

如果配置了Checkpoint,而没有配置重启策略,那么代码中出现了非致命错误时,程序会无限重启

  • 无重启策略

Job直接失败,不会尝试进行重启
 设置方式1:flink-conf.yaml
 restart-strategy: none
 ​
 设置方式2:
 无重启策略也可以在程序中设置
 val env = ExecutionEnvironment.getExecutionEnvironment()
 env.setRestartStrategy(RestartStrategies.noRestart())

  • 固定延迟重启策略--开发中使用

设置方式1:
 重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:
 例子:
 restart-strategy: fixed-delay
 restart-strategy.fixed-delay.attempts: 3
 restart-strategy.fixed-delay.delay: 10 s
 ​
 设置方式2:
 也可以在程序中设置:
 val env = ExecutionEnvironment.getExecutionEnvironment()
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
   3, // 最多重启3次数
   Time.of(10, TimeUnit.SECONDS) // 重启时间间隔
 ))
 上面的设置表示:如果job失败,重启3次, 每次间隔10

  • 失败率重启策略--开发偶尔使用

 设置方式1:
 失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:
 例子:
 restart-strategy:failure-rate
 restart-strategy.failure-rate.max-failures-per-interval: 3
 restart-strategy.failure-rate.failure-rate-interval: 5 min
 restart-strategy.failure-rate.delay: 10 s
 ​
 设置方式2:
 失败率重启策略也可以在程序中设置:
 val env = ExecutionEnvironment.getExecutionEnvironment()
 env.setRestartStrategy(RestartStrategies.failureRateRestart(
   3, // 每个测量时间间隔最大失败次数
   Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
   Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔
 ))
 上面的设置表示:如果5分钟内job失败不超过三次,自动重启, 每次间隔10s (如果5分钟内程序失败超过3次,则程序退出)

代码演示

package cn.it.checkpoint;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;/*** Author lanson* Desc 演示Checkpoint+重启策略*/
public class CheckpointDemo02_RestartStrategy {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint参数设置====//===========类型1:必须参数=============//设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!env.enableCheckpointing(1000);//设置State状态存储介质/*if(args.length > 0){env.setStateBackend(new FsStateBackend(args[0]));}else {env.setStateBackend(new FsStateBackend("file:///D:/ckp"));}*/if(SystemUtils.IS_OS_WINDOWS){env.setStateBackend(new FsStateBackend("file:///D:/ckp"));}else{env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========类型2:建议参数===========//设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)//如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0//设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是  false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败//设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========类型3:直接使用默认的即可===============//设置checkpoint的执行模式为EXACTLY_ONCE(默认)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟//设置同一时间有多少个checkpoint可以同时执行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1//=============重启策略===========//-1.默认策略:配置了Checkpoint而没有配置重启策略默认使用无限重启//-2.配置无重启策略//env.setRestartStrategy(RestartStrategies.noRestart());//-3.固定延迟重启策略--开发中使用!//重启3次,每次间隔10s/*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, //尝试重启3次Time.of(10, TimeUnit.SECONDS))//每次重启间隔10s);*///-4.失败率重启--偶尔使用//5分钟内重启3次(第3次不包括,也就是最多重启2次),每次间隔10s/*env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 每个测量时间间隔最大失败次数Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔Time.of(10, TimeUnit.SECONDS) // 每次重启的时间间隔));*///上面的能看懂就行,开发中使用下面的代码即可env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2.SourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.Transformation//3.1切割出每个单词并直接记为1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {if(word.equals("bug")){System.out.println("手动模拟的bug...");throw new RuntimeException("手动模拟的bug...");}out.collect(Tuple2.of(word, 1));}}});//3.2分组//注意:批处理的分组是groupBy,流处理的分组是keyByKeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.sinkresult.print();//5.executeenv.execute();}
}

手动重启并恢复-了解

1.把程序打包

2.启动Flink集群(本地单机版,集群版都可以)

/export/server/flink/bin/start-cluster.sh

3.访问webUI

http://node1:8081/#/overview

http://node2:8081/#/overview

4.使用FlinkWebUI提交

cn.checkpoint.CheckpointDemo01

5.取消任务

6.重新启动任务并指定从哪恢复

cn.it.checkpoint.CheckpointDemo01

hdfs://node1:8020/flink-checkpoint/checkpoint/9e8ce00dcd557dc03a678732f1552c3a/chk-34

7.关闭/取消任务

8.关闭集群

/export/server/flink/bin/stop-cluster.sh

2021年大数据Flink(二十八):Flink 容错机制 自动重启策略和恢复相关推荐

  1. 2021年大数据ELK(十八):Beats 简单介绍和FileBeat工作原理

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Beats 简单介绍和FileBeat工作原理 一.Beats 二.FileB ...

  2. 客快物流大数据项目(二十八):大数据服务器环境准备

    目录 大数据服务器环境准备 一.服务器规划 二.Linux虚拟机环境搭建

  3. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  4. 2021年大数据Hadoop(十五):Hadoop的联邦机制 Federation

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 Hadoop的联邦机制 Federation 背景概述 F ...

  5. 2021年大数据Hadoop(十四):HDFS的高可用机制

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...

  6. 2021年大数据Kafka(十二):❤️Kafka配额限速机制❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka配额限速机制 限制producer端的速率 限制c ...

  7. 2021年大数据HBase(十二):Apache Phoenix 二级索引

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix 二级索引 一.索引分类 ...

  8. 2021年大数据Hive(十二):Hive综合案例!!!

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive综合案例 一.需求描述 二.项目表的字段 三.进 ...

  9. 2021年大数据Kafka(十):kafka生产者数据分发策略

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...

最新文章

  1. java连接mongodb_java连接mongodb源码解读
  2. Echarts的入门
  3. 使用线性回归识别sklearn中的手写数字digit
  4. 【C#】三种结构:顺序、分支(if、switch、条件运算符)、循环
  5. mac 下php,Mac 下 PHP
  6. JAVA标准输出错误输出,从tsls输出中提取标准错误
  7. 《指针的编程艺术(第二版)》一3.8 改错题
  8. 当数据库遇上自动驾驶,阿里云 DAS 在自治诊断的突破
  9. 正则表达式的贪婪与非贪婪模式
  10. 通过Telnet查询注册服务
  11. cookie helper.php,CookieHelper cook crud 工具类
  12. Qt + 运动控制 (固高运动控制卡)【2】运动控制卡初始化和关闭
  13. 专访李运华:程序员如何在技术上提升自己
  14. (error) CLUSTERDOWN Hash slot not served
  15. 用ReadyBoost加速Windows 7
  16. qemu-kvm设备hot-plug原理分析
  17. 关于java WEb怎么调用matlab(二)
  18. pcb焊接的一些技巧
  19. mipi传输距离3米_HDMI信号远距离如何实现传输?
  20. 软件定义网络SDN(计算机网络-网络层)

热门文章

  1. 端口映射问题:Bad Request This combination of host and port requires TLS.
  2. 2022-2028年中国加气站行业市场研究及前瞻分析报告
  3. 阿里巴巴图学习框架 euler 安装运行记录
  4. VS2012代码提示快捷键
  5. Ubuntu20.04安装zabbix以及Cannot create the configuration file解决
  6. 第五周周记(国庆第三天)
  7. SpringCloud Alibaba微服务实战(五) - Sentinel实现限流熔断
  8. C++ sizeof 运算符的使用
  9. TypeError: cannot concatenate ‘str‘ and ‘list‘ objects
  10. hdu5701-中位数计数