Dpark内存溢出

Spark内存溢出
堆内内存溢出
堆外内存溢出
堆内内存溢出
java.lang.OutOfMemoryError: GC overhead limit execeeded
java.lang.OutOfMemoryError: Java heap space
具体说明
Heap size JVM堆的设置是指java程序运行过程中JVM可以调配使用的内存空间的设置.
JVM在启动的时候会自动设置Heap size的值,
Heap size 的大小是Young Generation 和Tenured Generaion 之和。
提示:在JVM中如果98%的时间是用于GC且可用的Heap size 不足2%的时候将抛出此异常信息
Driver heap OOM的三大原因:
(1).用户在Driver端口生成大对象, 比如创建了一个大的集合数据结构
(2).从Executor端收集数据回Driver端
Executor heap OOM
堆外内存溢出
报错情况
Container killed by YARN for exceeding memory limits. 1*.4 GB of 1* GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
基本内容介绍:
1.executor 和 container
01.Spark中的 executor 进程是跑在 container 中,所以container的最大内存会直接影响到executor的最大可用内存
02. yarn.nodemanager.pmem-check-enabled 该参数默认是true,也就是会由它来控制监控container的内存使用
03. yarn.scheduler.maximum-allocation-mb 设置值6114,也就是说单个container申请的最大内存是这么多,
执行任务的时候你的executer需要的内存超出这个值,那么就会被杀掉
container超过了内存的限制从而被kill掉
04.executor执行的时候,用的内存可能会超过 executor-memory
所以会为executor额外预留一部分内存。spark.yarn.executor.memoryOverhead 代表了这部分内存
即实际的内存
val executorMem = args.executorMemory + executorMemoryOverhead
05.memoryOverhead
如果没有设置 spark.yarn.executor.memoryOverhead ,则这部分的内存大小为
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
其中 MEMORY_OVERHEAD_FACTOR 默认为0.1, MEMORY_OVERHEAD_MIN 默认为384m executorMemory 为设置的 executor-memory
实际 executorMem= X+max(X*0.1,384)
设置了的话
executorMem=X +spark.yarn.executor.memoryOverhead 其中 X 是值 args.executorMemory
06. executorMem 需要满足的条件: executorMem< yarn.scheduler.maximum-allocation-mb

2.Yarn 中 contaimer 和 Spark中 partition 之间的关系
job会被切分成stages,每个stage切分成task,每个task单独调度,可以把executor的jvm进程看做task执行池
spark.executor.memory 每个executor使用的内存
一个executor可以并行执行多个task,实际上一个executor是一个进程,task是executor里的一个线程。
一个task至少要独占executor里的一个虚拟核心vcore, 一个task要占用几个核心,可以由.config(“spark.task.cpus”, 1)配置,默认是1即一个task占用一个vcore
同时并行执行的task最大数量 = executor数目 * (每个executor核数 / 每个task占用核心数)
总核数= executor-cores * num-executor
例如: 每个 executor具有3个 cores 理论上每个executor可以处理1-4个task
3.分区与Task的情况
读取阶段
01.从内存中创建 RDD:sc.parallelize(…),那么默认的分区数量为该程序所分配的资源的 CPU 数量。
02.如果是读取hdfs的文件,
一般来说,partition的数量等于文件的数量。
如果单个文件的大小大于hdfs的分块大小,partition的数量就等于 “文件大小/分块大小”。
同时,也可以使用rdd的repartition方法重新划分partition。
运算阶段
经过不同的算子计算后,分区数目又会变化
Task 的数量是由 Partition 决定的
在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,
第一类task的输出是shuffle所需数据,
第二类task的输出是result,

可能的原因:

1、数据出现了倾斜等原因导致其中一个 contaimer 内存负载太大 运行失败
2. Spark的shuffle部分使用了netty框架进行网络传输,但netty会申请堆外内存缓存 Shuffle时,
每个Reduce都需要获取每个map对应的输出,
当一个reduce需要获取的一个map数据比较大 超出配置的限制就报了这个错。
通过spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可设置每个 Reducer 读取的目标数据量,其单位是字节,默认值为 64 MB。
解决内存overhead的问题的方法是:
1.将"spark.executor.memory" 从8g设置为12g。将内存调大
2.将"spark.executor.cores" 从8设置为4。 将core的个数调小。
3.将rdd/dateframe进行重新分区 。 重新分区(repartition)
4.将"spark.yarn.executor.memoryOverhead"设置为最大值,可以考虑一下4096。这个数值一般都是2的次幂。
具体参数配置
set spark.sql.adaptive.repartition.enabled=true;
set spark.sql.shuffle.partitions=2000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864;
数据倾斜
数据倾斜解决

  1. 热点数据
    01.热点数据已知
    过滤: 过滤无关数据以及影响可接受范围内的热点数据: NULL值以及空字符串等值
    分离:
    02.热点数据未知:
    打散: 添加随机字段前缀等方式 随机key实现双重聚合
    2.执行方式
    数据源数据文件不均匀
    Shuffle过程中数据分布不均
    缓解 - 处理shuffle的情况:
    join中有数据倾斜:
    一大一小: 将reduce join转换为map join
    两个数据都比较大: 随机数以及扩容表进行join

专有名词解释

1.常用配置
配置任务可用executor数量
每个Executor占用内存
每个Executor的core数目 spark.executor.cores

The maximum memory size of container to running driver
is determined by
the sum of
spark.driver.memoryOverhead
spark.driver.memory.

The maximum memory size of container to running executor
is determined by
the sum of
spark.executor.memory,
spark.executor.memoryOverhead,
spark.memory.offHeap.size
spark.executor.pyspark.memory.
Shuffle Behavior
Memory Management
spark.memory.fraction
在Spark中,执行和存储共享一个统一的区域M
代表整体JVM堆内存中M的百分比(默认0.6)。
剩余的空间(40%)是为用户数据结构、Spark内部metadata预留的,并在稀疏使用和异常大记录的情况下避免OOM错误
spark.memory.storageFraction

Note: Non-heap memory includes off-heap memory (when spark.memory.offHeap.enabled=true)
and memory used by other driver processes (e.g. python process that goes with a PySpark driver)
and memory used by other non-driver processes running in the same container

spark.executor.memoryOverhead
This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.

spark.memory.offHeap.size
spark.memory.offHeap.enabled
源码
package org.apache.spark.deploy.yarn
DRIVER_MEMORY_OVERHEAD
EXECUTOR_MEMORY : Amount of memory to use per executor process
EXECUTOR_MEMORY_OVERHEAD: The amount of off-heap memory to be allocated per executor in cluster mode
EXECUTOR_CORES = ConfigBuilder(“spark.executor.cores”)
EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder(“spark.yarn.executor.memoryOverhead”)
// Executor memory in MB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt

// Resource capability requested for each executorsprivate[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)

package org.apache.spark.memory;
public enum MemoryMode { ON_HEAP, OFF_HEAP}
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {
# Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using sun.misc.Unsafe.
final val tungstenMemoryMode: MemoryMode = {
if (conf.getBoolean(“spark.memory.offHeap.enabled”, false)) {
require(conf.getSizeAsBytes(“spark.memory.offHeap.size”, 0) > 0,
“spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true”)
require(Platform.unaligned(),
“No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.”)
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}
}

Spark开发-Spark内存溢出原因以及解决方式相关推荐

  1. 开发中内存溢出问题及解决

    开发中内存溢出问题及解决 参考文章: (1)开发中内存溢出问题及解决 (2)https://www.cnblogs.com/yangyi1024/p/6417874.html 备忘一下.

  2. jmeter(二十二)内存溢出原因及解决方法

    jmeter(二十二)内存溢出原因及解决方法 参考文章: (1)jmeter(二十二)内存溢出原因及解决方法 (2)https://www.cnblogs.com/imyalost/p/7901064 ...

  3. 常见内存溢出原因及解决思路

    内存溢出(OOM)通常出现在某一块内存空间耗尽的时候,导致内存溢出的原因有很多,常见的有堆溢出.直接内存溢出.永久区溢出等. 堆溢出 堆是Java程序中最为重要的内存空间,由于大量的对象都直接分配在堆 ...

  4. java内存溢出原因及解决_java内存溢出的原因和解决方法

    java内存溢出的原因和解决方法 发布时间:2020-06-15 17:57:39 来源:亿速云 阅读:85 作者:元一 内存溢出含义: 内存溢出(out of memory)通俗理解就是内存不够,通 ...

  5. Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理)

    Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理) 参考文章: (1)Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理) (2)https://ww ...

  6. Android 系统(87)---常见的内存泄漏原因及解决方法

    常见的内存泄漏原因及解决方法 (Memory Leak,内存泄漏) 为什么会产生内存泄漏? 当一个对象已经不需要再使用本该被回收时,另外一个正在使用的对象持有它的引用从而导致它不能被回收,这导致本该被 ...

  7. springboot上传大文件时内存溢出的可能解决办法

    springboot上传大文件时内存溢出的可能解决办法 在springboot中上传大文件时要考虑内存的情况,一般我们会通过在执行服务时加入-Xms512m -Xmx512m等参数加大堆内存,但这是指 ...

  8. 浅谈Metaspace内存溢出原因及JVM参数设置

    浅谈Metaspace内存溢出原因及JVM参数设置 1.Metaspace内存溢出(oom) 日志 原因分析 从Java8开始,Java中的内存模型引入了一个称为元空间(Metaspace)的新内存区 ...

  9. 常见的内存泄漏原因及解决方法

    常见的内存泄漏原因及解决方法 参考文章: (1)常见的内存泄漏原因及解决方法 (2)https://www.cnblogs.com/leeego-123/p/12187677.html 备忘一下.

最新文章

  1. Appium 并发测试基于unitest
  2. CYQ.Data 轻量数据层之路 使用篇五曲 MProc 存储过程与SQL(十六)
  3. Python:数字的格式化输出
  4. mysql保存中文异常Incorrect string value: '\xE4\xBD\xA0\xE5\xA5\xBD' for column'
  5. kafka配置_Kafka生产环境的几个重要配置参数
  6. 由windows/linux转向使用Mac的适应期教程
  7. 一组飒气十足的商务海报PSD分层海报
  8. java成员变量除了方法传递_JAVA类与对象(四)----成员变量与局部变量 、成员方法、构造方法...
  9. Java并发编程技术
  10. 【通信】基于matlab GUI循环码编译码器【含Matlab源码 692期】
  11. C语言ftell()函数
  12. 直线电机的matlab仿真,基于MATLAB的永磁体直线电机的设计
  13. 数据库中的左连接和右连接的区别
  14. Effective java 读书笔记
  15. 华为云HMS Core 助力鸿蒙开发
  16. scrapy爬虫入门
  17. 2021Eclipse 的安装使用说明
  18. 机器学习笔记 - 行列式
  19. C++ 两点之间最短距离
  20. 跨境电商| FaceBook 群发消息

热门文章

  1. 情商高的人什么表现?
  2. 搭建简单GAN生成MNIST手写体
  3. Vue微信公众号分享全过程
  4. 利用免费Excel控件来制作Excel报表
  5. Xcode 快捷键必看!
  6. 南邮慕课c语言第九章答案,南邮mooc-数据结构第十章—排序作业参考答案
  7. vue源码-(了解mustache)-(模板祖师爷)
  8. SLAM - 视觉里程计 - LK光流法
  9. html图片自适应表格,如何用纯CSS实现自适应布局表格
  10. confer安装与计算