spark job运行参数优化
一、问题
使用spark join两张表(5000w*500w)总是出错,报的异常显示是在shuffle阶段。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
14 / 11 / 27 12 : 05 : 49 ERROR storage.DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /hadoop/application_1415632483774_448143/spark-local- 20141127115224 -9ca8/ 04 /shuffle_1_1562_27
java.io.FileNotFoundException: /hadoop/application_1415632483774_448143/spark-local- 20141127115224 -9ca8/ 04 /shuffle_1_1562_27 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java: 212 )
at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala: 178 )
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$ 1 .apply(HashShuffleWriter.scala: 118 )
at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$ 1 .apply(HashShuffleWriter.scala: 117 )
at scala.collection.IndexedSeqOptimized$ class .foreach(IndexedSeqOptimized.scala: 33 )
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala: 108 )
at org.apache.spark.shuffle.hash.HashShuffleWriter.revertWrites(HashShuffleWriter.scala: 117 )
at org.apache.spark.shuffle.hash.HashShuffleWriter.stop(HashShuffleWriter.scala: 89 )
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala: 73 )
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala: 41 )
at org.apache.spark.scheduler.Task.run(Task.scala: 54 )
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala: 177 )
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java: 1145 )
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java: 615 )
at java.lang.Thread.run(Thread.java: 724 )
|
出问题的代码块(scala)
1 val cRdd = iRdd.leftOuterJoin(label).map { 2 case (id, (iMap, Some(set))) => (id, (iMap, set)) 3 case (id, (iMap, None)) => (id, (iMap, new HashSet[Int]())) 4 }.persist(StorageLevel.MEMORY_AND_DISK)
二、问题分析与解决
一般spark job很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高导致的问题,因此尝试通过配置参数的方法来解决。
1)--conf spark.akka.frameSize=100
此参数控制Spark中通信消息的最大容量 (如task的输出结果),默认为10M。当处理大数据时,task的输出可能会大于这个值,需要根据实际数据设置一个更高的值。尝试将此参数设置成100M后,问题未能解决。
2)--conf spark.shuffle.manager=SORT
Spark默认的shuffle采用Hash模式,在HASH模式下,每一次shuffle会生成M*R的数量的文件(M指的是Map的数目,R指的是Reduce的数目),而当Map和Reduce的数目开得较大时,会产生相当规模的文件,与此同时带来了大量的内存开销。
为了降低系统资源,可以采用Sort模式,Sort模式只产生M数量的文件。具体可以参考:Sort-based Shuffle之初体验
在我们的应用场景下,采用Sort模式后,shuffle时间比之前增大了1/3,但是问题依旧未解决。
3)--conf spark.yarn.executor.memoryOverhead=4096
executor堆外内存设置。起初是1024M,未能跑过,后改为4096M,Job就能跑通,原因是程序使用了大量的堆外内存。
spark job运行参数优化相关推荐
- hive sql 报错后继续执行_Hive优化之Spark执行引擎参数调优(二)
Hive是大数据领域常用的组件之一,主要是大数据离线数仓的运算,关于Hive的性能调优在日常工作和面试中是经常涉及的的一个点,因此掌握一些Hive调优是必不可少的一项技能.影响Hive效率的主要有数据 ...
- Spark SQL运行流程及性能优化:RBO和CBO
1 Spark SQL运行流程 1.1 Spark SQL核心--Catalyst Spark SQL的核心是Catalyst查询编译器,它将用户程序中的SQL/Dataset/DataFrame经过 ...
- Spark程序运行常见错误解决方法以及优化
Spark程序运行常见错误解决方法以及优化 参考文章: (1)Spark程序运行常见错误解决方法以及优化 (2)https://www.cnblogs.com/double-kill/p/901238 ...
- Spark 在京东物流财务计费应用中的一些常见参数优化
财务需要进行物流总公司同各子公司间的计费及结算,每月初完成核算入账,既要完成对海量数据计费,又要完成对下游系统的下发.经过调研,Spark 提供了比 MapReduce 编程模型更为灵活的 DAG 编 ...
- Spark Streaming实践和优化
2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming实践和优化 博客分类: spark 在流式计算领域,Spark Streaming和Storm时 ...
- 【Spark 深入学习 04】再说Spark底层运行机制
本节内容 · spark底层执行机制 · 细说RDD构建过程 · Job Stage的划分算法 · Task最佳计算位置算法 一.spark底层执行机制 对于Spark底层的运行原理,找到了一副很好的 ...
- Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)
3.3. MapReduce与YARN 3.3.1 YARN概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运 ...
- Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)...
3.3. MapReduce与YARN 3.3.1 YARN概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运 ...
- 工作经验分享:Spark调优【优化后性能提升1200%】
问题导读 1.本文遇到了什么问题? 2.遇到问题后,做了哪些分析? 3.本文解决倾斜使用哪些方法? 4.本次数据倾斜那种方法更有效? 5.解决性能优化问题的原理是什么? 优化后效果 1.业务处理中存在 ...
最新文章
- AS3版本的MaxRects算法测试
- 软件测试需要学习什么技术
- 最近项目上Makefile的小Bug
- 在pl/sql中使用exp/imp工具实现oracle数据导出/导入
- 面向Tableau开发人员的Python简要介绍(第2部分)
- Java多线程系列--“JUC锁”05之 非公平锁
- 【转】C#中StreamWriter与BinaryWriter的区别兼谈编码。
- 笔试题--Multicore简答题(下)
- 字符串字段当条件查询的时候需要加引号吗_如此详细的SQL优化教程,是你需要的吗?...
- 我用 Python 集齐了五福
- kafka 的pom文件_Flink的sink实战之二:kafka
- 三层结构对多语言的支持
- Dev C++中编译问题
- java 抽象类命名_Java命名规范
- 对话机器人(一)——对话机器人基础知识
- [计算机组成原理]定点数运算及溢出检测
- word 表格不跨行断页
- Python---python3.7.0---如何安装PIL
- 计算机网络的全局模式是,全局配置模式-计算机网络精品课程网.PPT
- 唯品会订单分库分表的实践总结以及关键步骤
热门文章
- Java快速入门学习笔记3 | Java语言中的表达式与操作符
- matlab实现photoshop,photoshop图像滤镜——素描算法(含matlab代码)
- IDEA配置Docker一键部署SpringBoot项目(企业级做法)
- 备份自己常用的VS2010设置
- NSDictionary / NSMutableDictionary 及 NSArray / NSmutableArray (实例)
- 集成学习——机器学习面试
- Windows下搭建IOS开发环境(一)
- 部署DNS split分离解析服务
- LeetCode 507. Perfect Number
- 【ICLR2019】Oral 论文汇总