一.基本概念

1.消费者和消费组

Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

二.消息接收

1.必要参数设置

/*** Kafka 消费者分析*/
@Slf4j
public class KafkaConsumerAnalysis {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test";public static final String groupId = "group.yfy";public static final AtomicBoolean isRunning = new AtomicBoolean(true);
​public static Properties initConfig() {Properties props = new Properties();// 与KafkaProducer中设置保持一致props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 必填参数,该参数和KafkaProducer中的相同,制定连接Kafka集群所需的broker地址清单,可以设置一个或者多个props.put("bootstrap.servers", brokerList);// 消费者隶属于的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称props.put("group.id", groupId);// 指定KafkaConsumer对应的客户端ID,默认为空,如果不设置KafkaConsumer会自动生成一个非空字符串props.put("client.id", "consumer.client.id.demo");
​return props;}public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));
​// 正则订阅主题
//        consumer.subscribe(Pattern.compile("test.*"));
​// 指定订阅的分区
//        consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
​try {while (isRunning.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());System.out.println("key = " + record.key() + ", value = " + record.value());//do something to process record.}}} catch (Exception e) {log.error("occur exception ", e);} finally {consumer.close();}}
}

1)这是 个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询kafka请求数据。

2)消费者必须持续对 Kafka进行轮询,否则会被认为己经死亡,分区会被移交给群组里的其他消费者。传给poll() 方法的参数是一个超时时间,用于控制poll()方法的的阻塞时间。如果该参数被设为 0,poll ()会立即返回 ,否则它会在指定的毫秒数内一直等待 broker 返回数据。

3)poll ()方法能返回一个记录列表。每条记录都包含了记录所属主题的信息、记录分区的信息、记录在分区里的偏移量以及记录的键值对。我们一般会遍历这个列表 ,逐条处理这些记录。 poll()方法有一个超时参数,它指定了方法在多久之后可以返回,不管有没有可用的数据都要返回。 超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。4)在退出应用程序之前 close()方怯关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳井认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息

2.订阅主题和分区

创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:

consumer.subscribe(Pattern.compile("test.*"));

指定订阅的分区

consumer.assign(Arrays.asList(new TopicPartition("test", 0)));

3.反序列化

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

4.位移提交

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。

当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

自动提交

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

同步提交

public class CheckOffsetAndCommit {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test";public static final String groupId = "group.yfy";private static AtomicBoolean running = new AtomicBoolean(true);
​public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
​// 手动提交开启props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}
​public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
​
​TopicPartition tp = new TopicPartition(topic, 0);consumer.assign(Arrays.asList(tp));long lastConsumedOffset = -1;while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);if (records.isEmpty()) {break;}List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync();//同步提交消费位移}System.out.println("comsumed offset is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long posititon = consumer.position(tp);System.out.println("the offset of the next record is " + posititon);}
}

异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

public class OffsetCommitAsyncCallback extends ConsumerClientConfig {
​private static AtomicBoolean running = new AtomicBoolean(true);
​public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));
​try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do some logical processing.}// 异步回调consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}} finally {consumer.close();}
​try {while (running.get()) {consumer.commitAsync();}} finally {try {consumer.commitSync();} finally {consumer.close();}}}
}

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,我们可以使用同步和异步提交结合的方式提交。在消费者关闭前commitSync()提交一下。

        try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(1000);consumer.commitAsync();}} finally {try {consumer.commitSync();} finally {consumer.close();}}
  • 如果一切正常,我们使用commitAsync()方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。

  • 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync()方法也会一直重试,直到提交成功或发生无法恢复的错误。

5.指定位移消费

到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。

seek()方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费

public class SeekDemo extends ConsumerClientConfig {
​
​public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));// timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待consumer.poll(Duration.ofMillis(2000));// 获取消费者所分配到的分区Set<TopicPartition> assignment = consumer.assignment();System.out.println(assignment);for (TopicPartition tp : assignment) {// 参数partition表示分区,offset表示指定从分区的哪个位置开始消费consumer.seek(tp, 10);}
//        consumer.seek(new TopicPartition(topic,0),10);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}}
​
}

增加判断是否分配到了分区

public class SeekDemoAssignment extends ConsumerClientConfig {
​
​public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));long start = System.currentTimeMillis();Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, 10);}while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}}
}
​

指定从分区末尾开始消费

        // 指定从分区末尾开始消费Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, offsets.get(tp));}

演示位移越界操作,修改代码如下:

        // 指定从分区末尾开始消费Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, offsets.get(tp)+1);}

6.消费者拦截器

消费者拦截器主要是在消费到消息或者在提交消费位移时进行的一些定制化的操作。

public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {private static final long EXPIRE_INTERVAL = 10 * 1000;
​@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("before:" + records);long now = System.currentTimeMillis();Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords= new HashMap<>();for (TopicPartition tp : records.partitions()) {List<ConsumerRecord<String, String>> tpRecords = records.records(tp);List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();for (ConsumerRecord<String, String> record : tpRecords) {if (now - record.timestamp() < EXPIRE_INTERVAL) {newTpRecords.add(record);}}if (!newTpRecords.isEmpty()) {newRecords.put(tp, newTpRecords);}}return new ConsumerRecords<>(newRecords);}
​@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp, offset) ->System.out.println(tp + ":" + offset.offset()));}
​@Overridepublic void close() {}
​@Overridepublic void configure(Map<String, ?> configs) {}
}

实现自定义拦截器之后,需要在KafkaConsumer中配置指定这个拦截器,如下

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class.getName());

发送端同时发送两条消息,其中一条修改timestamp的值来使其变得超时,如下:

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 1000, "Kafka-demo-001", "hello, Kafka!->超时");

启动消费端运行如下,只收到了未超时的消息:

before:org.apache.kafka.clients.consumer.ConsumerRecords@7adda9cc
topic = test, partition = 1, offset = 18
key = Kafka-demo-001, value = hello, Kafka!

三.其它消费者参数

fetch.min.bytes

这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。

2.fetch.max.wait.ms

上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。

3.max.partition.fetch.bytes

这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

4.max.poll.records

这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。

Kafka消费者详解相关推荐

  1. kafka 消费者详解

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? ...

  2. Kafka 原理详解

    Kafka 原理详解 1 kakfa基础概念说明 Broker:消息服务器,就是我们部署的一个kafka服务 Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存 ...

  3. Spring Cloud Eureka 入门 (三)服务消费者详解

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处:www.bysocket.com 泥瓦匠BYSocket 希望转载,保留摘要,谢谢! "真正的进步 ...

  4. Kafka生产者与消费者详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  5. Kafka 生产者及消费者详解

    一.Kafka 生产者 1.1 分区策略 1)分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群 ...

  6. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

  7. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  8. java多线程之——生产者和消费者(详解及提高)

    目录 前情引入 简单介绍 预备知识 代码及详解 简单代码 基本解释 生产者线程类 消费者线程类 测试类 执行流程 控制台输出 自我提高 问题一 问题二 升级代码 总结 前情引入 做一些简单的认识和告知 ...

  9. Kafka配置详解-Consumer配置

    转载自:http://orchome.com/535 3.4 kafka消费者配置 在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者.新老客户端的配置如下. ...

最新文章

  1. python六十一: __module__属性
  2. 在听伏尔加纤夫曲 超酷
  3. JAVA_SE基础——24.面向对象的内存分析
  4. 4-30 HTML 细节摘录
  5. DataTemplate 以及Template Selector 学习笔记
  6. 在Fedora 25中更换openjdk为oracle jdk
  7. directx修复工具 4.0_A12-A13最稳定越狱工具发布,支持iOS13.0—iOS13.3
  8. JQuery-表单验证
  9. 计算机硬盘应该什么格式化,电脑硬盘格式化方法总结 【图文】
  10. 第九十九章 SQL函数 NOW
  11. 华为U8500使用心得
  12. Package cmake is not available, but is referred to by another package.
  13. IP-SAN存储技术
  14. 短信注册验证以及邮箱激活
  15. SSH-keygen linux教程
  16. 可以指定列fillna吗_京东e卡可以购买指定的京东自营商品?是真的吗
  17. 01 Java体系
  18. Keras Conv1d 参数及输入输出详解
  19. 【沐风老师】3DMAX一键种草插件GrassScatter使用方法详解
  20. 珈和卫星遥感助力2021年小麦“两病一虫”防控

热门文章

  1. java面试题3(java基础)
  2. 关于 pip install mysqlclent安装失败 ERROR: No matching distribution found for mysqlclient 的解决方案
  3. [ARM-assembly]-A64指令集合总结
  4. Django Models 多条件查询 以及Q/F查询
  5. 密码学基础知识(三)古典密码
  6. 广西中专机器人应用与维护_我校2018级工业机器人应用与维护专业跟岗实习
  7. (31)驱动开发环境配置(VS2010+WDK7600)
  8. linux终端窗口玩法
  9. MySql 查询同一字段多个结果合并到一行显示 GROUP_CONCAT
  10. 【Shell】使用记录