问题描述

spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
报错信息如图:

代码
object testScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("fs01").setMaster("local[*]")conf.set("spark.streaming.stopGracefullyOnShutdown", "true")conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")conf.set("spark.streaming.kafka.maxRatePerPartition","10")val sc = new SparkContext(conf)sc.setLogLevel("ERROR")val interval = PropertiesUtils.loadProperties("streaming.interval").toLongval ssc:StreamingContext = new StreamingContext(sc, Seconds(5))val kalfa_server_list: String = PropertiesUtils.loadProperties("kafka.broker.list")val kafka_group: String = "group_test_role_1"val kafkaParams = Map[String, Object]("bootstrap.servers" ->  kalfa_server_list, "key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" ->  kafka_group, //消费者组名kafka.groupId"auto.offset.reset" -> "earliest", //earliest可以获取历史数据"enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交val topics = Array("t_2021-09") var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent, //多数时候采用该方式,在所有可用的executor上均匀分配kafka的主题的所有分区。Subscribe[String, String](topics, kafkaParams))val cacheOper = kafkaStream.transform(rdd=>{offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval check = rdd.map(x => {x.value()})//.cache()check}) cacheOper.window(Seconds(20),Seconds(10)).foreachRDD(rdd=>{rdd.foreach(x=>{println("data:"+x)Thread.sleep(2000L)})kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)})ssc.start()ssc.awaitTermination()}
}
解决办法

添加 conf.set(“spark.streaming.kafka.consumer.cache.enabled”, “false”)
这个问题发生的原因是spark 缓存问题,可以查看官网spark streaming整合kafka官网地址

spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access相关推荐

  1. 【Flink】flink消费kafka报错 KafkaConsumer.assign Ljava/util/List

    文章目录 1.概述 1.概述 flink消费kafka上数据时报错: Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients. ...

  2. 【Spark】Spark Stream读取kafka写入kafka报错 AbstractMethodError

    1.概述 根据这个博客 [Spark]Spark 2.4 Stream 读取kafka 写入kafka 报错如下 Exception in thread "main" java.l ...

  3. spark streaming运行kafka数据源

    一.Kafka准备工作 Kafka的安装,请看另外一文,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是1.6.2,scala版本号是2.10,所以,一定要选 ...

  4. 【Flink】Flink 写入 kafka 报错 The server disconnected before a response was received

    文章目录 1.场景再现 1.1.概述 1.场景再现 1.1.概述 Flink 写入 kafka 报错 The server disconnected before a response was rec ...

  5. Flink读取Kafka报错:KafkaException ByteArrayDeserializer is not an instance Deserializer

    1.视界 2.背景 做flink读取kafka报错 org.apache.kafka.common.KafkaException: Failed to construct kafka consumer ...

  6. 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义

    注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...

  7. Structured Streaming整合kafka

    Structured Streaming整合kafka Spark2.0以后开始推出Structured Streaming,详情参考上文Spark2.0 Structured Streaming.本 ...

  8. Spark Streaming使用Kafka保证数据零丢失

    为什么80%的码农都做不了架构师?>>>    源文件放在github,随着理解的深入,不断更新,如有谬误之处,欢迎指正.原文链接https://github.com/jacksu/ ...

  9. spark streaming 消费 kafka入门采坑解决过程

    kafka 服务相关的命令 # 开启kafka的服务器 bin/kafka-server-start.sh -daemon config/server.properties & # 创建top ...

最新文章

  1. 谈谈 Python 程序的运行原理
  2. [网络安全提高篇] 一〇八.Powershell和PowerSploit脚本渗透详解 (1)
  3. nginx平滑升级make upgrade出错的解决办法
  4. 北信源管理网页卸载密码_Homebrew: 一行代码实现mac软件管理
  5. Android 自定义下拉刷新
  6. wx:for双层循环
  7. C语言 rand和srand
  8. 110个oracle常用函数总结(7),oracle110个最常用函数
  9. 【TSP】基于matlab遗传和模拟退火算法求解旅行商问题【含Matlab源码 696期】
  10. liunx中的gcc命令
  11. 明华RD-EB读写器-读写代码
  12. 明华M1读卡器操作基本方法
  13. mysql触发器联机删除_mysql触发器删除实例1
  14. selenium+chromedriver实现自动填写问卷星问卷
  15. 高刷新率电视机有必要吗?
  16. 电脑键盘部分按键失灵_键盘失灵_电脑键盘失灵怎么办_电脑键盘失灵_笔记本键盘失灵怎么办-太平洋IT百科...
  17. 如何提高信号发生器(信号源)测量时的幅度精度
  18. VMX 进程已提前退出。VMware Workstation 无法连接到虚拟机。请确保您有权运行该程序、访问该程序使用的所有目录以及访问所有临时文件目录。
  19. [CTSC2010]珠宝商 SAM+后缀树+点分治
  20. HTML网页表格标签,HTML静态网页(标签、表格)

热门文章

  1. Didn't find class android.support.v7.widget.RecyclerView 解决办法 ———————————————— 版权声明:本文为CSDN博主「eag
  2. vs2010打开需要安装 service pack1
  3. Mysql中的七种常用查询连接详解
  4. SEO优化转战移动手机站
  5. 数据仓库工程师基本技能
  6. mysql error :1114 - The table ‘XXX‘ is full
  7. 雷军微博拧螺丝CFO为粉丝数发愁
  8. python输入姓名_在Python中解析人的名字和姓氏
  9. android 雷达图 蜘蛛图
  10. 餐馆会员管理系统 - MySQL数据库课程设计