flink 写kafka_flink消费kafka的offset与checkpoint
生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis。使用的flink版本为1.11.1。
为了防止写入hive的文件数量过多,我设置了checkpoint为30分钟。
env.enableCheckpointing(1000 * 60 * 30); // 1000 * 60 * 30 => 30 minutes
达到的效果就是每30分钟生成一个文件,如下:
hive> dfs -ls /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/ ;Found 10 items-rw-r--r-- 3 hdfs hive 0 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/_SUCCESS-rw-r--r-- 3 hdfs hive 248895 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10911-rw-r--r-- 3 hdfs hive 306900 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10912-rw-r--r-- 3 hdfs hive 208227 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10913-rw-r--r-- 3 hdfs hive 263586 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10911-rw-r--r-- 3 hdfs hive 307723 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10912-rw-r--r-- 3 hdfs hive 196777 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10913-rw-r--r-- 3 hdfs hive 266984 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10911-rw-r--r-- 3 hdfs hive 338992 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10912-rw-r--r-- 3 hdfs hive 216655 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10913hive>
但是,同时也观察到归属于这个作业的kafka消费组积压数量,每分钟消费数量,明显具有周期性消费峰值。
比如,对于每30分钟时间间隔度的一个观察,前面25分钟的“每分钟消费数量”都是为0,然后,后面5分钟的“每分钟消费数量”为300k。同理,“消费组积压数量”也出现同样情况,积压数量一直递增,但是到了30分钟的间隔,就下降到数值0。如图。
消费组每分钟消费数量
消费组积压数量
但其实,通过对hbase,hive,redis的观察,数据是实时写入的,并不存在前面25分钟没有消费数据的情况。
查阅资料得知,flink会自己维护一份kafka的offset,然后checkpoint时间点到了,再把offset更新回kafka。
为了验证这个观点,“flink在checkpoint的时候,才把消费kafka的offset更新回kafka”,同时,观察,savepoint机制是否会重复消费kafka,我尝试写一个程序,逻辑很简单,就是从topic "test"读取数据,然后写入topic "test2"。特别说明,这个作业的checkpoint是1分钟。
package com.econ.powercloud.jobsTest;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import javax.annotation.Nullable;import java.util.Properties;public class TestKafkaOffsetCheckpointJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000 * 60); ParameterTool parameterTool = ParameterTool.fromArgs(args); String bootstrapServers = parameterTool.get("bootstrap.servers") == null ? "localhost:9092" : parameterTool.get("bootstrap.servers"); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", bootstrapServers); properties.setProperty("group.id", "prod-econ-flink-TestKafkaOffsetCheckpointJob-local"); properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5)); String topic = "test"; FlinkKafkaConsumer stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); DataStreamSource stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer); String producerTopic = "test2"; FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(producerTopic, new KafkaSerializationSchema() { @Override public ProducerRecord serialize(String element, @Nullable Long timestamp) { return new ProducerRecord<>(producerTopic, element.getBytes()); } }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); stringDataStreamSource.addSink(kafkaProducer); env.execute("TestKafkaOffsetCheckpointJob"); }}
提交作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6[econ@dev-hadoop-node-c ~]$
使用"kafka-console-producer.sh"往topic "test"生成消息"a1":
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:>a1>
使用"kafka-console-consumer.sh"消费topic "test2"的消息:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1
证明作业逻辑本身没有问题,实现' 从topic "test"读取数据,然后写入topic "test2" '。
使用"kafka-consumer-groups.sh"观察消费组"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的积压数量,重点观察指标"LAG",可以看到LAG为1 :
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtest 1 3 3 0 - - -test 0 3 3 0 - - -test 2 5 6 1 - - -2020年10月18日 星期日 20时09分45秒 CSTRdeMacBook-Pro:kafka r$
证明flink消费了kafka数据后,不会更新offset到kafka。
停止作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd[econ@dev-hadoop-node-c ~]$
再次启动作业,但是,不使用上面生成的savepoint:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83[econ@dev-hadoop-node-c ~]$
观察topic "test2",发现,同样的数据"a1"被生产进入:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1
证明:flink在没有使用savepoint的时候,消费kafka的offset还是从kafka自身获取。
再仔细观察topic "test"的“消费组积压数量”,注意在"20时10分05秒"还观察到积压数值1,但是在"20时10分08秒"就发现积压数值都是0.
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtest 1 3 3 0 - - -test 0 3 3 0 - - -test 2 5 6 1 - - -2020年10月18日 星期日 20时10分05秒 CSTRdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtest 1 3 3 0 - - -test 0 3 3 0 - - -test 2 6 6 0 - - -2020年10月18日 星期日 20时10分08秒 CSTRdeMacBook-Pro:kafka r$
这是因为,在"20:10:06"完成了一次checkpoint,把offset更新回kafka。
Flink Checkpoint History
下面接着测试flink使用savepoint的情况下,是否会重复消费kafka数据。
使用"kafka-console-producer.sh"往topic "test"生成消息"a2":
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092>a1>a2>
使用"kafka-console-consumer.sh"消费topic "test2"的消息:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2
停止作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60[econ@dev-hadoop-node-c ~]$
观察topic "test"的“消费组积压数量”,发现LAG还是1:
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtest 1 3 4 1 - - -test 0 3 3 0 - - -test 2 6 6 0 - - -2020年10月18日 星期日 20时28分39秒 CSTRdeMacBook-Pro:kafka r$
flink使用savepoint启动作业,注意参数"-s":
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d -s 'hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60' ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID d6cb6e1a6f9c0816ac4b61a1df38ddeb[econ@dev-hadoop-node-c ~]$
观察"kafka-console-consumer.sh"消费topic "test2"的情况,没有新的消息被打印:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2
再观察“消费组积压数量”,发现LAG值已经全部是0。
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtest 1 4 4 0 - - -test 0 3 3 0 - - -test 2 6 6 0 - - -2020年10月18日 星期日 20时31分43秒 CSTRdeMacBook-Pro:kafka r$
证明:flink使用savepoint启动作业,不会重复消费kafka数据,也会正确更新kafka的offset。
重申,以上试验证明:
- flink消费了kafka数据后,不会更新offset到kafka,直到checkpoint完成。
- flink在没有使用savepoint重启作业的时候,消费kafka的offset还是从kafka自身获取,存在重复消费数据的情况。
- flink使用savepoint重启作业,不会重复消费kafka数据,也会正确更新kafka的offset。
flink 写kafka_flink消费kafka的offset与checkpoint相关推荐
- demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记
前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...
- Flink Connectors之消费Kafka数据相关参数以及API说明
1-参数设置 以下参数都必须/建议设置上 1.订阅的主题 2.反序列化规则 3.消费者属性-集群地址 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理) 5.消费者属性-of ...
- Flink系列:消费kafka时获取每条消息对应的topic
目录 效果 使用方法 重写KafkaDeserializationSchema 最终效果 topic ---->>> csdn-nio4444 使用方法 public static ...
- flink 写kafka_网易云音乐基于 Flink + Kafka 的实时数仓建设实践
简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景 Flink + Kafka 平台化设计 Kafka 在 ...
- kafka 的pom文件_Flink 消费 Kafka 数据
kafka核心概念: Kafka 是一个消息队列,生产者向消息队列中写入数据,消费者从队列中获取数据并进行消费.可以认为一个 Topic 就是一个队列,每个 Topic 又会被分成多个 Partiti ...
- 【Kafka】Flink 消费 kafka 部分 分区 一直不提交 offset
文章目录 1.概述 1.1 查看消费组 1.2 检测topic 1.3 内置topic 1.4 flink数据监控 1.5 提交offset指标 M.扩展 1.概述 一个环境,突然发现,Flink消费 ...
- flink 消费 kafka offset 自动提交
flink 消费kafka 程序重启后,从原先的自动提交的点继续消费,earliest 不用再从开始消费 如果开启了checkpoint 以 checkpoint为准 ,enable.auto.com ...
- flink消费kafka从指定时间消费offset的日志
有时生产上会按指定时间消费kafka的数据,具体日志如下: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] ...
- 记录一次Flink消费Kafka空转,无法拉取消息,checkpoint也能成功,但是位置点却不提交的异常处理
起因 新起了一个业务,用flink消费实时集市kafka消息,在测试环境跑的好好的,验证也过了,然后上线. 刚上线的当天也好好的,晚上突然在某个点,就拉取不到消息了,上游一直有消息下来,但flink就 ...
最新文章
- 从qspi启动linux时间,Zynq-Linux移植学习笔记(二十三)——QSPI速度配置
- Photoshop抠图、污点处理等常用功能及快捷键
- 技术图文:Matlab向量 VS. Python列表
- OpenERP的优化---使用Nginx反向代理
- .NET中读取csv文件内容
- java 线程的基本概念_Java多线程——多线程的基本概念和使用
- Gossip算法原理
- mysql中使用安全等于 <=>
- 【实战 Ids4】小技巧篇:自定义登录页操作
- 从抛硬币试验看随机游走定义的基本概念错误
- Spring Batch:多种格式输出编写器
- zemax微透镜阵列示例_阵列反向! Ruby中的示例方法
- dedecms联动筛选_DEDECMS分类信息按联动类别筛选的实现方法
- isless()函数与C ++中的示例
- Mysql学习总结(84)—— Mysql的主从复制延迟问题总结
- [转]Spring注解-@Configuration注解、@Bean注解以及配置自动扫描、bean作用域
- 【报告分享】中国年轻用户电商消费洞察报告:寻找电商换道增长机遇.pdf(附下载链接)...
- Win10+VS2017+Ceres-Solver-1.13.0配置
- 全网首发:TeaVM编译时容易出错的几种代码
- dig命令查询结果解析