关于flink的setCommitOffsetsOnCheckpoints
提交 offset 到 kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和 lag 情况
如果启用了 checkpoint,但是禁用 CommitOffsetsOnCheckpoints, kafka 消费组的 offset 不会提交到 kafka,也就是说: 消费组的 offset 是不会有变化的
也就是true or false设置后,flink都是主动管理。
如果是true,flink会把offset提交给kafka
用法示例:
//创建kafka数据流
val properties = new Properties() properties.setProperty("bootstrap.servers", GlobalConfigUtils.getBootstrap) properties.setProperty("zookeeper.connect", GlobalConfigUtils.getZk) properties.setProperty("group.id", GlobalConfigUtils.getConsumerGroup) properties.setProperty("enable.auto.commit" , "true")//TODO properties.setProperty("auto.commit.interval.ms" , "5000") properties.setProperty("auto.offset.reset" , "latest") properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
val kafka09 = new FlinkKafkaConsumer09[String](GlobalConfigUtils.getIntputTopic,new SimpleStringSchema(),properties
)
/** *
如果checkpoint启⽤用,当checkpoint完成之后,Flink Kafka Consumer将会提交offset保存 到checkpoint State中,
这就保证了了kafka broker中的committed offset与 checkpoint stata中的offset相⼀一致。 ⽤用户可以在Consumer中调⽤用setCommitOffsetsOnCheckpoints(boolean) ⽅方法来选择启⽤用 或者禁⽤用offset committing(默认情况下是启⽤用的)
* */
kafka09.setCommitOffsetsOnCheckpoints(true)
kafka09.setStartFromLatest()//start from the latest record
kafka09.setStartFromGroupOffsets()
//添加数据源addSource(kafka09)
val data: DataStream[String] = env.addSource(kafka09)
Reference:
[1]【源码】 flink 消费 kafka 消费组 offset 提交
[2]flink⼿手动维护kafka偏移量量
关于flink的setCommitOffsetsOnCheckpoints相关推荐
- 2021年大数据Flink(四十四):扩展阅读 End-to-End Exactly-Once
目录 扩展阅读 End-to-End Exactly-Once 流处理的数据处理语义 At-most-once-最多一次 At-least-once-至少一次 Exactly-once-精确一次 En ...
- 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等
1.12.Flink Kafka-Connector详解 1.12.1.Kafka Consumer消费策略设置 1.12.2.Kafka Consumer的容错 1.12.3.动态加载Topic 1 ...
- flink连接kafka整合hbase,scala
解析kafka当中的json格式的数据,入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} ...
- 五万字 | Flink知识体系保姆级总结
本文目录: 一.Flink简介 二.Flink 部署及启动 三.Flink 运行架构 四.Flink 算子大全 五.流处理中的 Time 与 Window 六.Flink 状态管理 七.Flink 容 ...
- Flink DataStream读写Kafka
Flink提供了Kafka连接器,用于从或向Kafka读写数据. 本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理. 问题一: 读Kafka的方式 ## 读取一个Topic Fl ...
- 大数据——Flink 知识点整理
目录 1. Flink 的特点 2. Flink 和 SparkStreaming 的对比 3. Flink 和 Blink.Alink之间的关系 4. JobManager 和 TaskManage ...
- Flink学习-DataStream-HDFSConnector(StreamingFileSink)
Flink学习-DataStream-HDFSConnector(StreamingFileSink) Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好 ...
- 大数据之flink数据一致性
一.flink分析结果写入redis 1.下载flink-hadoop整合包,放入所有节点 2.KafkaToRedisWordCount package cn._51doit.flink.day08 ...
- 一文带你全方位(架构,原理及代码实现)了解Flink(3.2W字建议收藏)
注:最底部有PDF目录 一 flink简介 1.1 什么是flink Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎.F ...
最新文章
- 2018-2019 20165226 Exp9 Web安全基础
- IntelliJ IDEA 修改单行注释的格式
- opencv waitKey() 函数理解及应用
- 1.5编程基础之循环控制_41数字统计
- Centos 7编译安装 LAMP 环境
- Scrapy学习之报错ModuleNotFoundError: No module named 'win32api'
- (转)Spring4.2.5+Hibernate4.3.11+Struts2.3.24整合开发
- js排序算法详解-选择排序
- 两个简洁的页面:404和Loading
- 017年美国大学生数学建模竞赛E题优秀论文解读
- 主题抽取的核心——主题词表
- redis客户端工具redis-insight推荐
- 基于java小区物业管理系统(含源文件)
- 浅谈互联网寒冬Android进阶之路
- DHT11温湿度模块
- Ubuntu桌面卡死、You are in emergency mode
- 凭借这份《2022测试面经》候选者逆袭面试官,offer拿到手软
- 听吐的微信提示音终于能改了
- 华人泰斗黄煦涛逝世,贤伉俪深情六十载
- 统计遗传学:第三章,群体遗传