生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis。使用的flink版本为1.11.1。

为了防止写入hive的文件数量过多,我设置了checkpoint为30分钟。

env.enableCheckpointing(1000 * 60 * 30); // 1000 * 60 * 30 => 30 minutes

达到的效果就是每30分钟生成一个文件,如下:

hive> dfs -ls /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/ ;Found 10 items-rw-r--r--   3 hdfs hive          0 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/_SUCCESS-rw-r--r--   3 hdfs hive     248895 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10911-rw-r--r--   3 hdfs hive     306900 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10912-rw-r--r--   3 hdfs hive     208227 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10913-rw-r--r--   3 hdfs hive     263586 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10911-rw-r--r--   3 hdfs hive     307723 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10912-rw-r--r--   3 hdfs hive     196777 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10913-rw-r--r--   3 hdfs hive     266984 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10911-rw-r--r--   3 hdfs hive     338992 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10912-rw-r--r--   3 hdfs hive     216655 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10913hive> 

但是,同时也观察到归属于这个作业的kafka消费组积压数量,每分钟消费数量,明显具有周期性消费峰值。

比如,对于每30分钟时间间隔度的一个观察,前面25分钟的“每分钟消费数量”都是为0,然后,后面5分钟的“每分钟消费数量”为300k。同理,“消费组积压数量”也出现同样情况,积压数量一直递增,但是到了30分钟的间隔,就下降到数值0。如图。

消费组每分钟消费数量

消费组积压数量

但其实,通过对hbase,hive,redis的观察,数据是实时写入的,并不存在前面25分钟没有消费数据的情况。

查阅资料得知,flink会自己维护一份kafka的offset,然后checkpoint时间点到了,再把offset更新回kafka。

为了验证这个观点,“flink在checkpoint的时候,才把消费kafka的offset更新回kafka”,同时,观察,savepoint机制是否会重复消费kafka,我尝试写一个程序,逻辑很简单,就是从topic "test"读取数据,然后写入topic "test2"。特别说明,这个作业的checkpoint是1分钟。

package com.econ.powercloud.jobsTest;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import javax.annotation.Nullable;import java.util.Properties;public class TestKafkaOffsetCheckpointJob {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(1000 * 60);        ParameterTool parameterTool = ParameterTool.fromArgs(args);        String bootstrapServers = parameterTool.get("bootstrap.servers") == null ? "localhost:9092" : parameterTool.get("bootstrap.servers");        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", bootstrapServers);        properties.setProperty("group.id", "prod-econ-flink-TestKafkaOffsetCheckpointJob-local");        properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5));        String topic = "test";        FlinkKafkaConsumer stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);        DataStreamSource stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer);        String producerTopic = "test2";        FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(producerTopic, new KafkaSerializationSchema() {            @Override            public ProducerRecord serialize(String element, @Nullable Long timestamp) {                return new ProducerRecord<>(producerTopic, element.getBytes());            }        }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);        stringDataStreamSource.addSink(kafkaProducer);        env.execute("TestKafkaOffsetCheckpointJob");    }}

提交作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6[econ@dev-hadoop-node-c ~]$ 

使用"kafka-console-producer.sh"往topic "test"生成消息"a1":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:>a1>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1

证明作业逻辑本身没有问题,实现' 从topic "test"读取数据,然后写入topic "test2" '。

使用"kafka-consumer-groups.sh"观察消费组"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的积压数量,重点观察指标"LAG",可以看到LAG为1 :

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          5               6               1               -               -               -2020年10月18日 星期日 20时09分45秒 CSTRdeMacBook-Pro:kafka r$ 

证明flink消费了kafka数据后,不会更新offset到kafka。

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd[econ@dev-hadoop-node-c ~]$ 

再次启动作业,但是,不使用上面生成的savepoint:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83[econ@dev-hadoop-node-c ~]$ 

观察topic "test2",发现,同样的数据"a1"被生产进入:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1

证明:flink在没有使用savepoint的时候,消费kafka的offset还是从kafka自身获取。

再仔细观察topic "test"的“消费组积压数量”,注意在"20时10分05秒"还观察到积压数值1,但是在"20时10分08秒"就发现积压数值都是0.

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          5               6               1               -               -               -2020年10月18日 星期日 20时10分05秒 CSTRdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               3               0               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时10分08秒 CSTRdeMacBook-Pro:kafka r$ 

这是因为,在"20:10:06"完成了一次checkpoint,把offset更新回kafka。

Flink Checkpoint History

下面接着测试flink使用savepoint的情况下,是否会重复消费kafka数据。

使用"kafka-console-producer.sh"往topic "test"生成消息"a2":

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092>a1>a2>

使用"kafka-console-consumer.sh"消费topic "test2"的消息:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2

停止作业:

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60[econ@dev-hadoop-node-c ~]$ 

观察topic "test"的“消费组积压数量”,发现LAG还是1:

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          3               4               1               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时28分39秒 CSTRdeMacBook-Pro:kafka r$ 

flink使用savepoint启动作业,注意参数"-s":

[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d -s 'hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60' ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID d6cb6e1a6f9c0816ac4b61a1df38ddeb[econ@dev-hadoop-node-c ~]$ 

观察"kafka-console-consumer.sh"消费topic "test2"的情况,没有新的消息被打印:

RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2

再观察“消费组积压数量”,发现LAG值已经全部是0。

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtest            1          4               4               0               -               -               -test            0          3               3               0               -               -               -test            2          6               6               0               -               -               -2020年10月18日 星期日 20时31分43秒 CSTRdeMacBook-Pro:kafka r$ 

证明:flink使用savepoint启动作业,不会重复消费kafka数据,也会正确更新kafka的offset。

重申,以上试验证明:

  1. flink消费了kafka数据后,不会更新offset到kafka,直到checkpoint完成。
  2. flink在没有使用savepoint重启作业的时候,消费kafka的offset还是从kafka自身获取,存在重复消费数据的情况。
  3. flink使用savepoint重启作业,不会重复消费kafka数据,也会正确更新kafka的offset。

flink 写kafka_flink消费kafka的offset与checkpoint相关推荐

  1. demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  2. Flink Connectors之消费Kafka数据相关参数以及API说明

    1-参数设置 以下参数都必须/建议设置上 1.订阅的主题 2.反序列化规则 3.消费者属性-集群地址 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理) 5.消费者属性-of ...

  3. Flink系列:消费kafka时获取每条消息对应的topic

    目录 效果 使用方法 重写KafkaDeserializationSchema 最终效果 topic ---->>> csdn-nio4444 使用方法 public static ...

  4. flink 写kafka_网易云音乐基于 Flink + Kafka 的实时数仓建设实践

    简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景 Flink + Kafka 平台化设计 Kafka 在 ...

  5. kafka 的pom文件_Flink 消费 Kafka 数据

    kafka核心概念: Kafka 是一个消息队列,生产者向消息队列中写入数据,消费者从队列中获取数据并进行消费.可以认为一个 Topic 就是一个队列,每个 Topic 又会被分成多个 Partiti ...

  6. 【Kafka】Flink 消费 kafka 部分 分区 一直不提交 offset

    文章目录 1.概述 1.1 查看消费组 1.2 检测topic 1.3 内置topic 1.4 flink数据监控 1.5 提交offset指标 M.扩展 1.概述 一个环境,突然发现,Flink消费 ...

  7. flink 消费 kafka offset 自动提交

    flink 消费kafka 程序重启后,从原先的自动提交的点继续消费,earliest 不用再从开始消费 如果开启了checkpoint 以 checkpoint为准 ,enable.auto.com ...

  8. flink消费kafka从指定时间消费offset的日志

    有时生产上会按指定时间消费kafka的数据,具体日志如下: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] ...

  9. 记录一次Flink消费Kafka空转,无法拉取消息,checkpoint也能成功,但是位置点却不提交的异常处理

    起因 新起了一个业务,用flink消费实时集市kafka消息,在测试环境跑的好好的,验证也过了,然后上线. 刚上线的当天也好好的,晚上突然在某个点,就拉取不到消息了,上游一直有消息下来,但flink就 ...

最新文章

  1. 从qspi启动linux时间,Zynq-Linux移植学习笔记(二十三)——QSPI速度配置
  2. Photoshop抠图、污点处理等常用功能及快捷键
  3. 技术图文:Matlab向量 VS. Python列表
  4. OpenERP的优化---使用Nginx反向代理
  5. .NET中读取csv文件内容
  6. java 线程的基本概念_Java多线程——多线程的基本概念和使用
  7. Gossip算法原理
  8. mysql中使用安全等于 <=>
  9. 【实战 Ids4】小技巧篇:自定义登录页操作
  10. 从抛硬币试验看随机游走定义的基本概念错误
  11. Spring Batch:多种格式输出编写器
  12. zemax微透镜阵列示例_阵列反向! Ruby中的示例方法
  13. dedecms联动筛选_DEDECMS分类信息按联动类别筛选的实现方法
  14. isless()函数与C ++中的示例
  15. Mysql学习总结(84)—— Mysql的主从复制延迟问题总结
  16. [转]Spring注解-@Configuration注解、@Bean注解以及配置自动扫描、bean作用域
  17. 【报告分享】中国年轻用户电商消费洞察报告:寻找电商换道增长机遇.pdf(附下载链接)...
  18. Win10+VS2017+Ceres-Solver-1.13.0配置
  19. 全网首发:TeaVM编译时容易出错的几种代码
  20. dig命令查询结果解析

热门文章

  1. struts2服务端与android交互
  2. MySQL流浪记(三)—— Linux安装MySQL数据库5.7.30(亲测有效3分钟即可)
  3. 浅谈 Windows API 编程
  4. CompletableFuture详解~thenAccept
  5. Spring Data JPA 从入门到精通~EntityManager介绍
  6. oracle查看锁表进程,杀掉锁表进程
  7. arm shellcode 编写详析2
  8. php分区表,【MYSQL】分区表
  9. JAVA入门级教学之(深入throws的异常抛出机制)
  10. xlwings删除数据_xlwings如何删除行和列?