import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.math.MathUtils;// 自定义Partitioner
class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner {protected var _numPartitions = -1;  protected var _hashFunction = new org.elasticsearch.cluster.routing.Murmur3HashFunction;//此处会出现序列化错误override def numPartitions: Int = {val newSettings = new org.elasticsearch.hadoop.cfg.PropertiesSettings().load(settings);// 生产环境下,需要自行设置索引的 index/type,我是以web/blog作为实验的indexnewSettings.setResourceRead("web/blog"); // ******************** !!! modify it !!! ******************** newSettings.setResourceWrite("web/blog"); // ******************** !!! modify it !!! ******************** val repository = new org.elasticsearch.hadoop.rest.RestRepository(newSettings);val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly());repository.close();// targetShards ??? data structure_numPartitions = targetShards.size();println("********************numPartitions*************************");println(_numPartitions);_numPartitions;}override def getPartition(docID: Any): Int = {    val r = _hashFunction.hash(docID.toString());val shardId = org.elasticsearch.common.math.MathUtils.mod(r, _numPartitions);println("********************shardId*************************");println(shardId)shardId;}
}

根源:出现“task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。特别是当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。

解决方法:

class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner {protected var _numPartitions = -1;  override def numPartitions: Int = {val newSettings = new org.elasticsearch.hadoop.cfg.PropertiesSettings().load(settings);// 生产环境下,需要自行设置索引的 index/type,我是以web/blog作为实验的indexnewSettings.setResourceRead("web/blog"); // ******************** !!! modify it !!! ******************** newSettings.setResourceWrite("web/blog"); // ******************** !!! modify it !!! ******************** val repository = new org.elasticsearch.hadoop.rest.RestRepository(newSettings);val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly());repository.close();// targetShards ??? data structure_numPartitions = targetShards.size();println("********************numPartitions*************************");println(_numPartitions);_numPartitions;}override def getPartition(docID: Any): Int = {val _hashFunction = new org.elasticsearch.cluster.routing.Murmur3HashFunction;val r = _hashFunction.hash(docID.toString());val shardId = org.elasticsearch.common.math.MathUtils.mod(r, _numPartitions);println("********************shardId*************************");println(shardId)shardId;}
}

Job aborted due to stage failure: Task not serializable:

If you see this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

The above error can be triggered when you intialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");rdd.map(s -> notSerializable.doSomething(s)).collect();

This will trigger that error. Here are some ideas to fix this error:

  • Serializable the class
  • Declare the instance only within the lambda function passed in map.
  • Make the NotSerializable object as a static and create it once per machine.
  • Call rdd.forEachPartition and create the NotSerializable object in there like this:
rdd.forEachPartition(iter -> {NotSerializable notSerializable = new NotSerializable();// ...Now process iter
});

参考:https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

转载于:https://www.cnblogs.com/bonelee/p/6120539.html

spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable...相关推荐

  1. spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable

    出现"task not serializable"这个错误,一般是因为在map.filter等的参数使用了外部的变量,但是这个变量不能序列化.特别是当引用了某个类(经常是当前类)的 ...

  2. 大数据学习系列之八----- Hadoop、Spark、HBase、Hive搭建环境遇到的错误以及解决方法

    大数据学习系列之八----- Hadoop.Spark.HBase.Hive搭建环境遇到的错误以及解决方法 参考文章: (1)大数据学习系列之八----- Hadoop.Spark.HBase.Hiv ...

  3. Spark SQL: Error in query: undefined function错误的解决方法

    本文原文出处: http://blog.csdn.net/bluishglc/article/details/50748937 严禁任何形式的转载,否则将委托CSDN官方维护权益! 问题描述 如果你在 ...

  4. WCF项目中出现常见错误的解决方法:基础连接已经关闭: 连接被意外关闭

    原文:WCF项目中出现常见错误的解决方法:基础连接已经关闭: 连接被意外关闭 在我们开发WCF项目的时候,常常会碰到一些莫名其妙的错误,有时候如果根据它的错误提示信息,一般很难定位到具体的问题所在,而 ...

  5. Hudi同步Hive表报“HoodieException : Got runtime exception when hive syncing”错误的解决方法

    1 问题描述 闯过第二关之后,普通的元数据同步基本就没什么问题了.但是当遇到下面这种场景时,同步再次"翻了车": 如果在一个SparkSession下,先读取一个Hudi数据集,得 ...

  6. oracle 配置数据库错误,Oracle数据库配置错误信息解决方法

    Oracle数据库配置错误信息 Oralce数据库的错误信息经常会出现,我们看见的都是错误的代码,至于错误原因究竟是什么还一时半会难以解答,所以就把一些常见的错误整理了一下,来看看也许对你有帮助的. ...

  7. “ Error:(1, 1) java: 非法字符: ‘\ufeff‘ ”错误的解决方法

    前言:今天为了做作业,在 github 上面下载了个项目,然后在运行项目时发现报错,在此记录一下 " Error:(1, 1) java: 非法字符: '\ufeff' "错误的解 ...

  8. 有关Run-Time Check Failure #2 - Stack around the variable 'XXX' was corrupted.错误的解决方法

    有关Run-Time Check Failure #2 - Stack around the variable 'XXX' was corrupted.错误的解决方法 今天我在敲完一段代码运行的时候出 ...

  9. php 500 内部服务器错误,php 500 - 内部服务器错误的解决方法

    php 500 - 内部服务器错误的解决方法 发布时间:2020-11-04 09:55:31 来源:亿速云 阅读:71 作者:小新 小编给大家分享一下php 500 - 内部服务器错误的解决方法,相 ...

最新文章

  1. mysql 怎么格式化输出_怎么格式化MySQL输出代码
  2. Verilog: How to avoid 'Redeclaration of ansi port'
  3. docker 核心概念整理
  4. scrapy two
  5. LintCode 402: Continuous Subarray Sum
  6. 代理 XP”组件已作为此服务器安全配置的一部分被关闭。系统管理员可以使用 sp_configure 来启用“代理 XP”。...
  7. android fragment 生命周期
  8. 计算机和移动存储设备管理台账,如何建立涉密管理台账?
  9. Istio学习笔记-熔断实验
  10. Windows11在Edge浏览器中打开IE浏览器兼容的页面,在Edge浏览器打开加载ActiveX控件的页面
  11. pixel 刷入自己编译的Android 8.0 安装Xposed 显示 Verified Boot (dm-verity) prevents the device from booting
  12. 正面管教读书笔记 10 你的性格对孩子性格的影响
  13. 使用机器学习来进行应用识别
  14. 常用的自动化管理工具
  15. Redis总结二 - 测试案例
  16. Matlab弹簧质量阻尼系统建模仿真
  17. hdu 2829 Lawrence 斜率优化
  18. 【图像隐藏】基于DCT算法实现数字水印嵌入+检测+攻击含Matlab源码
  19. NDN-lite 命名数据网络 -----第一章:关于Interest和Data
  20. 学计算机的去做传感器,传感器技术

热门文章

  1. 查看linux版本信息
  2. html5 video speed control插件,Video Speed Controller
  3. 机器人水库涵洞检测_能给眼睛打针、可水下搜索救援,手术机器人水下机器人将亮相服贸会...
  4. flinksql获取系统当前时间搓_DNF:从剑魂角度看工作服,不仅不是地摊货,更是超越了手搓套...
  5. java等号_java等号
  6. 南华大学计算机学院足球队,球场健儿,不言放弃——记南华大学“新生杯”足球赛...
  7. php spl的优势,PHP SPL核心库相对以前有什么好处嘛?
  8. 计算机动画火柴人作业,(Flash期末作品综合实验报告.doc
  9. 如何选择漏电保护器规格型号_家用漏电开关型号介绍 如何选用家用漏电开关...
  10. 一般web放在linux那个目录下,web.py应该安装在linux的哪个目录下?