【README】

本文主要对 java客户端作为kafka 消费者进行测试, 生产者由 kafka客户端扮演;

【1】普通消费者

设置消费者组;

重置消费者的offset, 即每次都从最头开始消费(默认仅保持7天内数据) ;

类似于 命令行 --from-beginning

kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning

小结:从头开始消费,必须满足2个条件;

条件1: 必须重新换组, 如本文中的消费者组 从 sichuan 更新为 sichuan1 ;
条件2: 需要设置offset, 修改为 earliest, 默认值是 lastest;
/*** 普通消费者*/
public class MyConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2开启自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] " + rd.key() + "--" + rd.value()); }} /* 关闭消费者 */
//      consumer.close(); }
}

从官网可以找到以上配置值; https://kafka.apache.org/0110/documentation.html#configuration

【2】kafka消费者-手动提交offset

手动提交offset有3种方式:

  • 方式1:同步手动提交;
  • 方式2:异步手动提交;
  • 方式3:自定义手动提交策略;

0)为啥需要手动提交?

kafka自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

1)关闭自动提交(默认为true)

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

第一次启动 consumer 从 90 开始消费;
第2次启动相同 consumer ,还是从90开始消费;

2) 如何使用手动提交?

kafka提供了手动提交offset的api;
方法1:commitSync 同步提交:  ;
方法2:commitAsync 异步提交;
两者相同点:都会将本次 poll  的一批数据最高的偏移量提交;
不同点是, commitSync 阻塞当前线程,一直到提交成功, 并且会自动失败重试;
而 commitAsync 没有失败重试机制, 可能提交失败; 

3)同步手动提交offset

/*** 手动同步提交offset */
public class ManSyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */
//      consumer.close(); }
}

4)异步手动提交offset

/*** 异步手动提交offset  */
public class ManASyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【异步提交】 当前线程会阻塞直到 offset提交成功 */  consumer.commitAsync(new OffsetCommitCallback() {@Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception !=null) {System.out.println("异步提交失败");} else {System.out.println("异步提交成功"); }}}); } /* 关闭消费者 */
//      consumer.close(); }
}

5)自定义手动提交offset策略

5.0)为啥需要自定义?

因为异步提交有一些问题,如下:
先消费数据,后提交offset, 可能导致数据重复消费;
先提交offset, 后走业务逻辑,可能会丢数据; 

5.1)应用场景:

把 offset 存储到本地库 和 消息消费逻辑 在同一个数据库事务里面;

5.2)如何实现?需要实现 ConsumerRebalanceListener 来实现。

/*** 自定义手动提交offset策略  */
public class DiyCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】调用}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】调用  /* 分区分配方法 */for (TopicPartition partition :  partitions) { /*定位到某个 offset*/consumer.seek(partition, 1); // TODO: 这里需要输入1  }}  });/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */
//      consumer.close(); }
}

补充: 消费者rebalance 是什么?

消费者 rebalance, 什么时候触发 rebalance?  如 同一个消费者组下的 某个消费者机器宕机,或新增一个消费者机器,都会触发 rebalance,即重新分配  kafka分区数据与 消费者的对应关系; 

java客户端作为kafka消费者测试相关推荐

  1. java客户端作为kafka生产者测试

    [README] 1.本文主要对 java客户端作为kafka 生产者进行测试, 消费者由 centos的kafka命令行线程扮演: 2.消息发送: kafka的生产者采用异步发送消息的方式,在消息发 ...

  2. Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka

    文章目录 操作步骤 Maven依赖 生产者 消费者 操作步骤 Maven依赖 核心依赖 kafka-clients <dependency><groupId>org.apach ...

  3. java客户端访问kafka

    kafka版本:kafka_2.11-0.11.0.1.tgz centos7 关闭防火墙:systemctl stop firewalld.service 在kafka中配置文件: listener ...

  4. 【Java客户端访问Kafka】

    我是

  5. java客户端发消息到kafka

    前言 下面记录下如何使用kafka的java客户端向kafka的broker发送消息 1.导入maven依赖 <dependency><groupId>org.apache.k ...

  6. Mysql开启ssl加密协议及Java客户端配置操作指南

    Mysql开启ssl加密协议及Java客户端配置操作指南 Mysql配置 验证Mysql开启SSL Java客户端操作 生成证书密码 配置数据库连接 工具配置 Mysql配置 Mysql需要配置对应的 ...

  7. kafka消费者接收分区测试

    [README] 本文演示了当有新消费者加入组后,其他消费者接收分区情况: 本文还模拟了 broker 宕机的情况: 本文使用的是最新的 kafka3.0.0 : 本文测试案例,来源于 消费者接收分区 ...

  8. (转)Kafka 消费者 Java 实现

    转自: Kafka 消费者 Java 实现 - 简书应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者(consumer)和消费者组(c ...

  9. Kafka : Kafka入门教程和JAVA客户端使用

    目录 目录 Kafka简介 环境介绍 术语介绍 消费模式 下载 集群安装配置 命令使用 JAVA实战 参考文献 Kafka简介 由Scala和Java编写,Kafka是一种高吞吐量的分布式发布订阅消息 ...

最新文章

  1. c语言链表最高响应比优先,操作系统--最高响应比优先调度算法实验报告..doc
  2. vsftp账号_vsftp 实现不同用户不同权限配置
  3. 曈曈妈妈设计的2010年台历模板
  4. 【laravel5.4】laravel5.4系列之生成_ide_helper.php文件
  5. 移动平台作业——天气预报——天气数据的获得——为应用申请百度ak码
  6. Iterator 和 for...of 循环
  7. Chrome 或将于2018年正式弃用 HPKP 公钥固定标准
  8. chinaren校友录恢复重新开放_确定!九寨沟景区9月27日对外开放(试运行) 最大限量为每天5000人 各大旅企产品已上线...
  9. 分布式锁是啥?zk还是redis?
  10. java对象头_我的并发编程(二):java对象头以及synchronized升级过程
  11. 【OS学习笔记】三十五 保护模式十:中断描述符表、中断门和陷阱门
  12. 右侧快速入口滑动时左侧跟着变化
  13. bootstrap table无法服务器分页_layui分页的大坑,RequestPayload和FormData
  14. 储粮过冬?消息称中芯国际大举向设备、零件商囤货
  15. 【SpringMVC】SpringMVC: @RequestBody 和@ResponseBody 注解详解 NoHandlerFoundException
  16. Java Singleton类中的线程安全
  17. jeecg框架 弹出框问题
  18. 企业生存与发展的前提是安全
  19. 矩阵特征值和特征向量详细计算过程
  20. 【tensorflow】tensorflow相关基础概念

热门文章

  1. hdu 1028 Ignatius and the Princess III 母函数入门
  2. Codeforces Round #715 (Div. 2) C. The Sports Festival 区间dp
  3. 【ZJOI2015】幻想乡战略游戏【点分树】【带权重心】
  4. 线段树——思维(Codeforces 339D Xenia and Bit Operations/Billboard HDU - 2795)
  5. CF980D Perfect Groups
  6. CF1253E Antenna Coverage
  7. 牛客网 【每日一题】5月11日题目精讲 Moovie Mooving
  8. CF98E Help Shrek and Donkey(纳什博弈 + 大讨论)
  9. 盲盒(随机概率 + 最大公约数)
  10. 【无码专区6】球与盒子(数学线性筛)