spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access
问题描述
spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
报错信息如图:
代码
object testScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("fs01").setMaster("local[*]")conf.set("spark.streaming.stopGracefullyOnShutdown", "true")conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")conf.set("spark.streaming.kafka.maxRatePerPartition","10")val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val interval = PropertiesUtils.loadProperties("streaming.interval").toLongval ssc:StreamingContext = new StreamingContext(sc, Seconds(5))val kalfa_server_list: String = PropertiesUtils.loadProperties("kafka.broker.list")val kafka_group: String = "group_test_role_1"val kafkaParams = Map[String, Object]("bootstrap.servers" -> kalfa_server_list, "key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> kafka_group, //消费者组名kafka.groupId"auto.offset.reset" -> "earliest", //earliest可以获取历史数据"enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交val topics = Array("t_2021-09") var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent, //多数时候采用该方式,在所有可用的executor上均匀分配kafka的主题的所有分区。Subscribe[String, String](topics, kafkaParams))val cacheOper = kafkaStream.transform(rdd=>{offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval check = rdd.map(x => {x.value()})//.cache()check}) cacheOper.window(Seconds(20),Seconds(10)).foreachRDD(rdd=>{rdd.foreach(x=>{println("data:"+x)Thread.sleep(2000L)})kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)})ssc.start()ssc.awaitTermination()}
}
解决办法
添加 conf.set(“spark.streaming.kafka.consumer.cache.enabled”, “false”)
这个问题发生的原因是spark 缓存问题,可以查看官网spark streaming整合kafka官网地址
spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access相关推荐
- 【Flink】flink消费kafka报错 KafkaConsumer.assign Ljava/util/List
文章目录 1.概述 1.概述 flink消费kafka上数据时报错: Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients. ...
- 【Spark】Spark Stream读取kafka写入kafka报错 AbstractMethodError
1.概述 根据这个博客 [Spark]Spark 2.4 Stream 读取kafka 写入kafka 报错如下 Exception in thread "main" java.l ...
- spark streaming运行kafka数据源
一.Kafka准备工作 Kafka的安装,请看另外一文,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是1.6.2,scala版本号是2.10,所以,一定要选 ...
- 【Flink】Flink 写入 kafka 报错 The server disconnected before a response was received
文章目录 1.场景再现 1.1.概述 1.场景再现 1.1.概述 Flink 写入 kafka 报错 The server disconnected before a response was rec ...
- Flink读取Kafka报错:KafkaException ByteArrayDeserializer is not an instance Deserializer
1.视界 2.背景 做flink读取kafka报错 org.apache.kafka.common.KafkaException: Failed to construct kafka consumer ...
- 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义
注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...
- Structured Streaming整合kafka
Structured Streaming整合kafka Spark2.0以后开始推出Structured Streaming,详情参考上文Spark2.0 Structured Streaming.本 ...
- Spark Streaming使用Kafka保证数据零丢失
为什么80%的码农都做不了架构师?>>> 源文件放在github,随着理解的深入,不断更新,如有谬误之处,欢迎指正.原文链接https://github.com/jacksu/ ...
- spark streaming 消费 kafka入门采坑解决过程
kafka 服务相关的命令 # 开启kafka的服务器 bin/kafka-server-start.sh -daemon config/server.properties & # 创建top ...
最新文章
- 谈谈 Python 程序的运行原理
- [网络安全提高篇] 一〇八.Powershell和PowerSploit脚本渗透详解 (1)
- nginx平滑升级make upgrade出错的解决办法
- 北信源管理网页卸载密码_Homebrew: 一行代码实现mac软件管理
- Android 自定义下拉刷新
- wx:for双层循环
- C语言 rand和srand
- 110个oracle常用函数总结(7),oracle110个最常用函数
- 【TSP】基于matlab遗传和模拟退火算法求解旅行商问题【含Matlab源码 696期】
- liunx中的gcc命令
- 明华RD-EB读写器-读写代码
- 明华M1读卡器操作基本方法
- mysql触发器联机删除_mysql触发器删除实例1
- selenium+chromedriver实现自动填写问卷星问卷
- 高刷新率电视机有必要吗?
- 电脑键盘部分按键失灵_键盘失灵_电脑键盘失灵怎么办_电脑键盘失灵_笔记本键盘失灵怎么办-太平洋IT百科...
- 如何提高信号发生器(信号源)测量时的幅度精度
- VMX 进程已提前退出。VMware Workstation 无法连接到虚拟机。请确保您有权运行该程序、访问该程序使用的所有目录以及访问所有临时文件目录。
- [CTSC2010]珠宝商 SAM+后缀树+点分治
- HTML网页表格标签,HTML静态网页(标签、表格)
热门文章
- Didn't find class android.support.v7.widget.RecyclerView 解决办法 ———————————————— 版权声明:本文为CSDN博主「eag
- vs2010打开需要安装 service pack1
- Mysql中的七种常用查询连接详解
- SEO优化转战移动手机站
- 数据仓库工程师基本技能
- mysql error :1114 - The table ‘XXX‘ is full
- 雷军微博拧螺丝CFO为粉丝数发愁
- python输入姓名_在Python中解析人的名字和姓氏
- android 雷达图 蜘蛛图
- 餐馆会员管理系统 - MySQL数据库课程设计