现象

在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令查看,kafka 已经没有consumer 的信息了。

实验用例

实验 kafka consumer 实现:

package com.muhao.kafka;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class MyKafKaConsumer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "192.168.220.10:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// 一次停止10 秒钟,如果上一次得到超过30条消息,就会出现kafka consumer停止消费的现象Thread.sleep(10000L);System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

实验 kafka producer 实现:

package com.muhao.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class MyKafkaProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.220.10:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++)producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), "hello message is number : "+Integer.toString(i)));producer.close();}
}

启动 kafka consumer ,成功运行后在 kafka命令行执行

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe  --group test

发现kafka consumer 已经注册到kafka集群中了。

kafka producer发送消息让kafka consumer 消费,但是consumer是阻塞的,等待5分钟时候,运行命令行

发现kafka集群已经没有了 consumer的消息,但是程序仍在运行。

解决及建议

这种现象也是纠结了好长时间,查看源码,终于明白了,原来是在 kafka consumer 运行时,要和kafka集群的协调节点做心跳交流,这也是kafka集群给consumer做负载均衡的条件。但是但是consumer内部也会有一个计时器,记录上一次向 kafka 集群 poll 的时间,另外心跳线程会检测该现在距上一次poll的时间,如果该时间差超过了设定时间(kafka consumer默认的是 5分钟),就会想kafka集群发出leaveGroup,这时kafka集群会注销掉该consumer 的信息。

建议:kafka consumer 在消费消息时,不要使用阻塞方法,比如blockqueue、网络发送设置超时时间……
总得一句就是上下两次poll 的时间间隔不要超过5分钟(默认的时间)。

kafka consumer 停止消费topic相关推荐

  1. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  2. kafka突然无法消费topic

    突然没法消费可能是kafka已经把数据给删了,该topic下面的数据的历史寿命已到 你再新增加数据试试? 说不定就好了. 根据[1]kafka的数据保存时间是: log.retention.hours ...

  3. Flink Kafka consumer的消费策略配置

    val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello" ...

  4. java kafka consumer不消费,报错marking the coordinator (id rack null) dead for group

    问题描述:在linux系统,通过 kafka 命令行客户端测试消费正常,但通过Java consumer客户端无法正常接收队列消息,启动后输出如下日志信息: 15:21:34.864 [concurr ...

  5. Kafka设计解析(四):Kafka Consumer解析--转

    原文地址:http://www.infoq.com/cn/articles/kafka-analysis-part-4?utm_source=infoq&utm_campaign=user_p ...

  6. Kafka设计解析(五): Kafka Consumer设计解析

    Kafka设计解析(五)- Kafka Consumer设计解析 大数据架构(郭俊_Jason) · 2015-09-18 08:24 点击上方 大数据架构   快速关注 Kafka Consumer ...

  7. Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

  8. [Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

  9. kafka控制台模拟消费_Flink初试——对接Kafka

    本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...

最新文章

  1. Linux下使用Apache实现域名转发(Tomcat/JBOSS)
  2. SQL 模糊查询技术
  3. 中文-自然语言处理-开源工具-流行度调查+句法依存树可视化调研
  4. What?你还搞不懂什么是物体检测?
  5. 亲密接触VC6.0编译器
  6. char,short ,int ,long,long long,unsigned long long数据范围
  7. 漫话:如何给女朋友解释什么是2PC(二阶段提交)?
  8. c字符串分割成数组_excel这个复杂数组公式怎么读?
  9. java websocket 后台服务器_Unity3D与Java后台TomCat服务器传递数据和文件(1)建立Java服务器...
  10. Node.js 沙箱易受原型污染攻击
  11. 开源视频质量评价工具: IQA
  12. android实现Materia Design风格APP(一):开篇
  13. 颜色的RGBnbsp;指数
  14. 计算机硬盘数据清零,如何完全清除硬盘数据,使其永不恢复
  15. APView500电能质量在线监测装置 谐波分析 电压不平衡
  16. Android版本自带游戏,植物大战僵尸自带花园版
  17. java中宏定义,宏定义的使用
  18. 接入百家号流量的方法
  19. cuda编程思想和opencv_gpu图像处理
  20. ios中嵌套h5做的app,长按图片默认会有放大效果;如何禁止

热门文章

  1. 计算机表格应用试卷,2020年9月网络教育统考《计算机应用基础》电子表格模拟题试卷操作题...
  2. isset和empty以及is_null区别
  3. 【2018.07.29】(深度优先搜索/回溯)学习DFS算法小记
  4. 手残转化了动态磁盘后如何转化回基本磁盘
  5. simulink中子系统分解
  6. mac上好用的菜单栏管理工具Bartender 4
  7. 终于,还是用Python向喜马拉雅动手了
  8. 手机微信浏览器调用图片放大功能
  9. ansys与solidworks关联失败_solidworks无法导入到ansys里怎么办?
  10. 跨AZ部署最佳实践之Elasticsearch