状态管理与容错机制

  • 术语
  • 状态管理
  • 容错机制
    • 状态一致性
    • 检查点(checkpoint)
    • 保存点(savepoint)
    • 状态后端(state backend)
  • 案例

术语

算子状态、键控状态、状态一致性、检查点、保存点、状态后端。

状态管理

流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过 90 度时发出警告。有状态的计算则会基于多个事件输出结果。

状态分为两类:

  • 算子状态(operator state)
    算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
  • 键控状态(keyed state)
    键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。

算子状态提供了三种类型的数据结构:

  • 列表状态(List state)
    将状态表示为一组数据的列表。
  • 联合列表状态(Union list state)
    也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
  • 广播状态(Broadcast state)
    如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

键控状态支持的数据类型:

  • ValueState保存单个的值,值的类型为 T
  • ListState保存一个列表,列表里的元素的数据类型为 T。
  • MapState<K, V>保存 Key-Value 对。
  • ReducingState
  • AggregatingState<I, O>

容错机制

状态一致性

  • at-most-once: 至多一次,这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
  • at-least-once: 至少一次,这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
  • exactly-once: 精确一次,这指的是系统保证在发生故障后得到的计数结果与正确值一致。

检查点(checkpoint)

Flink 具体如何保证 exactly-once 呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。Flink 检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。

  • 分界线(barrier)

保存点(savepoint)

保存点,主要用来手动保存Checkpoint,用于集群的迁移,升级,任务重启。

保存点与检查点的区别

以我个人目前的理解:
checkpoint,主要是用来做故障恢复。而savepoint,主要用户做主动恢复。前者是防止因为各种原因造成了任务执行中断,重启后尽量从某个时刻开始执行。savepoint,主要是认为触发,属于主动意图,与前者不同。

状态后端(state backend)

传送门–了解状态后端

  • MemoryStateBackend
    内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。
  • FsStateBackend
    将 checkpoint 存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟 MemoryStateBackend 一样,也会存在 TaskManager 的 JVM 堆上。
  • RocksDBStateBackend
    将所有状态序列化后,存入本地的 RocksDB 中存储。传送门–深入了解

总结:FsStateBackend、 RocksDBStateBackend主要用于生产环境,都是将checkpoint保存到指定的文件系统,但是区别在于FsStateBackend受限于JVM对内存能报错的状态数据容量,RocksDBStateBackend不受限与堆内存,而受限于本地磁盘容量。

案例

他人博客–传送门

  1. 停止集群,从检查点恢复
  2. 保存检查点,从保存检查点恢复
  1. 准备一个计算程序
    程序逻辑很简单,计算输入的单词总个数
/*** 计算输入的单词总数量*/
public class KeyStateTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ExecutionConfig config = env.getConfig();config.setAutoWatermarkInterval(0);env.setParallelism(1);//设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/flink/checkpoint"));
//        env.getCheckpointConfig().setCheckpointStorage("file:///E:\\BaiduNetdiskDownload\\大数据\\尚硅谷大数据技术之Flink(Java版)\\checkpoint\\totalWC");//检查点执行周期,单位毫秒env.enableCheckpointing(1000);//精确一次env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//检查点运行超时时间env.getCheckpointConfig().setCheckpointTimeout(60000);//检查点最小间隔时间env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//检查点并发数量,1:不允许并行多个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//是否优先使用查检查点恢复,官方标记过时,不建议使用,可能造成数据丢失或者重复输出env.getCheckpointConfig().setPreferCheckpointForRecovery(false);//容忍检查点失败次数env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);//设置检查点重启策略,重试6次,每次间隔5分钟env.setRestartStrategy(RestartStrategies.fixedDelayRestart(6,60*1000*5));DataStream<String> dataStream = env.socketTextStream("hadoop101", 9999);dataStream.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {if (StringUtils.isNotBlank(value)) {String[] split = value.split("\\s+");for (int i = 0; i < split.length; i++) {String word = split[i];out.collect(new Tuple2<>(word, 1L));}}})//lamda表达式,泛型擦除,需要声明返回类型.returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data -> "all").map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {private ValueState<Long> totalCountState;@Overridepublic void open(Configuration parameters) throws Exception {this.totalCountState = getRuntimeContext().getState(new ValueStateDescriptor<>("totalCount",Long.class));}@Overridepublic Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {Long totalCount = totalCountState.value();if(totalCount == null){totalCount = 0L;}totalCount += value.f1;totalCountState.update(totalCount);return new Tuple2<>(value.f0,totalCount);}}).print();env.execute();}
}
  1. 部署到yarn集群
bin/flink run -t yarn-per-job \
-c com.xbz.bigdata.study.flink.state.KeyStateTest \
/opt/module/flink-1.13.2/myjobs/flink-study-1.0.0.jar
  1. 在hadoop101:9999输入一些数据,让state出现数据
hello world
aa bb cc

次数的总次数状态应该是5

  1. 停止集群(注意不要使用yarn application --kill or flink cancel因为正常的停止flink应用,checkpoint数据会清空。–>这里是模拟故障
# 无法继续写数据
stop-dfs.sh
# 停止yarn
stop-yarn.sh

此时我们发现任务目录下存在chk-XX,有数据

5. 从检查点恢复任务
-s:指定检查点文件

bin/flink run -t yarn-per-job \
-s hdfs://hadoop101:8020/flink/checkpoint/2c5f4ed32bd658216aacb5d380adb543/chk-202/_metadata \
-c com.xbz.bigdata.study.flink.state.KeyStateTest \
/opt/module/flink-1.13.2/myjobs/flink-study-1.0.0.jar
  1. 继续输入一个单词,验证计算结果是否是从5开始的?

    状态恢复成功!

从保存点恢复
7. 首先savepoint,当前jobid=36f3ad2725eeba680742a8442350b24d
解释:因为我们的job是在per-job模式下启动,所以在savepoint指令中需要通过-t yarn-per-job -Dyarn.application.id=application_1631441939093_0006才能执行savepoint,启动application_1631441939093_000是yarn集群的应用ID

bin/flink savepoint \
-t yarn-per-job \
-Dyarn.application.id=application_1631441939093_0006 \
36f3ad2725eeba680742a8442350b24d \
hdfs://hadoop101:8020/flink/savepoint/

此时hdfs中出现了一个文件夹

  1. 从保存点恢复
bin/flink run -t yarn-per-job \
-s hdfs://hadoop101:8020/flink/savepoint/savepoint-36f3ad-8af7081a1e1b/_metadata \
-c com.xbz.bigdata.study.flink.state.KeyStateTest \
/opt/module/flink-1.13.2/myjobs/flink-study-1.0.0.jar

输入一个单词,测试一下输出是否为7?

完毕!

大数据入门--Flink(四)状态管理与容错机制相关推荐

  1. Flink状态管理和容错机制介绍

    作者: 施晓罡 本文来自2018年8月11日在北京举行的 Flink Meetup会议,分享来自于施晓罡,目前在阿里大数据团队部从事Blink方面的研发,现在主要负责Blink状态管理和容错相关技术的 ...

  2. 大数据入门的四个必备常识

    一.大数据分析的五个基本方面 1.可视化分析 大数据分析的使用者有大数据分析专家,同时还有普通用户,但是他们二者对于大数据分析最基本的要求就是可视化分析,因为可视化分析能够直观的呈现大数据特点,同时能 ...

  3. 大数据_Flink_Java版_状态管理(2)_算子状态---Flink工作笔记0061

    在flink中我们说,对于reduce,map,flatMap等这样的算子,他的状态,只是局限于这个任务的,不可能传输到其他任务对吧,因为我们知道,不同的任务可能在不同的节点上,那么我们知道状态都是存 ...

  4. Flink 状态管理与 Checkpoint 机制

    点击上方"zhisheng",选择"设为星标" 一.状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算.即你可以将中间的计算结果 ...

  5. 大数据入门(八)win10下的wordcount

    目录 方法一 上传文件到hdfs java project 方法二 参考 有两种方法:方法一需要借用eclipse自己编写代码,优点是有助于理解mapreduce,缺点复杂.方法二可以直接调用Hado ...

  6. 【大数据入门核心技术-Flume】(四)使用Flume采集数据到Hive

    [大数据入门核心技术-Kafka](七)Ka 录 一.准备工作 1.Hadoop环境安装 2.Flume安装部署 二.采集数据到HDFS 1.配置任务文件 2.启动传输 3.查看是否同步成功 三.常见 ...

  7. 女友问粉丝过万如何庆祝,我发万字长文《保姆级大数据入门篇》感恩粉丝们支持,学姐|学妹|学弟|小白看了就懂

    2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 粉丝破万了 新星计划申 ...

  8. 女友问粉丝过万如何庆祝,我发长文《保姆级大数据入门篇》感恩粉丝们支持,学姐|学弟看了就懂

    文章目录 粉丝破万了 新星计划申请时粉丝数 新星内卷抢热榜之旅 运营整顿新星执行新规 重整旗鼓输出内容为王 女友问粉丝过万如何庆祝 保姆级大数据入门篇 一.学习重点划定 二.Java和大数据关系 三. ...

  9. 2021年大数据HBase(四):HBase的相关操作-客户端命令式!【建议收藏】

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 HBase的相关操作-客户端命令式 1.进入HBase ...

最新文章

  1. mongodb 对象唯一索引_什么是MongoDB?简介,架构,功能和示例
  2. 同构多核和异构多核简单介绍
  3. 关于js的回调函数的一点看法
  4. 浅谈unity中gamma空间和线性空间
  5. 一休和尚 小布偶 晴天娃娃
  6. 兔子生崽问题编程_贵阳在哪里可以学到好的少儿编程
  7. spring依赖注入_Spring依赖注入
  8. 向Docker容器中导入sql文件
  9. 猎豹移动回应被谷歌下架:积极整改情况下被单方面下架
  10. 武汉月薪1万5,感到焦虑怎么办?
  11. macOS 12兼容机型列表 想知道你的Mac是否支持macOS Monterey吗?
  12. 计算机网络课程实验5——交换机VLAN
  13. 2.1.5、会员权限管理系统应用
  14. VS2017安装CLR
  15. codevs 4939 欧拉函数
  16. python常用内置库时间,日期与JSON转换
  17. 卡卡IT学院模式:轻培训业态重线下资源整合
  18. SMT操作员是做什么的?工作职责?
  19. 讯飞智能录音笔SR302为职场人带来办公新体验
  20. 基层服务项目服务器一般几年,基层事业单位有5年服务期,期间可以提拔或调动吗?看完知道了!...

热门文章

  1. 红米note4x开启root权限
  2. 红米note8pro微信无法连接服务器,在红米Note8Pro微信浏览网页background-image图片无法加载?...
  3. 2.2 拓扑空间与连续映射
  4. 2017吉林(长春)第十九届国际供热供暖、锅炉、空调及节能减排技术设备展览会会刊(参展商名录)
  5. 计算机实验adda转换心得体会,dsp实验ADDA
  6. Nginx反向代理的两种配置方式
  7. Linux更换镜像源
  8. 服务器系统2012桌面怎么弄,Windows Server 2012 R2配置成桌面操作系统的详细设置
  9. 2023轩辕奖出炉,怿星科技共创荣誉
  10. App引流推广:能够提高用户的转化的技术