Consumer is not subscribed to any topics or assigned any partitions
版本:
报错信息:
分析原因:
public ConsumerRecords<K, V> poll(long timeout) {acquire();try {if (timeout < 0)throw new IllegalArgumentException("Timeout must not be negative");// 如果没有任何订阅,抛出异常if (this.subscriptions.hasNoSubscriptionOrUserAssignment())throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");// 一直poll新数据直到超时long start = time.milliseconds();// 距离超时还剩余多少时间long remaining = timeout;do {// 获取数据,如果自动提交,则进行偏移量自动提交,如果设置offset重置,则进行offset重置Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);if (!records.isEmpty()) {// 再返回结果之前,我们可以进行下一轮的fetch请求,避免阻塞等待fetcher.sendFetches();client.pollNoWakeup();// 如果有拦截器进行拦截,没有直接返回if (this.interceptors == null)return new ConsumerRecords<>(records);elsereturn this.interceptors.onConsume(new ConsumerRecords<>(records));}long elapsed = time.milliseconds() - start;remaining = timeout - elapsed;} while (remaining > 0);return ConsumerRecords.empty();} finally {release();}
}
val inputDStream1 = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets) ) 修改:(全新的topic,没有被消费者消费过) val inputDStream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams) )
转载于:https://www.cnblogs.com/niutao/p/10547499.html
Consumer is not subscribed to any topics or assigned any partitions相关推荐
- 【kafka】Consumer is not subscribed to any topics
1.概述 一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下 转载: https://blog.csdn.net/github_32521685/article/details ...
- Consumer is not subscribed to any topics
为什么80%的码农都做不了架构师?>>> 产生该问题的原因主要是zookeeper中存在旧版本的kafka-connect topic信息,导致新版本的kafka-connec ...
- kafka consumer配置拉取速度慢_Kafka消费者的使用和原理
这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...
- 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,
目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...
- KafkaConsumer分析
一 重要的字段 String clientId:Consumer唯一标识 ConsumerCoordinator coordinator: 控制Consumer与服务器端GroupCoordinato ...
- kafka使用_Kafka 消费者的使用和原理
继上周的<Kafka 生产者的使用和原理>,这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public sta ...
- Kafka消费者的使用和原理
作者 | 草捏子 来源 | 草捏子(ID:chaycao) 头图 | CSDN 下载自视觉中国 这周我们学习下消费者,还是先从一个消费者的Hello World学起: public class Co ...
- KafkaConsumer源码翻译(中英对照)
阅读kakka源码时,英语不好,翻译的比较费劲,经常翻译了后面的,忘记了前面说的什么了,等回到前面之后,前面说的是什么,英文又不认识了,在这里做一个记录,慢慢写吧,估计会写的很艰难,目前更新的不多,哎 ...
- Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析
文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...
- 你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析
目录: <Kafka Producer设计分析> <KafkaProducer类代码分析> <RecordAccumulator类代码分析> <Sender类 ...
最新文章
- 【JavaSE04】Java中循环语句for,while,do···while-练习2
- JS最新的身份证验证代码
- java接口文件定义类_Java入门笔记(四)类、包和接口
- 做了全职妈妈后,你的生活将有5个方面的变化
- ::-webkit-scrollbar 滚动条的设置
- Activity启动模式singleTask模式
- 纯CSS中的可视数据库库
- 博客做外链不收录怎么办,如何利用博客做外链
- 三星GalaxyTab3 7.0(WIFI) SM-T210 刷机 ROM 教程 附带港版官方ROM
- Vue 制作滚动字幕,用于展示通知内容
- SQL注入点判断及万能密码
- android studio调试,华为手机连接电脑找不到HDB interface的解决方案
- java List删除元素问题及解决办法
- Leetcode题库1823. 找出游戏的获胜者(约瑟夫环 C实现)
- 详解one-hot编码及代码举例
- TCP/IP之蓟辽督师
- antdvue走马灯一页显示多张图片的效果
- 外观模式(Facade)----设计模式
- GeoEye-1 卫星照片来了 - 异常清晰
- 模拟微信拼手气红包程序
热门文章
- Python视频转换分辨率(附代码) | Python工具
- 现代治理10.0:Diligent研究院报告称,进入董事会的少数女性董事在董事会领导职务的角逐中超越男性董事
- Hulu推荐 | 《破产姐妹》Max演员新剧《娃娃脸》
- 大牛C++编程开发学习建议50条http://blog.csdn.net/gggg_ggg/article/details/44015409
- html底部友情链接代码,wordpress友情链接添加到页面底部的方法
- html中图片左右切换,超简单的图片左右切换滑动
- Processing简介和环境搭建
- GVIDO MUSIC向德意志唱片公司成立120周年 Special Gala Concert Presented by 小泽征尔 斋藤纪念管弦乐团提供赞助
- perl data::dumper 模块使用
- 【Unity】3D雷霆战机