kafka踩坑、实践篇

1、从kafka读取数据后 数据会自动删除吗

不会,kafka中数据的删除跟有没有消费者消费完全无关。

数据的删除,只跟kafka broker上面上面的这两个配置有关:

# 数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略,log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除
# 有2删除数据文件方式: 按照文件大小删除:log.retention.bytes  按照2中不同时间粒度删除:分别为分钟,小时
log.retention.hours=168# topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
# log.retention.bytes=-1
log.retention.bytes=1073741824 #数据最多1G

2、kafka日志删除

Kafka日志管理器中会有一个专门的日志删除任务来周期性检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。

当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略以及基于日志起始偏移量的保留策略。

2.1、 基于时间

日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值retentionMs来寻找可删除的的日志分段文件集合deletableSegments。retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes以及log.retention.ms来配置,其中log.retention.ms的优先级最高,log.retention.minutes次之,log.retention.hours最低。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天。

2.2、 基于日志大小

日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize来寻找可删除的日志分段的文件集合deletableSegments。retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。

2.3、 基于日志起始偏移量

参考文档:https://blog.csdn.net/u013256816/article/details/80418297

3、Kafka提供了两种日志清理策略:

3.1、日志删除(Log Deletion):按照一定的保留策略来直接删除不符合条件的日志分段。
3.2、日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的的不同value值,只保留最后一个版本。

可以通过broker端参数log.cleanup.policy来设置日志清理策略,此参数默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略的话,就需要将log.cleanup.policy设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”还可以同时支持日志删除和日志压缩两种策略。

4、Kafka消费异常

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:775)

该Consumer不能提交offset了,因为它已经出局了,是因为你的处理小时时间长于你要报告给server的时间。同时还告诉我们怎么处理:要么增加超时时间,要么减少每次poll回来的消息个数。

异常解决:
a)调大max.poll.interval,ms,默认300000(300s)
b)调小max.poll.records,默认500
c)另起线程

5、WARN Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

异常解决:

# vim server.properties

listeners=PLAINTEXT://192.168.20.112:9092 #打开注释

6、发送消息 send()

①、普通发送——发送就忘记

    //1、通过上面的配置文件生成 Producer 对象Producer producer = new KafkaProducer(kafkaProperties);//2、生成 ProducerRecord 对象,并制定 Topic,key 以及 value//创建名为testTopic的队列,键为testkey,值为testValue的ProducerRecord对象ProducerRecord<String,String> record = new ProducerRecord<>("testTopic","testkey","testValue");//3、发送消息producer.send(record);

通过配置文件构造一个生产者对象 producer,然后指定主题名称,键值对,构造一个 ProducerRecord 对象,最后使用生产者Producer 的 send() 方法发送 ProducerRecord 对象,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,不过通常我们会忽略返回值。
和上面的名字一样——发送就忘记,生产者只管发送,并不管发送的结果是成功或失败。通常如果我们不关心发送结果,那么就可以使用此种方式。

②、同步发送

    //1、通过上面的配置文件生成 Producer 对象Producer producer = new KafkaProducer(kafkaProperties);//2、生成 ProducerRecord 对象,并制定 Topic,key 以及 value//创建名为testTopic的队列,键为testkey,值为testValue的ProducerRecord对象ProducerRecord<String,String> record = new ProducerRecord<>("testTopic","testkey","testValue");//3、同步发送消息try {//通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应//如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量//如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理producer.send(record).get();} catch (Exception e) {//4、异常处理e.printStackTrace();}

和上面普通发送消息一样,只不过这里我们调用了 Future 对象的 get() 方法来等待 kafka 服务器的响应,程序运行到这里会产生阻塞,直到获取kafka集群的响应。而这个响应有两种情况:

  1、正常响应:返回一个 RecordMetadata 对象,通过该对象我们能够获取消息的偏移量、分区等信息。

  2、异常响应:基本上来说会发生两种异常,

    一类是可重试异常,该错误可以通过重发消息来解决。比如连接错误,可以通过再次连接后继续发送上一条未发送的消息;再比如集群没有首领(no leader),因为我们知道集群首领宕机之后,会有一个时间来进行首领的选举,如果这时候发送消息,肯定是无法发送的。

    二类是无法重试异常,比如消息太大异常,对于这类异常,KafkaProducer 不会进行任何重试,直接抛出异常。

  同步发送消息适合需要保证每条消息的发送结果,优点是能够精确的知道什么消息发送成功,什么消息发送失败,而对于失败的消息我们也可以采取措施进行重新发送。缺点则是增加了每条消息发送的时间,当发送消息频率很高时,此种方式便不适合了。

③、异步发送

有同步发送,基本上就会有异步发送了。同步发送每发送一条消息都得等待kafka服务器的响应,之后才能发送下一条消息,那么我们不是在错误产生时马上处理,而是记录异常日志,然后马上发送下一条消息,而这个异常再通过回调函数去处理,这就是异步发送。

1、首先我们要实现一个继承 org.apache.kafka.clients.producer.Callback 接口,然后实现其唯一的 onCompletion 方法。

    import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaCallback implements Callback{@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){//异常处理e.printStackTrace();}}}

2、发送消息时,传入这个回调类。

    //异步发送消息producer.send(record,new KafkaCallback());

或者通过内部类实现:

 public static void main(String[] args) {// 获取生产者Producer<String, String> producer = MQDict.getKafkaProducer();for (int i = 0; i < 100; i++) {ProducerRecord<String, String> record = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "key" + Integer.toString(i), "value" + Integer.toString(i));// 异步发送producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {// exception == null代表消息发送成功System.out.println("消息发送成功......");} else {// 消息发送失败,执行错误的逻辑System.out.println("消息发送失败......");if (exception instanceof RetriableException) {// 处理可重试瞬时异常// ...} else {// 处理不可重试异常// ...}}}});}System.out.println("消息生产结束......");// 关闭kafkaProduce对象producer.close();System.out.println("关闭生产者......");}

7、Unrecognized VM option 'UseCompressedOops'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

异常解决:
vim kafka-run-class.sh

去掉这个配置
  -XX:+UseCompressedOops

8、IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

https://blog.csdn.net/Dongguabai/article/details/86543698

9、kafka概念里面的注意项:

1.订阅消息可以订阅多个主题
2.ConsumerConfig.GROUP_ID_CONFIG表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询。
3.主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区。
4.注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。

10、Kafka常用操作命令

    1:查看当前服务器中的所有topic。--zookeeper master:2181指定zookeeper。bin/kafka-topics.sh --list --zookeeper  master:21812:创建topic。--partitions 3,指定三个分区。--replication-factor 1指定备份的副本数量。--topic topicTest,指定topic的名称。./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic topicTest3:删除topicbin/kafka-topics.sh --delete --zookeeper master:2181 --topic topicTest注意:需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。4:通过shell命令发送消息。生产者。--broker-list master:9092bin/kafka-console-producer.sh --broker-list master:9092 --topic topicTest 可能会报错(This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt),这个版本不支持key为null:./kafka-console-producer.sh --broker-list 192.168.20.112:9092 --topic testTopic1 \--property parse.key=true \--property key.separator=,发送消息格式(testkey,testvalue)5:通过shell消费消息。消费者。--from-beginning从最开始消费。bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic topicTest收到消息:testvalue获取全量的topic上的数据:./kafka-console-consumer.sh --zookeeper 192.168.20.112:2181 --topic topicTest --from-beginning6:查看消费位置bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper master:2181 --group testGroup7:查看某个Topic的详情bin/kafka-topics.sh --topic SimpleDemo3 --describe --zookeeper 192.168.20.112:2181#下面是显示信息Topic:ssports    PartitionCount:1    ReplicationFactor:2    Configs:Topic: SimpleDemo3    Partition: 0    Leader: 1    Replicas: 0,1    Isr: 1#分区数为1  复制因子为2   当前SimpleDemo3的分区为0 #Replicas: 0,1   复制的为0,18:kafka topic增加partition,修改topic分区数./kafka-topics.sh --alter --topic SimpleDemo3 --zookeeper 192.168.20.112:2181 --partitions 6 

9、基于kafka 2.2.0 实现的生产者和消费者代码案例

生产者(Producer):

public class ProducerSimpleDemo {public static void main(String[] args) {// 获取生产者Producer<String, String> producer = MQDict.getKafkaProducer();for (int i = 0; i < 500; i++) {// 方式1// 构造好kafkaProducer实例以后,下一步就是构造消息实例。生成 ProducerRecord 对象,并制定 Topic,key 以及 value// producer.send(new ProducerRecord<>(MQDict.PRODUCER_TOPIC, "key" + Integer.toString(i), "value" + Integer.toString(i)));// 方式2// 构造待发送的消息对象ProduceRecord的对象,指定消息要发送到的topic主题,分区以及对应的key和value键值对。 注意,分区和key信息可以不用指定,由kafka自行确定目标分区。ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "key" + Integer.toString(i), "value" + Integer.toString(i));// 调用kafkaProduce的send方法发送消息producer.send(producerRecord);}System.out.println("消息生产结束......");// 关闭kafkaProduce对象producer.close();System.out.println("关闭生产者......");}}

消费者(KafkaConsumer):

public class ConsumerSimpleDemo {public static void main(String[] args) {// 获取消费者KafkaConsumer<String, String> consumer = MQDict.getKafkaConsumer();while (true) {ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_SECOND_OUT); // 拉取消息,阻塞时间5秒if (records.isEmpty())break;// 遍历消息并打印valuerecords.forEach(rec -> System.out.println("主题topic:" + rec.topic() + "; topickey:" + rec.key() + "; topicval:" + rec.value()));/*for (ConsumerRecord<String, String> record : records) {// 简单的打印输出System.out.println("offset = " + record.offset() + ",key = " + record.key() + ",value =" + record.value());}*/}// 关闭消费者consumer.close();}}

更多的Java操作kafka这里不再介绍,了解更多请参考:
git项目地址:更多的Java操作kafka
https://github.com/powerfuler/kafkaDemo.git

10、kafkaDemo项目介绍

此项目是熟悉kafka工具以及用Java调用kafkaApi进行消息的发送与消费的测试案例。

项目特性
1、kafka生产者、消费者简单demo,Java调用。
2、kafka同步、异步调用。
3、kafka自定义分区的使用。
4、kafka自定义指定序列化生产的消息。
5、kafka拦截器链的使用。
6、kafka消费者消费消息之每个线程维护一个KafkaConsumer实例,用户创建多个线程消费topic数据,每个线程都会创建专属该线程的KafkaConsumer实例。
7、单独KafkaConsumer实例和多worker线程。使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
8、获取所有kafka所有的Topic。
9、获取当前topic每个分区内最新的30条消息(如果topic额分区内有没有30条,就获取实际消息)。

每天努力一点,每天都在进步。

kafka踩坑、实践篇相关推荐

  1. php kafka storm,php的kafka踩坑(二)

    接上一篇文章,上次没有解决的一个问题就是在做一个队列的时候,存在多消费者消费到同一个消息的情况,今天终于解决了这个问题,问题的本质是因为运维给我创建的topic是有问题的,他创建的分区数量是0,我今天 ...

  2. 记录kafka踩坑:marking the coordinator (id rack null) dead for group

    工作中用java代码连接k8s集群中的kafka集群时消费者java代码一直报异常marking the coordinator (id rack null) dead for group.经过长时间 ...

  3. kafka php 教程,php的kafka踩坑(一)

    最近项目上有一个需要用到消息队列的功能,从网上找了一些php相关的kafka使用的教程和博客,大抵都是安装php的拓展librdkafka(这里就不讲这个拓展的安装方法了,搜一下还是有很多教程的),然 ...

  4. 部署Linux单机kafka踩坑

    kafka简介 kafka是Apache开发的一个开源流处理平台,也目前一种比较成熟的消息系统,被广泛用作消息中间件(现如今的大数据应用中很多都是采用zookeeper+kafka集群). 部署前置条 ...

  5. Springboot+阿里云kafka踩坑实录

    场景描述:上文写到,不断接收数据并存放到OSS,现在要把数据存到MQ的kafka一份. springboot版本为1.5.9. 开工之前先阅读阿里云官方kafka消息接入说明:https://help ...

  6. windows10 kafka 环境搭建 踩坑记录

    一.下载网站 http://kafka.apache.org/downloads 选择二进制下载 二.启动方式 分别启动Zookeeper.Kafka .\bin\windows\zookeeper- ...

  7. 第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密 java.lang.ClassNotFoundException 踩坑解决问题详细内幕版本

    第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密    /* * *王家林老师授课http://weibo.com/ilovepains */  每天晚上20: ...

  8. 东八区转为0时区_踩坑记 | Flink 天级别窗口中存在的时区问题

    ❝ 本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题.本文介绍 Flink 时间以及时区问题,分析了在天级别的窗口时会遇到的 ...

  9. 日常踩坑记录-汇总版

    开发踩坑记录,不定时更新 心得 RTFM 严谨的去思考问题,处理问题 严格要求自己的代码编写习惯与风格 注意 单词拼写 20200207 mybatis plus 自带insert插入异常 sql i ...

最新文章

  1. 使用.NET发送邮件
  2. jquery实现点击页面其他地方隐藏指定元素
  3. SpringRestTemplate用法详解
  4. python04-列表与元祖
  5. 一起谈.NET技术,ASP.NET Eval如何进行数据绑定
  6. python装饰器原理-Python 装饰器工作原理解析
  7. 经典网页设计:10个响应式设计的国外购物网站
  8. vue 导出html
  9. [.NET领域驱动设计实战系列]专题二:结合领域驱动设计的面向服务架构来搭建网上书店...
  10. 一个函数统一238个机器学习R包,这也太赞了吧
  11. ​esquisse: 快速可视化图形的 Rstudio 插件
  12. sql 树状结构中知道 父节点与孙节点_数据结构之(树)
  13. AngularJS中$apply
  14. SourceTree 密码填写正确但是拉取显示密码错误解决
  15. python批量图片自动编码
  16. C4D快速入门教程——可编辑模式
  17. 高响应比优先调度算法以及其优缺点
  18. iOS 多线程面试题
  19. 什么样的人适合学编程?
  20. CDlinux万能无线破解系统iSO中文版 U盘启动版

热门文章

  1. 计算机科学与技术影视,影视作品可视化研究-计算机科学与技术专业论文.docx
  2. 【有利可图网】PS教程:用PS合成立体特效的穿插照片效果
  3. 机器学习之算法优化(一)
  4. 初创公司必看 杭州靠谱2B创业梯队
  5. qq拼音输入法引起的ctrl键粘滞问题
  6. 【无标题】Android10 编译错误
  7. 关于访问 WIN10共享打印机提示 无法连接到打印机的问题
  8. TED | 深度拖延症患者的自白(附视频演讲稿)
  9. SEO教程:如何优化长尾关键词达到快速排名(干货)
  10. [FROM WOJ]#2040 山贼集团