Kafaka的三种消费方式

1.消费位移确认

Kafka消费者消费位移确认有自动提交与手动提交两种策略。在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。手动提交需要由客户端自己控制偏移量的提交。

(1)自动提交。在创建一个消费者时,默认是自动提交偏移量,当然我们也可以显示设置为自动。例如,我们创建一个消费者,该消费者自动提交偏移量

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test");
  4. props.put("client.id", "test");
  5. props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
  6. props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
  7. props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  8. props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  9. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 创建消费者
  10. consumer.subscribe(Arrays.asList("test"));// 订阅主题

(2)手动提交。在有些场景我们可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设我们对拉取到的消息需要进行写入数据库处理,或者用于其他网络访问请求等等复杂的业务处理,在这种场景下,所有的业务处理完成后才认为消息被成功消费,这种场景下,我们必须手动控制偏移量的提交。

Kafka 提供了异步提交(commitAsync)及同步提交(commitSync)两种手动提交的方式。两者的主要区别在于同步模式下提交失败时一直尝试提交,直到遇到无法重试的情况下才会结束,同时,同步方式下消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。而异步方式下消费者线程不会被阻塞,可能在提交偏移量操作的结果还未返

回时就开始进行下一次的拉取操作,在提交失败时也不会尝试提交。

实现手动提交前需要在创建消费者时关闭自动提交,即设置enable.auto.commit=false。然后在业务处理成功后调用commitAsync()或commitSync()方法手动提交偏移量。由于同步提交会阻塞线程直到提交消费偏移量执行结果返回,而异步提交并不会等消费偏移量提交成功后再继续下一次拉取消息的操作,因此异步提交还提供了一个偏移量提交回调的方法commitAsync(OffsetCommitCallback callback)。当提交偏移量完成后会回调OffsetCommitCallback 接口的onComplete()方法,这样客户端根据回调结果执行不同的逻辑处理。

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test");
  4. props.put("client.id", "test");
  5. props.put("fetch.max.bytes", 1024);// 为了便于测试,这里设置一次fetch 请求取得的数据最大值为1KB,默认是5MB
  6. props.put("enable.auto.commit", false);// 设置手动提交偏移量
  7. props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  8. props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  9. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  10. // 订阅主题
  11. consumer.subscribe(Arrays.asList("test"));
  12. try {
  13. int minCommitSize = 10;// 最少处理10 条消息后才进行提交
  14. int icount = 0 ;// 消息计算器
  15. while (true) {
  16. // 等待拉取消息
  17. ConsumerRecords<String, String> records = consumer.poll(1000);
  18. for (ConsumerRecord<String, String> record : records) {
  19. // 简单打印出消息内容,模拟业务处理
  20. System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record. partition(), record.offset(), record.key(),record.value());
  21. icount++;
  22. }
  23. // 在业务逻辑处理成功后提交偏移量
  24. if (icount >= minCommitSize){
  25. consumer.commitAsync(new OffsetCommitCallback() {
  26. @Override
  27. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  28. if (null == exception) {
  29. // TODO 表示偏移量成功提交
  30. System.out.println("提交成功");
  31. } else {
  32. // TODO 表示提交偏移量发生了异常,根据业务进行相关处理
  33. System.out.println("发生了异常");
  34. }
  35. }
  36. });
  37. icount=0; // 重置计数器
  38. }
  39. }
  40. } catch(Exception e){
  41. // TODO 异常处理
  42. e.printStackTrace();
  43. } finally {
  44. consumer.close();
  45. }

2以时间戳查询消息

Kafka 在0.10.1.1 版本增加了时间戳索引文件,因此我们除了直接根据偏移量索引文件查询消息之外,还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法入参为一个Map 对象,Key 为待查询的分区,Value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是,若待查询的分区不存在,则该方法会被一直阻塞。

假设我们希望从某个时间段开始消费,那们就可以用offsetsForTimes()方法定位到离这个时间最近的第一条消息的偏移量,在查到偏移量之后调用seek(TopicPartition partition, long offset)方法将消费偏移量重置到所查询的偏移量位置,然后调用poll()方法长轮询拉取消息。例如,我们希望从主题“stock-quotation”第0 分区距离当前时间相差12 小时之前的位置开始拉取消息

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test");
  4. props.put("client.id", "test");
  5. props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
  6. props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
  7. props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  8. props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  9. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  10. // 订阅主题
  11. consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
  12. try {
  13. Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition,Long>();
  14. // 构造待查询的分区
  15. TopicPartition partition = new TopicPartition("stock-quotation", 0);
  16. // 设置查询12 小时之前消息的偏移量
  17. timestampsToSearch.put(partition, (System.currentTimeMillis() - 12 * 3600 * 1000));
  18. // 会返回时间大于等于查找时间的第一个偏移量
  19. Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes (timestampsToSearch);
  20. OffsetAndTimestamp offsetTimestamp = null;
  21. // 这里依然用for 轮询,当然由于本例是查询的一个分区,因此也可以用if 处理
  22. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
  23. // 若查询时间大于时间戳索引文件中最大记录索引时间,
  24. // 此时value 为空,即待查询时间点之后没有新消息生成
  25. offsetTimestamp = entry.getValue();
  26. if (null != offsetTimestamp) {
  27. // 重置消费起始偏移量
  28. consumer.seek(partition, entry.getValue().offset());
  29. }
  30. }
  31. while (true) {
  32. // 等待拉取消息
  33. ConsumerRecords<String, String> records = consumer.poll(1000);
  34. for (ConsumerRecord<String, String> record : records){
  35. // 简单打印出消息内容
  36. System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record.partition(), record.offset(), record.key(),record.value());
  37. }
  38. }
  39. } catch (Exception e) {
  40. e.printStackTrace();
  41. } finally {
  42. consumer.close();
  43. }

3消费速度控制

提供 pause(Collection<TopicPartition> partitions)和resume(Collection<TopicPartition>
partitions)方法,分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。通过这两个方法可以对消费速度加以控制,结合业务使用。

Kafaka的消息消费方式相关推荐

  1. RocketMQ消息消费方式 推拉模式

    RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息. consumer被分为2类:MQPullConsumer和MQPushConsumer ...

  2. [RocketMQ]消息中间件—RocketMQ消息消费(一)

    2019独角兽企业重金招聘Python工程师标准>>> 文章摘要:在发送消息给RocketMQ后,消费者需要消费.消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在 ...

  3. RocketMq之消费方式

    一.如何选择消息消费的方式-Pull or Push? 1.1 MQ中Pull和Push的两种消费方式 对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费: (1)Pus ...

  4. RocketMQ的长轮询消费方式

    1.Push推送方式(即Server端推送消息给client): 当Server收到消息发送者发送过来的消息后,Server端主动把消息推送给client,这个方式实时性比较好,但是增加了Server ...

  5. RocketMQ:两种消费方式:pull拉、push推

    RocketMQ:两种消息消费方式:pull拉.push推 1.推送方式pull模式: 拉取,DefaultMQPullConsumer模式. 是由客户端主动向MQ请求数据,主动权在客户端,先拉取数据 ...

  6. RocketMQ 消息消费 轮询机制 PullRequestHoldService

    1. 概述 先来看看 RocketMQ 消费过程中的轮询机制是啥.首先需要补充一点消费相关的前置知识. 1.1 消息消费方式 RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式 ...

  7. rocketMQ的消息介绍、发送方式和消费方式

    rocketMQ中消息有以下几种 普通消息:消息队列中没有特性的消息 顺序消息:严格按照顺序发布和消费的消息,先发布的消息一定会先被消费,可以分为 全局顺序消息:所有消息严格按照先入先出的顺序来发布和 ...

  8. RocketMQ系列---消息消费者及消费方式

    1.消息消费 public class Consumer {public static void main(String[] args) throws InterruptedException, MQ ...

  9. mq中消息消费的几种方式

    mq系列文章 本章内容 从消费者的角度出发,分析一下消息消费的两种方式: push方式 pull方式 push方式 消息消费的过程: mq接收到消息 mq主动将消息推送给消费者(消费者需提供一个消费接 ...

最新文章

  1. Microsoft Dynamics CRM server 2013 中业务规则,有点像C#的正则表达式
  2. linux命令---查找文件中的内容
  3. 如何在Spring Boot中使用Hibernate Natural ID
  4. SendMessage函数的常用消息及其应用
  5. 软件测试菲律宾,英雄联盟手游菲律宾测试资格怎么得 菲律宾测试资格获取攻略[多图]...
  6. 中山大学计算机专业研究生报录比,中山大学报录比(中山大学2019各专业报录比)...
  7. Java案例:基于TCP的简单聊天程序
  8. Vue过滤器_使用过滤器进行数据格式化操作---vue工作笔记0015
  9. android屏幕内容实时传输,在设备之间无缝传输内容
  10. java 获取各省市的区号_城市查区号示例代码
  11. dcm文件如何转化为jpg或者bmp文件
  12. 【SLAM学习】(三)激光雷达原理及分类
  13. MES的发展历程及功能模块
  14. springboot访问页面显示Whitelabel Error Page
  15. 第二章 Google 常用功能
  16. pointwise linux ubuntu 安装 纪录
  17. 【医学图像处理】1 (医学)图像及图像处理流程
  18. 大数据(电商行业)规模参考
  19. R中的Box-Cox变换
  20. 上海计算机应用能力大赛获奖作品,_上海市大学生计算机应用能力大赛2009年参赛作品集锦.pdf...

热门文章

  1. 6个不亚于公务员的职业选择
  2. 医学影像组学之数据增强免费教程
  3. 2022年中职网络空间安全国赛竞赛题解析仅代表自己的建议——2022年中职网络安全国赛竞赛试题2解析
  4. MOSES翻译系统的训练,调优和使用
  5. 从零学习VH6501(八) —— 采样点测试
  6. [Windows 10](Windows 10 解决开机小键盘灯不亮)
  7. sip 时序图_教你如何看懂时序图(小白如何快速轻松的看懂时序图)
  8. 左声道,右声道和立体声
  9. 躺平减脂减重法补充篇——无需控制碳水摄入的有效方法,另推一种健康的运动和防止老年慢性病的方式...
  10. 基于VQ适量特征的说话人识别