一、问题

使用spark join两张表(5000w*500w)总是出错,报的异常显示是在shuffle阶段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
14/11/27 12:05:49 ERROR storage.DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /hadoop/application_1415632483774_448143/spark-local-20141127115224-9ca8/04/shuffle_1_1562_27
java.io.FileNotFoundException: /hadoop/application_1415632483774_448143/spark-local-20141127115224-9ca8/04/shuffle_1_1562_27 (No such file or directory)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:212)
        at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:178)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$1.apply(HashShuffleWriter.scala:118)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$1.apply(HashShuffleWriter.scala:117)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.shuffle.hash.HashShuffleWriter.revertWrites(HashShuffleWriter.scala:117)
        at org.apache.spark.shuffle.hash.HashShuffleWriter.stop(HashShuffleWriter.scala:89)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

出问题的代码块(scala)

1 val cRdd = iRdd.leftOuterJoin(label).map {
2      case (id, (iMap, Some(set))) => (id, (iMap, set))
3      case (id, (iMap, None)) => (id, (iMap, new HashSet[Int]()))
4    }.persist(StorageLevel.MEMORY_AND_DISK)

二、问题分析与解决

一般spark job很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高导致的问题,因此尝试通过配置参数的方法来解决。

1)--conf spark.akka.frameSize=100

此参数控制Spark中通信消息的最大容量 (如task的输出结果),默认为10M。当处理大数据时,task的输出可能会大于这个值,需要根据实际数据设置一个更高的值。尝试将此参数设置成100M后,问题未能解决。

2)--conf spark.shuffle.manager=SORT

Spark默认的shuffle采用Hash模式,在HASH模式下,每一次shuffle会生成M*R的数量的文件(M指的是Map的数目,R指的是Reduce的数目),而当Map和Reduce的数目开得较大时,会产生相当规模的文件,与此同时带来了大量的内存开销。

为了降低系统资源,可以采用Sort模式,Sort模式只产生M数量的文件。具体可以参考:Sort-based Shuffle之初体验

在我们的应用场景下,采用Sort模式后,shuffle时间比之前增大了1/3,但是问题依旧未解决。

3)--conf spark.yarn.executor.memoryOverhead=4096

executor堆外内存设置。起初是1024M,未能跑过,后改为4096M,Job就能跑通,原因是程序使用了大量的堆外内存。

spark job运行参数优化相关推荐

  1. hive sql 报错后继续执行_Hive优化之Spark执行引擎参数调优(二)

    Hive是大数据领域常用的组件之一,主要是大数据离线数仓的运算,关于Hive的性能调优在日常工作和面试中是经常涉及的的一个点,因此掌握一些Hive调优是必不可少的一项技能.影响Hive效率的主要有数据 ...

  2. Spark SQL运行流程及性能优化:RBO和CBO

    1 Spark SQL运行流程 1.1 Spark SQL核心--Catalyst Spark SQL的核心是Catalyst查询编译器,它将用户程序中的SQL/Dataset/DataFrame经过 ...

  3. Spark程序运行常见错误解决方法以及优化

    Spark程序运行常见错误解决方法以及优化 参考文章: (1)Spark程序运行常见错误解决方法以及优化 (2)https://www.cnblogs.com/double-kill/p/901238 ...

  4. Spark 在京东物流财务计费应用中的一些常见参数优化

    财务需要进行物流总公司同各子公司间的计费及结算,每月初完成核算入账,既要完成对海量数据计费,又要完成对下游系统的下发.经过调研,Spark 提供了比 MapReduce 编程模型更为灵活的 DAG 编 ...

  5. Spark Streaming实践和优化

    2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming实践和优化 博客分类: spark 在流式计算领域,Spark Streaming和Storm时 ...

  6. 【Spark 深入学习 04】再说Spark底层运行机制

    本节内容 · spark底层执行机制 · 细说RDD构建过程 · Job Stage的划分算法 · Task最佳计算位置算法 一.spark底层执行机制 对于Spark底层的运行原理,找到了一副很好的 ...

  7. Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)

    3.3. MapReduce与YARN 3.3.1 YARN概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运 ...

  8. Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)...

    3.3. MapReduce与YARN 3.3.1 YARN概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运 ...

  9. 工作经验分享:Spark调优【优化后性能提升1200%】

    问题导读 1.本文遇到了什么问题? 2.遇到问题后,做了哪些分析? 3.本文解决倾斜使用哪些方法? 4.本次数据倾斜那种方法更有效? 5.解决性能优化问题的原理是什么? 优化后效果 1.业务处理中存在 ...

最新文章

  1. AS3版本的MaxRects算法测试
  2. 软件测试需要学习什么技术
  3. 最近项目上Makefile的小Bug
  4. 在pl/sql中使用exp/imp工具实现oracle数据导出/导入
  5. 面向Tableau开发人员的Python简要介绍(第2部分)
  6. Java多线程系列--“JUC锁”05之 非公平锁
  7. 【转】C#中StreamWriter与BinaryWriter的区别兼谈编码。
  8. 笔试题--Multicore简答题(下)
  9. 字符串字段当条件查询的时候需要加引号吗_如此详细的SQL优化教程,是你需要的吗?...
  10. 我用 Python 集齐了五福
  11. kafka 的pom文件_Flink的sink实战之二:kafka
  12. 三层结构对多语言的支持
  13. Dev C++中编译问题
  14. java 抽象类命名_Java命名规范
  15. 对话机器人(一)——对话机器人基础知识
  16. [计算机组成原理]定点数运算及溢出检测
  17. word 表格不跨行断页
  18. Python---python3.7.0---如何安装PIL
  19. 计算机网络的全局模式是,全局配置模式-计算机网络精品课程网.PPT
  20. 唯品会订单分库分表的实践总结以及关键步骤

热门文章

  1. Java快速入门学习笔记3 | Java语言中的表达式与操作符
  2. matlab实现photoshop,photoshop图像滤镜——素描算法(含matlab代码)
  3. IDEA配置Docker一键部署SpringBoot项目(企业级做法)
  4. 备份自己常用的VS2010设置
  5. NSDictionary / NSMutableDictionary 及 NSArray / NSmutableArray (实例)
  6. 集成学习——机器学习面试
  7. Windows下搭建IOS开发环境(一)
  8. 部署DNS split分离解析服务
  9. LeetCode 507. Perfect Number
  10. 【ICLR2019】Oral 论文汇总