项目场景:

使用sparkStream接收kafka的数据进行计算,并且打包上传到linux进行spark任务的submit


错误集合:

1.错误1:

Failed to add file:/usr/local/spark-yarn/./myapp/sparkDemo04.jar to Spark environment
java.io.FileNotFoundException: Jar D:\usr\local\spark-yarn\myapp\sparkDemo04.jar not found
WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped

2.windows下ideal中在yarn模式下运行代码出错,显示如下报错

WARN CheckpointReader: Error reading checkpoint from file hdfs://hadoop102:9000/checkpoint6/checkpoint-1637834226000
java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.MappedDStream.mapFunc of type scala.Function1 in instance of org.apache.spark.streaming.dstream.MappedDStream

3.报的一些kafka包notfound的问题,这个下面就不讨论了,只需要把对应的包下载后放到spark目录下的jars文件中即可,比如常见的

java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater

都可以通过添加包的方式解决,如果是spark shell里面出现这种错误,则需要在输入spark-shell命令时,在后面添加 --jars 包路径
最初的代码:

import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.lang.System.getProperty
import scala.collection.mutable.ListBufferobject stream05_kafka {object SparkKafka{def createSSC(): _root_.org.apache.spark.streaming.StreamingContext={//    TODO 创建环境对象//    StreamingContext创建时,第一个参数表示环境配置,第二个是数据采集周期val sparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka2")sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true")sparkConf.set("spark.hadoop.fs.defaultFS","hdfs://hadoop102:9000")sparkConf.set("spark.hadoop.yarn.resoursemanager.address","hadoop103:8088")val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint6")val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "second","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")//    TODO 逻辑处理val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara))val num: DStream[String] = kafkaDS.map(_.value())val result = num.map(line=>{val flows = line.split(",")val up=flows(1).toIntval down=flows(2).toInt(flows(0),(up,down,up+down))}).updateStateByKey((queueValue, buffValue: Option[(Int,Int,Int)]) => {val cur=buffValue.getOrElse((0,0,0))var curUp=cur._1var curDown=cur._2for (elem <- queueValue) {curUp+=elem._1curDown+=elem._2}Option((curUp,curDown,curUp+curDown))})result.print()streamingContext}}def main(args: Array[String]): Unit = {println("**************")Logger.getLogger("org.apache.spark").setLevel(Level.WARN)System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")val streamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint6", ()=>createSSC())streamingContext.start()//    2.等待关闭streamingContext.awaitTermination()}}

原因分析:

首先,这里指出如果要打包到linux 下在yarn模式下进行spark的submit,需要设置master为yarn,至于是yarn-client还是yarn-cluster需要提交任务时指定,默认是client。我这里写成local,所以一开始都是windows下可以正常连接kafka拿到数据进行计算,但是linux下就不行了。归根结底没有连接yarn。
1.错误1是因为windows下spark任务提交的时候,找不到你的jar包,试想一下spark的spark-submit命令,需要指定jar包以及class
2.这个是序列化问题还是广播变量不适合于检查点的问题,查资料发现广播变量的内容写入hdfs后就难以恢复了,这里可以把错误定位到StreamingContext.getActiveOrCreate里面,这里有时候可以正常进行数据恢复,但是有时候就会报错。解决方法还没找到,我就直接换检查点路径了,一般生产环境下也只有代码升级的情况下会关闭流计算,这里就没有深究,希望大神可以解答一下。猜测是读取检查点数据的时候序列化出了问题

解决方案:

错误1的解决:所以如果要在windows下运行,需要先使用mvn package或者build artifacts对程序进行打包,然后对sparkConf.setJars指定包的路径,这样在windows下就可以正常运行了
错误2的解决:这里我就换检查点了
最后贴一下我最终成功运行的代码

import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.lang.System.getProperty
import scala.collection.mutable.ListBufferobject stream05_kafka {object SparkKafka{def createSSC(): _root_.org.apache.spark.streaming.StreamingContext={//    TODO 创建环境对象//    StreamingContext创建时,第一个参数表示环境配置,第二个是数据采集周期val sparkConf = new SparkConf().setMaster("yarn").setAppName("kafka2").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true")sparkConf.set("spark.hadoop.fs.defaultFS","hdfs://hadoop102:9000")sparkConf.set("spark.hadoop.yarn.resoursemanager.address","hadoop103:8088")val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint7")val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "second","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")//    TODO 逻辑处理val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara))val num: DStream[String] = kafkaDS.map(_.value())val result = num.map(line=>{val flows = line.split(",")val up=flows(1).toIntval down=flows(2).toInt(flows(0),(up,down,up+down))}).updateStateByKey((queueValue, buffValue: Option[(Int,Int,Int)]) => {val cur=buffValue.getOrElse((0,0,0))var curUp=cur._1var curDown=cur._2for (elem <- queueValue) {curUp+=elem._1curDown+=elem._2}Option((curUp,curDown,curUp+curDown))})result.print()streamingContext}}def main(args: Array[String]): Unit = {println("**************")Logger.getLogger("org.apache.spark").setLevel(Level.WARN)System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")val streamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint7", ()=>createSSC())
//    new Thread(new MonitorStop(streamingContext)).start()streamingContext.start()//    2.等待关闭streamingContext.awaitTermination()}}

另外,打包的时候不要添加setJars,否则还是会报错,报的是什么已经忘了,这篇博客也是在我解决问题之后写的,没有记录太多报错,如果我没记错的话可能会报这种错误

cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

困惑:

为了解决这个bug,也是在yarn日志和spark日志来回看,看了一天,最让我头疼的就是spark-submit使用control+z退出后,spark-submit进行还会在后台运行,我都怀疑是不是我的kill -9 操作使检查点损坏导致数据恢复失败的,请问各路大神怎么才能结束sparkSubmit进程?

spark on yarn模式下SparkStream整合kafka踩的各种坑(已解决)_fqzzzzz的博客相关推荐

  1. Spark on Yarn 模式编写workcount实例

    Spark on Yarn 模式编写workcount实例 一:上传输入文件到hadoop,本例上传的文件名为spark.txt 二:打包程序,打包名为:Spark-0.0.1-SNAPSHOT.ja ...

  2. Spark基础学习笔记06:搭建Spark On YARN模式的集群

    文章目录 零.本讲学习目标 一.在Spark Standalone模式的集群基础上修改配置 二.运行Spark应用程序 (一)启动Hadoop的HDFS和YARN (二)运行Spark应用程序 (三) ...

  3. 关于spark yarn模式下的常用属性

    前言 整理了spark官网提供的一些常用的spark属性. Spark属性 属性名 默认值 描述 spark.yarn.am.memory 512m 在Client模式下用于YARN Applicat ...

  4. 【FLINK 】 Flink on YARN模式下TaskManager的内存分配

    解决背景: 总的ytm分配的不变的情况下怎么划分给堆内内存JVM 一个更大的内存空间 对于心急的同学来说,我们直接先给一个解决方案,后面想去了解的再往下看: 原来的命令,-ytm 8192,分配给ta ...

  5. Spark的Yarn模式及其案例

    目录 基本概念 Yarn模式搭建 1. 解压缩文件 2.修改配置文件 启动集群 测试Spark中examples案例 1. 提交应用 2.Web 页面查看日志 配置历史服务器 1.具体步骤 2.重新提 ...

  6. vi插入模式下的backspace键和方向键“不正常”使用解决方法

    在新装的ubuntu系统使用vi编辑器编辑文本时,会出现退格键(backspace)和上下左右方向键不好用情况,例如退格键不能删除前面的字母,方向键不能移动光标(在命令模式下可以用h.j. k. l键 ...

  7. MacOS深色模式下微信文章页面背景变为黑色问题的解决

    由于长期伏案,颈椎问题比较严重,所以平时都尽量减少低头看手机的时间,也就养成了用浏览器看各种读物的习惯,其中就包括公众号文章.最近不知道从哪一天开始,突然发现公众号文章的背景突然变成了黑色. 就像这样 ...

  8. Visual Studio在Release模式下开启debug调试,编译器提示变量已被优化掉,因而不可用

    系列文章目录 文章目录 系列文章目录 前言 一.解决办法 1.修改工程属性 参考 前言 我们在编写代码的时候,如果用到别人的库,而别人只提供了release版本,所有我们也只能生成release版本的 ...

  9. latex如何设置字体并加粗_Latex设置字体大小,加粗,加下划线,变斜体_孩纸气_新浪博客...

    Latex 设置字体大小命令由小到大依次为: \tiny \scriptsize \footnotesize \small \normalsize \large \Large \LARGE \huge ...

最新文章

  1. Allure Report使用
  2. Linux下的rsync远程增量备份详解
  3. 列出5个python标准库_Python常用标准库使用(一)
  4. Winform中怎样设置ContextMenuStrip右键菜单的选项ToolStripMenuItem添加照片
  5. 利用DelegatingHandler实现Web Api 的Api key校验
  6. 斐波那契博弈(证明+结论)
  7. python里随机抽取样本_概率分布和抽样分布基础知识及Python实现
  8. umi config.js整体defineConfig配置
  9. C#设计模式之15-解释器模式
  10. 学习记录-网络基础知识(1)
  11. 《软件测试技术大全:测试基础 流行工具 项目实战(第3版)》—第1章1.2节软件测试的发展...
  12. centos编译安装配置支持ssl加密的mysql replication
  13. Zabbix(简介和ubuntu安装步骤)
  14. POJ 3294 Life Forms
  15. 【Web技术】969- 如何实现高性能的在线 PDF 预览
  16. 【安全攻防知识-4】CTF之MISC
  17. MSF给正常程序添加后门
  18. 用Qt操作Word文档
  19. 数据挖掘与python实践心得体会_2年数据挖掘服务工作心得体会
  20. 事务的基本概念及Mysql事务实现原理

热门文章

  1. webrtc rtc_base Copy_on_write_Buffe类功能剖析
  2. Verilog:generate-for 语句(用法,及与for语句区别)
  3. SheetJS集成的Export2Excel框架的导出并修改单元格格式为文本简单记录
  4. java中的mvc和三层结构究竟是什么关系
  5. LRC (文件格式)
  6. 解决python爬虫requests.exceptions.SSLError: HTTPSConnectionPool(host=‘XXXXXXX‘, port=443)问题
  7. Exchange Online功能介绍
  8. 高考作文:如何下好“数据治理”这盘大棋?
  9. 宝塔 开启xdebug_XDebug的配置和使用
  10. Elasticsearch 同义词(dynamic-synonym插件)远程热词更新