Kafka 的消费方式

  消息队列主要有两个消费方式:pull(拉)模式push(推)模式
  pull(拉)模式 consumer采用从broker中主动拉取数据,push(推)模式 broker决定消息发送速率。而每个消费者受限于所部署的服务器性能限制,如果使用push(推)模式很可能会出现有的consumer还来不及消费;因此kafka是采用的pull(拉)模式,可以根据消费者服务器的性能去调节消费者消费的消息速率。当然pull(拉)模式也有一定的缺陷,如果kafka中没有数据,消费者可能会陷入循环中,一直返回空数据。

消费者总体工作流程


  生产者向每个分区的leader发送数据,follower主动与leader进行数据的同步保证数据的可靠性,消费者消费分区的数据,一个消费者可以消费一个或者多个分区的数据,而每个消费者是独立,并不存在相互关联的; 但是每个分区的数据只能有消费者组中的一个消费者消费。

消费者组原理

消费者组

  Consumer Grep (CG): 消费者组,有多个consumer组成,形成一个消费者组的条件是所有的消费者的groupid相同

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
  • 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
  • 如果向消费者组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置不会接收任何消息

消费者组初始化流程

  首先了解一个组件:coordinator,它是辅助实现消费者组的初始化和分区的分配。每个broker节点中都有一个对应的coordinator。coordinator的节点选择 = groupid 的hashcode值 % 50 (_consumer_offset的分区数量),而消费者组中的消费者就是通过这个公式来选择coordinator节点
  例如:groupid的hashcode值是1,1%50=1,那么_consumer_offset主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

消费者组详细消费流程

  • 消费者组在进行工作之前,会先创建一个消费者网络连接客户端,主要用于与kafka集群进行交互
  • 消费者组发送消费请求用于抓取数据的初始化
  • 调用send方法发送请求
  • 发送完请求后通过回调方法拉取对应的结果,放入消息队列中
  • 消费者从队列中拉取指定数量的消息进行处理
  • 将消息进行反序列化,经过拦截器在进行数据的处理

消费者API

示例1:消费一个主题
  注意:在消费者API代码中必须配置消费者组ID。命令行启动消费者不填组ID会被自动填写随机的消费者组ID

public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// ack配置,默认-1(all)properties.put(ProducerConfig.ACKS_CONFIG, "1");// 重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 10);KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", "ACK测试" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata data, Exception e) {if (e == null) {System.out.println("分区:" + data.partition());} else {System.err.println("发送失败");System.err.println(e);}}});}// 关闭资源producer.close();
}

示例2:消费一个主题下的指定分区

public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafkaproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者groupid, 注意:必须的配置此参数properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题对应的分区List<TopicPartition> topics = new ArrayList<>();topics.add(new TopicPartition("first", 0));consumer.assign(topics);// 消费数据while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据poll.forEach(System.out::println);}
}

分区的分配以及再平衡

  kafka有四种主流的分区非配策略:Range、RoundRobin、Sticky、CooperativeSticky,可以通过 partition.assignment.strategy 参数修改分区的分配策略。默认策略是Range+CooperativeSticky。kafka可以同时使用多个分区分配策略。

Range 策略

  Range是对每个topic而言的。首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序;通过 partitions数 / consumers数来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费几个分区
  注意: 如果只是针对1个topic而言,消费者多消费一个分区影响不是很大,但是如果有N多个topic,那么针对每个topic前面的消费者都将多消费一个分区,容易产生数据倾斜

RoundRobin 策略

  RoundRobin 是针对集群所有topic而言的,RoundRobin 轮询分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者

Sticky 策略

  粘性分区可以理解为分配的结果带有"粘性的",即在执行一次新的分配之前,考虑上一次分配的结果,尽可能少的调整分配的变动,可以节省大量的开支。粘性分区是Kafka从0.11x版本开始引入的分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变

示例:

public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");// 配置分区分配策略,可配置参数: RangeAssignor    RoundRobinAssignor    StickyAssignor    CooperativeStickyAssignorproperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}

offset位移

offset默认的维护位置

  kafka0.9版本之前,consumer默认将offset保存在zookeeper中,从0.9版本开始,consumer默认将offset保存在kafka一个内置的topic中:__consumer_offsets。原因:如果每个消费者都和zookeeper进行交互储存对应的offset,将会占用大量的网络资源,效率比较低下

  __consumer_offsets主题里面采用key-value的方式保存数据。key是 group.id + topic + 分区号,value是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是 group.id + topic + 分区号就保留最新的数据。

  默认情况下,kafka是不允许查看系统主题的数据的,可以通过 exclude.internal.topic 修改,默认true表示不能消费系统主题
示例:

# 修改kafka consumer的配置文件(vi config/consumer.properties),新增以下参数
exclude.internal.topic=false# 为了方便,创建一个新的topic
./bin/kafka-topics.sh --bootstrap-server 192.168.0.120:9092 --create --topic testOffset --partitions 2 --replication-factor 1# 启动生产者并向 testOffset 发送数据
./bin/kafka-console-producer.sh --bootstrap-server 192.168.0.120:9092 --topic testOffset# 启动消费者消费 testOffset 的数据,指定组是为了方便观察储存位置
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.120:9092 --topic testOffset --group test# 查看消费者消费注意__consumer_offsets
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.120:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

自动提交offset

  为了使我们能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能。自动提交offset的相关参数如下:

  • enable.auto.commit:是否开始自动提交offset,默认true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认5s

    示例:
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());// 自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 提交时间间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}

手动提交offset

  虽然自动提交offset简单便利,但是由于其是基于时间提交的,开发员难以把握offset提交的时机。因此kafka还提供了手动提交offset的API
  手动提交offset的方法有两种:commitSync 同步提交、commitAsync 异步提交。两者的相同点都会将本次提交的一批数据最高的偏移量提交,不同的是:同步提交阻塞当前线程, 一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),而异步提交没有重试机制,故有可能提交失败

  • commitSync 同步提交:必须等待offset提交完毕再去消费下一批数据
  • commitAsync 异步提交:发送完提交offset请求后,就开始消费下一批数据

    示例1:同步提交
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());// 手动提交配置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}// 同步提交offsetconsumer.commitSync();}
}

示例2:异步提交

public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());// 手动提交配置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}// 异步提交offsetconsumer.commitAsync();}
}

指定offset消费

  目前有earliest、latest、none消费方式,默认latest,通过参数 auto.offset.reset 可以进行修改

  • earliest: 自动将偏移量重置为最早的偏移量 --from-beginning
  • latest: 自动将偏移量重置为最新偏移量
  • none: 如果未找到消费者组的先前偏移量,则向消费者抛出异常
    示例:
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");// 修改消费方式  earliest、latest、noneproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 指定位置进行消费Set<TopicPartition> partitions = consumer.assignment();// 保证分区分配方案已经制定完成while (partitions.size() == 0) {consumer.poll(Duration.ofSeconds(1));// 拉取一次数据partitions = consumer.assignment();// 更新}for (TopicPartition partition : partitions) {consumer.seek(partition, 100);}while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}

指定时间消费

public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 指定位置进行消费Set<TopicPartition> partitions = consumer.assignment();// 分区集合// 保证分区分配方案已经制定完成while (partitions.size() == 0) {consumer.poll(Duration.ofSeconds(1));// 拉取一次数据partitions = consumer.assignment();// 更新}// 把时间转换为对应的offsetMap<TopicPartition, Long> map = new HashMap<>();for (TopicPartition partition : partitions) {map.put(partition, System.currentTimeMillis() - 24 * 3600 * 1000); // 消费分区一天前到现在的数据}Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);// 通过时间获取offset的集合for (TopicPartition partition : partitions) {OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);consumer.seek(partition, offsetAndTimestamp.offset()); // 指定消费的offset}while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}

漏消费和重复消费

  **重复消费:**已经消费的数据但是offset没有提交

  **漏消费:**先提交offset后消费,有可能造成数据的漏消费

  出现这两种情况我们可以通过kafka的 事务 来解决

消费者事务

  如果想完成consumer端的精准一次性消费,那么需要 kafka消费端将消费过程和提交offset过程做原子绑定。 此时我们需要将kafka的offset保存到支持事务的自定义介质如MySQL中。

数据积压,消费者如何提高吞吐量

  在kafka中,默认的日志储存时间为7天,超过了7天数据就会被删除。如果你消费一个主题的数据比如消费的了4天,但是只消费了数据量的20%,而到了7天时这个主题里面的数据还没有被消费完而将会删除时,这种情况可以加快消费速度。

  1. 如果是kafka的消费能力不足,则可以考虑增加topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数,两者缺一不可

  2. 如果是下游的数据处理不及时,提高每批次拉取的数量。 批次拉取数量过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压

源码地址:https://gitee.com/peachtec/hxz-study

kafka的Consumer 消费者(六)相关推荐

  1. 关于Kafka 的 consumer 消费者手动提交详解

    前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...

  2. kafka之Consumer消费者基本概念

    概念 消费者 消费者(Consumer)即读取Kafka集群中某些topic消息的程序,kafka中消费者分为两种类型: 消费者组 (consumer group) 独立消费者 (standalong ...

  3. kafka 主动消费_Kafka核心API——Consumer消费者

    Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了.因此,本文 ...

  4. Apache Kafka Consumer 消费者集

    1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...

  5. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  6. kafka生产者、消费者java示例

    1. 生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.Ke ...

  7. Jmeter之创建Kafka生产者和消费者进行性能测试

    目录 1. A Brief Overview of Apache Kafka 2. Pepper-Box Serialized Config 3. Pepper Box Kafka Sampler 4 ...

  8. Kafka设计解析(六)- Kafka高性能架构之道

    原创文章,转载请务必将下面这段话置于文章开头处. 本文转发自技术世界,原文链接 http://www.jasongj.com/kafka/high_throughput/ 摘要 上一篇文章<Ka ...

  9. 【kafka】Consumer is not subscribed to any topics

    1.概述 一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下 转载: https://blog.csdn.net/github_32521685/article/details ...

最新文章

  1. MyBatis复习笔记2:配置文件详解
  2. Target runtime Apache Tomcat 6.0 is not defined
  3. java class类文件结构
  4. window server 2012 IE10 增强的安全设置 如何关闭
  5. Linux网络编程 | IO模型 :阻塞IO、非阻塞IO、信号驱动IO、异步IO、多路复用IO
  6. P4317-花神的数论题【组合数学】
  7. centos 生产 ssh-key
  8. java .z文件_java 压缩文件
  9. git commit 规范校验配置和版本发布配置
  10. 时间戳和字符串互相转换
  11. Yum介绍与常见用法
  12. Spring与Struts2的整合
  13. Android WorkManager 实战讲解
  14. 2021年上半年数据库系统工程师下午真题及答案解析
  15. Android指南针应用编写
  16. Innovator Admin 一个aras的管理器,又一个package安装方法
  17. 算法竞赛进阶指南——二分:防线
  18. SQLyog 报错2058 :连接 mysql 8.0.11 解决方法
  19. html横向滚动效果,html 中 鼠标滑轮实现横向滚动
  20. 无需下载 网页版 Matlab

热门文章

  1. 【1】视频稳像——调研
  2. 限制波尔兹曼机(Restricted Boltzmann Machines)
  3. UIautomator2 移动端自动化
  4. android 关于启动页广告的总结。
  5. 盘点那些高考语录,总有一句惊呆你!
  6. 计算机专业教育与创新创业,计算机专业创新创业教育模式的发展
  7. 【备战2020】高考数学全套知识点(二)
  8. Java学习笔记——1
  9. Windows下ppt放映获取不到窗口的问题
  10. ParameterizedType类的使用