1.消费位移确认

Kafka消费者消费位移确认有自动提交与手动提交两种策略。在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。手动提交需要由客户端自己控制偏移量的提交。

(1)自动提交。在创建一个消费者时,默认是自动提交偏移量,当然我们也可以显示设置为自动。例如,我们创建一个消费者,该消费者自动提交偏移量

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 创建消费者
consumer.subscribe(Arrays.asList("test"));// 订阅主题

(2)手动提交。在有些场景我们可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设我们对拉取到的消息需要进行写入数据库处理,或者用于其他网络访问请求等等复杂的业务处理,在这种场景下,所有的业务处理完成后才认为消息被成功消费,这种场景下,我们必须手动控制偏移量的提交。

Kafka 提供了异步提交(commitAsync)及同步提交(commitSync)两种手动提交的方式。两者的主要区别在于同步模式下提交失败时一直尝试提交,直到遇到无法重试的情况下才会结束,同时,同步方式下消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。而异步方式下消费者线程不会被阻塞,可能在提交偏移量操作的结果还未返

回时就开始进行下一次的拉取操作,在提交失败时也不会尝试提交。

实现手动提交前需要在创建消费者时关闭自动提交,即设置enable.auto.commit=false。然后在业务处理成功后调用commitAsync()或commitSync()方法手动提交偏移量。由于同步提交会阻塞线程直到提交消费偏移量执行结果返回,而异步提交并不会等消费偏移量提交成功后再继续下一次拉取消息的操作,因此异步提交还提供了一个偏移量提交回调的方法commitAsync(OffsetCommitCallback callback)。当提交偏移量完成后会回调OffsetCommitCallback 接口的onComplete()方法,这样客户端根据回调结果执行不同的逻辑处理。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("fetch.max.bytes", 1024);// 为了便于测试,这里设置一次fetch 请求取得的数据最大值为1KB,默认是5MB
props.put("enable.auto.commit", false);// 设置手动提交偏移量
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList("test"));
try {int minCommitSize = 10;// 最少处理10 条消息后才进行提交int icount = 0 ;// 消息计算器while (true) {// 等待拉取消息ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {// 简单打印出消息内容,模拟业务处理System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record. partition(), record.offset(), record.key(),record.value());icount++;}// 在业务逻辑处理成功后提交偏移量if (icount >= minCommitSize){consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (null == exception) {// TODO 表示偏移量成功提交System.out.println("提交成功");} else {// TODO 表示提交偏移量发生了异常,根据业务进行相关处理System.out.println("发生了异常");}}});icount=0; // 重置计数器}}
} catch(Exception e){// TODO 异常处理e.printStackTrace();
} finally {consumer.close();
}

3.5以时间戳查询消息

Kafka 在0.10.1.1 版本增加了时间戳索引文件,因此我们除了直接根据偏移量索引文件查询消息之外,还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法入参为一个Map 对象,Key 为待查询的分区,Value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是,若待查询的分区不存在,则该方法会被一直阻塞。

假设我们希望从某个时间段开始消费,那们就可以用offsetsForTimes()方法定位到离这个时间最近的第一条消息的偏移量,在查到偏移量之后调用seek(TopicPartition partition, long offset)方法将消费偏移量重置到所查询的偏移量位置,然后调用poll()方法长轮询拉取消息。例如,我们希望从主题“stock-quotation”第0 分区距离当前时间相差12 小时之前的位置开始拉取消息

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
try {Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition,Long>();// 构造待查询的分区TopicPartition partition = new TopicPartition("stock-quotation", 0);// 设置查询12 小时之前消息的偏移量timestampsToSearch.put(partition, (System.currentTimeMillis() - 12 * 3600 * 1000));// 会返回时间大于等于查找时间的第一个偏移量Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes (timestampsToSearch);OffsetAndTimestamp offsetTimestamp = null;// 这里依然用for 轮询,当然由于本例是查询的一个分区,因此也可以用if 处理for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {// 若查询时间大于时间戳索引文件中最大记录索引时间,// 此时value 为空,即待查询时间点之后没有新消息生成offsetTimestamp = entry.getValue();if (null != offsetTimestamp) {// 重置消费起始偏移量consumer.seek(partition, entry.getValue().offset());}}while (true) {// 等待拉取消息ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records){// 简单打印出消息内容System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record.partition(), record.offset(), record.key(),record.value());}}
} catch (Exception e) {e.printStackTrace();
} finally {consumer.close();
}

3.6消费速度控制

提供 pause(Collection<TopicPartition> partitions)和resume(Collection<TopicPartition>
partitions)方法,分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。通过这两个方法可以对消费速度加以控制,结合业务使用。

Kafka入门三:几种消费方式相关推荐

  1. nacos实现服务注册与两种消费方式

    nacos实现服务注册与两种消费方式 运行nacos 服务注册实例 两种服务消费方式 RestTemplet Feign 测试 参考 运行nacos 预备环境:64位操作系统.64位JDK1.8+.M ...

  2. 简单谈谈ActiveMQ的两种消费方式

    ActiveMQ 有两种消费方式,一种是q,一种是订阅式的.用q的方式提供生产的话始终保存在服务端,直到一个消费者把他消费完才可以返回一个状态.然后就是订阅的方式可以供多个消费者同时消费.我们当时用的 ...

  3. RocketMQ:两种消费方式:pull拉、push推

    RocketMQ:两种消息消费方式:pull拉.push推 1.推送方式pull模式: 拉取,DefaultMQPullConsumer模式. 是由客户端主动向MQ请求数据,主动权在客户端,先拉取数据 ...

  4. spring+kafka消费者的2种配置方式

    2019独角兽企业重金招聘Python工程师标准>>> 1.通过XML配置 <?xml version="1.0" encoding="UTF-8 ...

  5. linux-shell入门-shell两种使用方式-shell的基本特性

    初识shell 明确什么是shell shell就是一个命令的解释器 启动shell的两种方式 在shell的提示符下,通过输入命令实现相应的管理 通过shell的角本,即 shell script ...

  6. 真的,关于 Kafka 入门看这一篇就够了

    作者 | cxuan 责编 | 刘静 Kafka 系列的阶段性总结(万字长文,做好准备,建议先收藏再看) 初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个 ...

  7. Kafaka的消息消费方式

    Kafaka的三种消费方式 1.消费位移确认 Kafka消费者消费位移确认有自动提交与手动提交两种策略.在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,tru ...

  8. Kafka 入门教程(超详细)

    文章目录 1. Kafka 概述 1.1 定义 1.2 消息队列 1.2.1 传统消息队列的应用场景 1.2.2 消息队列的两种形式 1.3 Kafka 基础架构 2. Kafka 的安装 2.1 安 ...

  9. 真的,Kafka 入门一篇文章就够了

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订 ...

最新文章

  1. 信息安全 CIO最关注什么?
  2. 灾难恢复演练成功的8大步骤
  3. java.两个例子充分阐述多态的可拓展性
  4. Python中读取文件中的json串,并将其写入到Excel表格中
  5. python-爬虫(1)
  6. CSDN转载博客的方法
  7. maven 打包指定依赖包_Maven打包成Jar文件时依赖包的问题
  8. gprof—Ubuntu中使用gprofile进行性能统计时没有数字结果
  9. Doing It in User Space
  10. 基础03 JVM到底在哪里?
  11. 转发的 呀 犯法不 顶级 的 学学不错 【分享】各大资源论坛推荐及优势特点
  12. 系统动力学Vensim的使用
  13. SQL server 2008卸载后有残留及彻底卸载
  14. correlation 蒙特卡洛_蒙特卡洛模拟法
  15. 【小算法】求约数个数
  16. 英文单词缩写规则(转自天涯)
  17. C++常用字符串string方法
  18. 那些年UNIX教我们的事
  19. 2019美团点评校招笔试劝退之旅
  20. 说说12306.cn铁路订票网站

热门文章

  1. pywifi连接中文wifi名称(乱码)连接不上问题解决方案
  2. 5.3.5—二叉查找树—Convert Sorted List to Binary Sear Tree
  3. Unity3d开发MOBA游戏类《王者荣耀》记录(起)
  4. centos gedit 字体大小_【写作技巧】毕业论文格式要求及字体大小
  5. java curator_使用curator实现选举
  6. 阿里云国际站云服务器可以用来做什么业务?
  7. Unity3D占用内存太大怎么解决呢? -尾
  8. 汽车门把手的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告
  9. 程序员高管修炼之道!
  10. android 获取视频编码,Android视频编码