1.概述

一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下
转载:
https://blog.csdn.net/github_32521685/article/details/89953671

产生该问题的原因主要是zookeeper中存在旧版本的kafka-connect topic信息,导致新版本的kafka-connect启动异常:

ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitionsat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1109)at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)

解决办法:

(1) 使用kafka命令列出所有与connect相关的topic:

bin/kafka-topics.sh --list --zookeeper 10.255.8.102:2181

输出:

__consumer_offsets
ambari_kafka_service_check
connect-configs
connect-offsets
connect-status

(2)使用kafka命令删除所有与connect相关的topic:

bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-configs
bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-offsets
bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-status

最后验证是否删除:

bin/kafka-topics.sh --list --zookeeper 10.255.8.102:2181

输出:

__consumer_offsets
ambari_kafka_service_check
connect-configs - marked for deletion
connect-offsets - marked for deletion
connect-status - marked for deletion

输出信息中显示三个connect topic已被标记删除了,要想彻底删除,需要在kafka的server.properties配置文件里设置delete.topic.enable=true

2.分析原因

从指定的主题或者分区获取数据,在poll之前,你没有订阅任何主题或分区是不行的,每一次poll,消费者都会尝试使用最后一次消费的offset作为接下来获取数据的start offset,最后一次消费的offset也可以通过seek(TopicPartition, long)设置或者自动设置
通过源码可以找到:

public ConsumerRecords<K, V> poll(long timeout) {acquire();try {if (timeout < 0)throw new IllegalArgumentException("Timeout must not be negative");// 如果没有任何订阅,抛出异常if (this.subscriptions.hasNoSubscriptionOrUserAssignment())throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");// 一直poll新数据直到超时long start = time.milliseconds();// 距离超时还剩余多少时间long remaining = timeout;do {// 获取数据,如果自动提交,则进行偏移量自动提交,如果设置offset重置,则进行offset重置Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);if (!records.isEmpty()) {// 再返回结果之前,我们可以进行下一轮的fetch请求,避免阻塞等待fetcher.sendFetches();client.pollNoWakeup();// 如果有拦截器进行拦截,没有直接返回if (this.interceptors == null)return new ConsumerRecords<>(records);elsereturn this.interceptors.onConsume(new ConsumerRecords<>(records));}long elapsed = time.milliseconds() - start;remaining = timeout - elapsed;} while (remaining > 0);return ConsumerRecords.empty();} finally {release();}
}

【kafka】Consumer is not subscribed to any topics相关推荐

  1. 【Kafka】Consumer group 'console-consumer-28367' has no active members.

    1.美图 2.概述 想查看某个消费组的详情,结果报错 (base) lcc@lcc kafka_2.11-1.1.0$ bin/kafka-consumer-groups.sh --new-consu ...

  2. 【kafka】Kafka consumer处理大消息数据过大导致消费停止问题

    文章目录 1.概述 2.案例分析 3.kafka的设计初衷 3.1 broker 配置 3.2 Consumer 配置 M.扩展 1.概述 转载:https://www.cnblogs.com/wyn ...

  3. 【Kafka】消费者组 Consumer Group(三)

    文章目录 一.概念 二.创建Topic 三.开启3个Consumer的消费者组Group (分区数量1 < 消费者数量3) 3.1 结论一:一个分区Partition只能被一个消费者抢占.(组内 ...

  4. 【kafka】Kafka Fetch Session 剖析

    1.概述 转载:https://www.cnblogs.com/smartloli/p/14352489.html 1.概述 最近有同学留言在使用Kafka的过程中遇到一些问题,比如在拉取的Topic ...

  5. 【Kafka】Kafka 使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

    1.概述 请参考:https://www.jianshu.com/p/a70950bab06d [Kafka]Kafka 使用传统的 avro API 自定义序列化类和反序列化类 比较麻烦,需要根据 ...

  6. 【Kafka】第三篇-Kafka的集群及Canal介绍

    [上一章 [Kafka]第二篇-Kafka的核心概念及分区消费规则] 学习路线 Kafka集群架构 Kafka集群环境 1.kafka是一个压缩包,直接解压即可使用,所以我们就解压三个kafka: 2 ...

  7. 【Kafka】从kafka中读取最新数据

    [Kafka]从kafka中读取最新数据 一.死循环无限拉取kafka数据 1.1 整体框架剖析 1.2 测试 二.@KafkaListener注解 实现监听kafka数据 三.参考资料 前情提要:我 ...

  8. 【kafka】Kafka 幂等 Producer

    1.概述 [Kafka]Kafka幂等性原理及实现剖析 [kafka]Kafka 事务性之幂等性实现 官网:Idempotent Producer 2.简介 Kafka提供了"至少一次&qu ...

  9. 【Kafka】Kafka 如果 动态 不停止的情况下 修改 消费组 offset

    文章目录 1.概述 2.方案1 3.方案2 1.概述 测试点:有人遇到这样的情况,他一个消费者正在消费一个环境的topic,然后他想启动另外一个消费组,但是是使用了同一个消费组,这个去更改 消费组of ...

最新文章

  1. 支持比特币支付的商家中有90%接受了BCH
  2. 【数据立方】数据立方体的有效计算、物化materialization,索引OLAP数据
  3. ImportError: Could not find ‘cudart64_100.dll报错
  4. [BeiJing2010组队]次小生成树 Tree
  5. windows socket 简单实例
  6. Blockquotes,引用,html里面,经常用到的一个!
  7. leetcode 236. 二叉树的最近公共祖先 思考分析
  8. 数字图像处理总结(冈萨雷斯版)
  9. Android客户端应用享用传统Web服务
  10. 机智云获取树莓派传来的数据_哪些数据对云来说太冒险了?
  11. php怎么自己写框架,PHP学习笔记,自己动手写个MVC的框架
  12. 自学python顺序-要成为一名Python程序员,要学习哪些内容,学习顺序是怎样的?...
  13. VMware安装及使用详细教程
  14. ai人工智能开发_面向开发人员的十大人工智能(AI)工具
  15. android手机 滚动截屏,安卓手机如何滚动截屏?看完图解一秒学会!
  16. 对C++一脸懵逼却又无比热爱的第一篇
  17. APP开发者应办理许可或备案手续
  18. Android .9图片使用报错...报错:AAPT: error: file failed to compile.
  19. 高效程序员系列 别做机器人——让工作自动化
  20. 电脑图片无损放大怎么操作 ?怎么无损放大图片?

热门文章

  1. 美团自研无人机登场:15分钟配送到家
  2. 苹果推出App Store搜索建议功能
  3. 华为P50渲染图曝光:后背丑哭?
  4. 小米“小仙女”来了:强大美颜 女性专属的定制手机
  5. 华硕Zenfone 6曝光:滑盖再度现身市场
  6. 突然!锤子科技天猫官方旗舰店商品全线下架 店铺撤店?!
  7. c++中的system函数
  8. python emoji 表情过滤
  9. Python自动单元测试框架
  10. matlab如何把实验结果记录在文件中,实验一Matlab基本操作