提交 offset 到 kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和 lag 情况

如果启用了 checkpoint,但是禁用 CommitOffsetsOnCheckpoints, kafka 消费组的 offset 不会提交到 kafka,也就是说: 消费组的 offset 是不会有变化的

也就是true or false设置后,flink都是主动管理。

如果是true,flink会把offset提交给kafka

用法示例:

//创建kafka数据流
val properties = new Properties() properties.setProperty("bootstrap.servers", GlobalConfigUtils.getBootstrap) properties.setProperty("zookeeper.connect", GlobalConfigUtils.getZk) properties.setProperty("group.id", GlobalConfigUtils.getConsumerGroup) properties.setProperty("enable.auto.commit" , "true")//TODO properties.setProperty("auto.commit.interval.ms" , "5000") properties.setProperty("auto.offset.reset" , "latest") properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
val kafka09 = new FlinkKafkaConsumer09[String](GlobalConfigUtils.getIntputTopic,new SimpleStringSchema(),properties
)
/** *
如果checkpoint启⽤用,当checkpoint完成之后,Flink Kafka Consumer将会提交offset保存 到checkpoint State中,
这就保证了了kafka broker中的committed offset与 checkpoint stata中的offset相⼀一致。 ⽤用户可以在Consumer中调⽤用setCommitOffsetsOnCheckpoints(boolean) ⽅方法来选择启⽤用 或者禁⽤用offset committing(默认情况下是启⽤用的)
* */
kafka09.setCommitOffsetsOnCheckpoints(true)
kafka09.setStartFromLatest()//start from the latest record
kafka09.setStartFromGroupOffsets()
//添加数据源addSource(kafka09)
val data: DataStream[String] = env.addSource(kafka09)

Reference:

[1]【源码】 flink 消费 kafka 消费组 offset 提交
[2]flink⼿手动维护kafka偏移量量

关于flink的setCommitOffsetsOnCheckpoints相关推荐

  1. 2021年大数据Flink(四十四):​​​​​​扩展阅读 End-to-End Exactly-Once

    目录 扩展阅读 End-to-End Exactly-Once 流处理的数据处理语义 At-most-once-最多一次 At-least-once-至少一次 Exactly-once-精确一次 En ...

  2. 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等

    1.12.Flink Kafka-Connector详解 1.12.1.Kafka Consumer消费策略设置 1.12.2.Kafka Consumer的容错 1.12.3.动态加载Topic 1 ...

  3. flink连接kafka整合hbase,scala

    解析kafka当中的json格式的数据,入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} ...

  4. 五万字 | Flink知识体系保姆级总结

    本文目录: 一.Flink简介 二.Flink 部署及启动 三.Flink 运行架构 四.Flink 算子大全 五.流处理中的 Time 与 Window 六.Flink 状态管理 七.Flink 容 ...

  5. Flink DataStream读写Kafka

    Flink提供了Kafka连接器,用于从或向Kafka读写数据. 本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理. 问题一: 读Kafka的方式 ## 读取一个Topic Fl ...

  6. 大数据——Flink 知识点整理

    目录 1. Flink 的特点 2. Flink 和 SparkStreaming 的对比 3. Flink 和 Blink.Alink之间的关系 4. JobManager 和 TaskManage ...

  7. Flink学习-DataStream-HDFSConnector(StreamingFileSink)

    Flink学习-DataStream-HDFSConnector(StreamingFileSink) Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好 ...

  8. 大数据之flink数据一致性

    一.flink分析结果写入redis 1.下载flink-hadoop整合包,放入所有节点 2.KafkaToRedisWordCount package cn._51doit.flink.day08 ...

  9. 一文带你全方位(架构,原理及代码实现)了解Flink(3.2W字建议收藏)

    注:最底部有PDF目录 一 flink简介 1.1 什么是flink Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎.F ...

最新文章

  1. 2018-2019 20165226 Exp9 Web安全基础
  2. IntelliJ IDEA 修改单行注释的格式
  3. opencv waitKey() 函数理解及应用
  4. 1.5编程基础之循环控制_41数字统计
  5. Centos 7编译安装 LAMP 环境
  6. Scrapy学习之报错ModuleNotFoundError: No module named 'win32api'
  7. (转)Spring4.2.5+Hibernate4.3.11+Struts2.3.24整合开发
  8. js排序算法详解-选择排序
  9. 两个简洁的页面:404和Loading
  10. 017年美国大学生数学建模竞赛E题优秀论文解读
  11. 主题抽取的核心——主题词表
  12. redis客户端工具redis-insight推荐
  13. 基于java小区物业管理系统(含源文件)
  14. 浅谈互联网寒冬Android进阶之路
  15. DHT11温湿度模块
  16. Ubuntu桌面卡死、You are in emergency mode
  17. 凭借这份《2022测试面经》候选者逆袭面试官,offer拿到手软
  18. 听吐的微信提示音终于能改了
  19. 华人泰斗黄煦涛逝世,贤伉俪深情六十载
  20. 统计遗传学:第三章,群体遗传

热门文章

  1. 12306春节高速抢票
  2. servlet实现文件上传,预览,下载和删除
  3. Lesson 03:运算符与流程控制
  4. textmetric结构
  5. 怎样理解阻塞非阻塞与同步异步的区别?
  6. net start mysql 发生系统错误2 系统找不到指定的文件
  7. angularjsl路由_AngularJS实现路由实例
  8. nodeJS 事件绑定
  9. dataSource 转 jdbctemplate
  10. java解析静态AIS原始数据