Spark踩坑填坑-聚合函数-序列化异常

  • 一、Spark聚合函数特殊场景
  • 二、spark sql group by
  • 三、Spark Caused by: java.io.NotSerializableException 序列化异常踩过的坑
    • 序列异常天坑1(网上常见的)
    • 序列异常天坑2
    • 序列异常天坑3
  • 四、org.apache.spark.SparkException: Task not serializable


部分内容原文地址:

AISeekOnline:Spark Caused by: java.io.NotSerializableException 序列化异常踩过的坑
zhou_jun:Spark运行程序异常信息: org.apache.spark.SparkException: Task not serializable 解决办法



一、Spark聚合函数特殊场景

在对数据进行统计分析时,如果对指标进行聚合运算,而待查询的字段中还包含了维度,则原则上我们还需要按照维度字段进行分组。倘若这个聚合运算为sum函数,分组之后就相当于分类汇总了。有一种特殊场景是我们对指标执行了sum聚合,查询字段也包含了维度,但我们不希望对维度分组。例如:

select name, role, sum(income) from employee

虽然返回的结果挺奇怪,因为它事实上是针对整张表的income进行了求和运算,与name、role无关。查询结果中返回的其实是第一条记录的name与role。但至少在MySQL中,这样的SQL语法是正确的。
但是在Spark中,执行这样的SQL语句,则会抛出org.apache.spark.sql.AnalysisException异常:

org.apache.spark.sql.AnalysisException: expression 'employee.`name`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.

这是因为Spark SQL在执行SQL语句时,事先会调用CheckAnalysis的checkAnalysis方法对LogicalPlan进行检查:

def checkAnalysis(plan: LogicalPlan): Unit = {case e: Attribute if groupingExprs.isEmpty =>// Collect all [[AggregateExpressions]]s.val aggExprs = aggregateExprs.filter(_.collect {case a: AggregateExpression => a}.nonEmpty)failAnalysis(s"grouping expressions sequence is empty, " +s"and '${e.sql}' is not an aggregate function. " +s"Wrap '${aggExprs.map(_.sql).mkString("(", ", ", ")")}' in windowing " +s"function(s) or wrap '${e.sql}' in first() (or first_value) " +s"if you don't care which value you get.")case e: Attribute if !groupingExprs.exists(_.semanticEquals(e)) =>failAnalysis(s"expression '${e.sql}' is neither present in the group by, " +s"nor is it an aggregate function. " +"Add to group by or wrap in first() (or first_value) if you don't care " +"which value you get.")
}

按照给出的SQL语句,groupingExprs应该是Empty才对,然而根据抛出的错误提示,在对分析语句进行检查时,却是走的后一个模式匹配分支,即e: Attribute if !groupingExprs.exists(_.semanticEquals(e))。

根据提示,在不增加group by的情况下,需要对select中的字段包裹一个first()或者first_value()函数,如下所示:

spark.sql("select first(name),first(role), sum(income) from employee")

这里的维度包含name和role。如果添加了group by,但只针对其中的一个维度进行了分组,那么对于缺少分组的维度,也当用first()函数来包裹才对。

第一部分内容转载自:张逸-简书

二、spark sql group by

hiveContext.sql("select time,count(*) from page_click group by id").collect.foreach(println)
//报错
org.apache.spark.sql.AnalysisException: expression 'page_click.`time`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:218)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$6.apply(CheckAnalysis.scala:245)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$6.apply(CheckAnalysis.scala:245)at scala.collection.immutable.List.foreach(List.scala:381)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:245)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)... 48 elided

time这一列在 group by的时候有多个查询结果,需要使用collect_set()一下。

scala>hiveContext.sql("select collect_set(time),count(*) from page_click group by id limit 10").collect.foreach(println)

第二部分内容转载自:time_exceed-CSDN

三、Spark Caused by: java.io.NotSerializableException 序列化异常踩过的坑

这部分之所以会进行转载学习,是因为在之前的项目中,DateTimeFormat这个方法和Redis初始化对象方法,会报Spark Caused by: java.io.NotSerializableException 这个错,故转载过来学习。

最近有需求需要在driver端创建好类实例,然后在rdd里面调用,但是使用过程中发现 Caused by: java.io.NotSerializableException,即序列化异常,通过查处网上资料发现是构建的类没有继承Serializable,没有继承Serializable的类是不会自动执行自动序列化操作的,因此我把构建的类继承了Serializable这个类,再次运行的时候发现依旧是序列化异常,百般不得其解。

序列异常天坑1(网上常见的)

在rdd外实例化的类没有继承Serializable,在实例化类在rdd中使用,如下代码块:

class ClassA {def getClassName: String = this.getClass.getSimpleName
}object SerializableTest {def main(args: Array[String]): Unit = {val conf = new SparkConf(true).setMaster("local[*]").setAppName("SerializableTest").set("spark.rdd.compress", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")lazy val sc = new SparkContext(conf)val classA = new ClassA()val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in main -> " + classA.getClassName + s": $i").collect().foreach(println)}
}

填坑方法
方法1:将ClassA修改为继承Serializable类:

class ClassA extends Serializable {def getClassName: String = this.getClass.getSimpleName
}

方法2:将ClassA放在rdd里面进行实例化:

rdd.map(i => {val classA = new ClassA"getClassName in main -> " + classA.getClassName + s": $i"}).collect().foreach(println)

方法3:将ClassA改成静态类,静态类自动实例化,在rdd里面直接调用其方法:

object ClassA {def getClassName: String = this.getClass.getSimpleName
}object SerializableTest {def main(args: Array[String]): Unit = {val conf = new SparkConf(true).setMaster("local[*]").setAppName("SerializableTest").set("spark.rdd.compress", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")lazy val sc = new SparkContext(conf)val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in main -> " + ClassA.getClassName + s": $i").collect().foreach(println)}
}

序列异常天坑2

在rdd里面调用类中某个类的方法报序列化异常,代码如下:

class ClassA {def getClassName: String = this.getClass.getSimpleName
}class ClassB(sc: SparkContext) extends Serializable{val classA = new ClassA()def fun(): Unit = {val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in ClassB -> "+classA.getClassName + s": $i").collect.foreach(println)}
}object SerializableTest {def main(args: Array[String]): Unit = {val conf = new SparkConf(true).setMaster("local[*]").setAppName("SerializableTest").set("spark.rdd.compress", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")lazy val sc = new SparkContext(conf)val classB = new ClassB(sc)val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in main -> " + classB.classA.getClassName + s": $i").collect().foreach(println)}
}

如上述,在rdd里面调用ClassB中属性ClassA中的方法报序列化异常

填坑方法
方法1:这个ClassB有点脑残,把ClassA作为属性实在不可取,如果只是为了达到调用ClassA内的方法,则可以让ClassB去继承ClassA

class ClassA extends Serializable {def getClassName: String = this.getClass.getSimpleName
}class ClassB(sc: SparkContext) extends ClassA with Serializable{}object SerializableTest {def main(args: Array[String]): Unit = {val conf = new SparkConf(true).setMaster("local[*]").setAppName("SerializableTest").set("spark.rdd.compress", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")lazy val sc = new SparkContext(conf)val classB = new ClassB(sc)val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in main -> " + classB.getClassName + s": $i").collect().foreach(println)}
}

方法2:在rdd外先把ClassB中ClassA取出来放到一个变量里面去,再在rdd里面调用该变量:

object SerializableTest {def main(args: Array[String]): Unit = {val conf = new SparkConf(true).setMaster("local[*]").setAppName("SerializableTest").set("spark.rdd.compress", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")lazy val sc = new SparkContext(conf)val classB = new ClassB(sc)val a = classB.classAval rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in main -> " + a.getClassName + s": $i").collect().foreach(println)}
}

这种类似填坑1里面的,相当于重新new了一个ClassA

序列异常天坑3

在类ClassB中有方法fun,和属性classA,fun调用了classA中的方法:

class ClassA extends Serializable {def getClassName: String = this.getClass.getSimpleName
}class ClassB(sc: SparkContext) extends Serializable{val classA = new ClassA()def fun(): Unit = {val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in ClassB -> "+classA.getClassName + s": $i").collect.foreach(println)}
}object SerializableTest {def main(args: Array[String]): Unit = {val conf = new SparkConf(true).setMaster("local[*]").setAppName("SerializableTest").set("spark.rdd.compress", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")lazy val sc = new SparkContext(conf)val classB = new ClassB(sc)classB.fun()}
}

填坑方法
方法1:在fun里面不使用属性classA,而是在fun里面重新构建ClassA

def fun(): Unit = {val classA = new ClassA()val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in ClassB -> "+classA.getClassName + s": $i").collect.foreach(println)}

这类似于天坑1的解决方式。但是很多时候我们的ClassA是一个比较全的工具类,不仅仅是在fun单个方法体里面调用,因此需要将放到ClassB作为属性。

方法2:与前面的一样,可以在fun方法里面的rdd前面先新增一个变量在调用

def fun(): Unit = {val a = classAval rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in ClassB -> "+a.getClassName + s": $i").collect.foreach(println)}

方法3:把ClassB修改成object修饰静态类:

class ClassA extends Serializable {def getClassName: String = this.getClass.getSimpleName
}object ClassB extends Serializable{val classA = new ClassA()def fun(sc: SparkContext): Unit = {val rdd = sc.makeRDD(1 to 5)rdd.map(i => "getClassName in ClassB -> "+classA.getClassName + s": $i").collect.foreach(println)}
}object SerializableTest {def main(args: Array[String]): Unit = {val conf = new SparkConf(true).setMaster("local[*]").setAppName("SerializableTest").set("spark.rdd.compress", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")lazy val sc = new SparkContext(conf)val classB = ClassBclassB.fun(sc)}
}

四、org.apache.spark.SparkException: Task not serializable

错误信息:

org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

问题原因:再对RDD进行操作时引用了类的成员变量而该成员变量无法被序列化所导致的。

例如:

object Test2 extends App{val conf = new SparkConf().setAppName("RVM").setMaster("local")val sc = new SparkContext(conf)val matrix = new DenseMatrix(2,2,Array(1.0,2,3,4))new Test(sc,matrix).run()}class Test(scc:SparkContext,PHI:DenseMatrix) extends Serializable{val ts = 0.1def run(): Unit ={val rdds = scc.parallelize(0 to 3)val a = rdds.map(x =>{PHI.toArray.apply(x)*x})a.collect.foreach(println(_))}
}

出现“org.apache.spark.SparkException: Task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。特别是当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。解决这个问题最常用的方法有:

  1. 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
  2. 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
  3. 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。
  4. 将引用的类做成可序列化的。

总结

通过上面填坑过程发现如下规律:
1、在rdd应该外部变量类实例的时候,类需要继承Serializable
2、在非静态类中(class声明的类),若是类属性是一个对象,则该属性不能在rdd里面直接使用,尽管该对象是已经继承了Serializable,可以直接在rdd前将该属性赋值为一个变量,再在rdd里面调用该变量

Spark踩坑填坑-聚合函数-序列化异常相关推荐

  1. [iOS]贝聊 IAP 实战之见坑填坑

    大家好,我是**贝聊科技** 的 iOS 工程师 @NewPan. 这次为大家带来我司 IAP 的实现过程详解,鉴于支付功能的重要性以及复杂性,文章会很长,而且支付验证的细节也关系重大,所以这个主题会 ...

  2. Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator

    Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator 一.无类型的用户自定于聚合函数(Untyped User-Defined Ag ...

  3. weex css单位,Weex系列(7) ——踩坑填坑的总总

    目录 使用weex已经一年半了,踩了很多坑,也流了很多泪填上,总结一波,希望对大家有所帮助. LaunchImage 这是今年来的第一个调整,需要把 iOS8.0 and Later勾上,不然iPho ...

  4. Springboot中使用Mybatis框架对数据库进行联表查询,踩坑填坑

    因为mybatis使用的基本是原生sql语句 所以首先从数据库开始说 以mysql数据库为例,对表的连接查询分为四种 内连接,外连接,交叉连接,和联合连接 内连接使用比较运算符根据每个表共有的列的值匹 ...

  5. js promises 踩坑 填坑 We have a problem with promises

    We have a problem with promises promise 填坑 对于 promise return 与否,结果真的不一样哦. By: Nolan Lawson Published ...

  6. Cobalt Strike折腾踩坑填坑记录

    文章目录 0X00 背景 0x01 基础原理 0x02 关于破戒 Exit暗桩 0x03 CDN+反代隐藏Teamserver Domain Fronting Proxy 0x04 DNS上线 一个未 ...

  7. Spark:group by和聚合函数使用

    groupBy分组和使用agg聚合函数demo: df.show +----+-----+---+ |YEAR|MONTH|NUM| +----+-----+---+ |2017| 1| 10| |2 ...

  8. Pyinstaller 详解多种打包过程(去坑,填坑)。

    前言 本篇文章,详细介绍pyinstaller多种打包过程.去坑,填坑. 一.安装Pyinstaller 1)使用下面的命令即可安装(win10) pip install pyinstaller 二. ...

  9. 微信小程序--多视频滑动播放(踩坑,填坑)

    最近在做一个关于短视频的小程序,类似于微视和快手的小程序,但是在做的过程当中碰到了好多坑,于是得一步一步的去填这个坑.先来看看最后的实现效果 在做的过程中,想要实现多个视频无限滑动播放,并且在视频原生 ...

最新文章

  1. 把python语言翻译出来_Python语言实现翻译小工具(Python打包成exe文件)
  2. 总结 | 如何测试你自己的 RubyGem
  3. phpshe b2c商城系统配置nginx支持pathinfo和rewrite的写法
  4. php 1天,自学PHP之第1天-字符串函数 - 晓雨网
  5. 学php还是golang,学swoole还是golang
  6. m1芯片MacBook安装Apple优化版TensorFlow(虚拟环境)
  7. 求两个数的公约数java_java中怎样求两个数的最大公约数?
  8. php方法重载方法重写_PHP面向对象之旅:方法覆盖
  9. 《R语言实战》读书笔记--学习张丹日志
  10. 测试学习java_使用Junit测试框架学习Java
  11. 输入一行数字、数字之间用空格隔开python_Python 实现一行输入多个数字(用空格隔开)...
  12. 微服务测试:如何破解测试所面临的问题?测试的类型和范围你懂吗
  13. Oracle跨平台迁移之XTTS
  14. 明尼苏达计算机科学与工程,明尼苏达大学计算机科学专业排名第29(2020年USNEWS美国排名)...
  15. 论“渤海—黄海开凿人工运河”
  16. 目前支持WebGL的浏览器有哪些?
  17. IO流-节点流和处理流(涵盖底层调用关系)
  18. 关于AC6003、6005、6605版本关联WIFI6代产品方法
  19. MySQL之锁-表级锁
  20. IP地址管理工具——netbox——全

热门文章

  1. 微信小程序点击放大图片
  2. 王桂林讲C++之指向类成员指针
  3. 排列组合简介以及相关问题
  4. 2023年天津中德应用技术大学专升本通信工程专业考试大纲
  5. 瓦尔塔汽车蓄电池解读
  6. 城市异常事件精确预测:基于交互注意力机制的时空数据预测模型
  7. 解读wlk成就系统系列之:饱读诗书
  8. 新一代的java模板引擎--beetl
  9. Multi-Stage Feature Fusion Network for Video Super-Resolution阅读笔记
  10. 【Linux】ps -aux和ps -ef命令命令参数的作用以及区别详情