kafka consumer 停止消费topic
现象
在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令查看,kafka 已经没有consumer 的信息了。
实验用例
实验 kafka consumer 实现:
package com.muhao.kafka;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class MyKafKaConsumer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "192.168.220.10:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// 一次停止10 秒钟,如果上一次得到超过30条消息,就会出现kafka consumer停止消费的现象Thread.sleep(10000L);System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}
实验 kafka producer 实现:
package com.muhao.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class MyKafkaProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.220.10:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++)producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), "hello message is number : "+Integer.toString(i)));producer.close();}
}
启动 kafka consumer ,成功运行后在 kafka命令行执行
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
发现kafka consumer 已经注册到kafka集群中了。
kafka producer发送消息让kafka consumer 消费,但是consumer是阻塞的,等待5分钟时候,运行命令行
发现kafka集群已经没有了 consumer的消息,但是程序仍在运行。
解决及建议
这种现象也是纠结了好长时间,查看源码,终于明白了,原来是在 kafka consumer 运行时,要和kafka集群的协调节点做心跳交流,这也是kafka集群给consumer做负载均衡的条件。但是但是consumer内部也会有一个计时器,记录上一次向 kafka 集群 poll 的时间,另外心跳线程会检测该现在距上一次poll的时间,如果该时间差超过了设定时间(kafka consumer默认的是 5分钟),就会想kafka集群发出leaveGroup,这时kafka集群会注销掉该consumer 的信息。
建议:kafka consumer 在消费消息时,不要使用阻塞方法,比如blockqueue、网络发送设置超时时间……
总得一句就是上下两次poll 的时间间隔不要超过5分钟(默认的时间)。
kafka consumer 停止消费topic相关推荐
- Kafka Consumer多线程消费
概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...
- kafka突然无法消费topic
突然没法消费可能是kafka已经把数据给删了,该topic下面的数据的历史寿命已到 你再新增加数据试试? 说不定就好了. 根据[1]kafka的数据保存时间是: log.retention.hours ...
- Flink Kafka consumer的消费策略配置
val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello" ...
- java kafka consumer不消费,报错marking the coordinator (id rack null) dead for group
问题描述:在linux系统,通过 kafka 命令行客户端测试消费正常,但通过Java consumer客户端无法正常接收队列消息,启动后输出如下日志信息: 15:21:34.864 [concurr ...
- Kafka设计解析(四):Kafka Consumer解析--转
原文地址:http://www.infoq.com/cn/articles/kafka-analysis-part-4?utm_source=infoq&utm_campaign=user_p ...
- Kafka设计解析(五): Kafka Consumer设计解析
Kafka设计解析(五)- Kafka Consumer设计解析 大数据架构(郭俊_Jason) · 2015-09-18 08:24 点击上方 大数据架构 快速关注 Kafka Consumer ...
- Kafka设计解析(四):Kafka Consumer解析
High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...
- [Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析
High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...
- kafka控制台模拟消费_Flink初试——对接Kafka
本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...
最新文章
- Linux下使用Apache实现域名转发(Tomcat/JBOSS)
- SQL 模糊查询技术
- 中文-自然语言处理-开源工具-流行度调查+句法依存树可视化调研
- What?你还搞不懂什么是物体检测?
- 亲密接触VC6.0编译器
- char,short ,int ,long,long long,unsigned long long数据范围
- 漫话:如何给女朋友解释什么是2PC(二阶段提交)?
- c字符串分割成数组_excel这个复杂数组公式怎么读?
- java websocket 后台服务器_Unity3D与Java后台TomCat服务器传递数据和文件(1)建立Java服务器...
- Node.js 沙箱易受原型污染攻击
- 开源视频质量评价工具: IQA
- android实现Materia Design风格APP(一):开篇
- 颜色的RGBnbsp;指数
- 计算机硬盘数据清零,如何完全清除硬盘数据,使其永不恢复
- APView500电能质量在线监测装置 谐波分析 电压不平衡
- Android版本自带游戏,植物大战僵尸自带花园版
- java中宏定义,宏定义的使用
- 接入百家号流量的方法
- cuda编程思想和opencv_gpu图像处理
- ios中嵌套h5做的app,长按图片默认会有放大效果;如何禁止
热门文章
- 计算机表格应用试卷,2020年9月网络教育统考《计算机应用基础》电子表格模拟题试卷操作题...
- isset和empty以及is_null区别
- 【2018.07.29】(深度优先搜索/回溯)学习DFS算法小记
- 手残转化了动态磁盘后如何转化回基本磁盘
- simulink中子系统分解
- mac上好用的菜单栏管理工具Bartender 4
- 终于,还是用Python向喜马拉雅动手了
- 手机微信浏览器调用图片放大功能
- ansys与solidworks关联失败_solidworks无法导入到ansys里怎么办?
- 跨AZ部署最佳实践之Elasticsearch