简单起见,下述答案仅就无shuffle的单stage Spark作业做了概要解释。对于多stage任务而言,在内存的使用上还有很多其他重要问题没有覆盖。部分内容请参考评论中  @邵赛赛  给出的补充。Spark确实擅长内存计算,内存容量不足时也可以回退,但题主给出的条件(8GB内存跑1TB数据)也确实是过于苛刻了……

首先需要解开的一个误区是,对于Spark这类内存计算系统,并不是说要处理多大规模的数据就需要多大规模的内存。Spark相对Hadoop MR有大幅性能提升的一个前提就是大量大数据作业同一时刻需要加载进内存的数据只是整体数据的一个子集,且大部分情况下可以完全放入内存,正如Shark(Spark上的Hive兼容的data warehouse)论文1.1节所述:

In fact, one study [1] analyzed the access patterns in the Hive warehouses at Facebook and discovered that for the vast majority (96%) of jobs, the entire inputs could fit into a fraction of the cluster’s total memory.

[1] G. Ananthanarayanan, A. Ghodsi, S. Shenker, and I. Stoica. Disk-locality in datacenter computing considered irrelevant. In HotOS ’11, 2011.

至于数据子集仍然无法放入集群物理内存的情况,Spark仍然可以妥善处理,下文还会详述。

在Spark内部,单个executor进程内RDD的分片数据是用Iterator流式访问的,Iterator的hasNext方法和next方法是由RDD lineage上各个transformation携带的闭包函数复合而成的。该复合Iterator每访问一个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地(对于shuffle stage是落地到本地文件系统留待后续stage访问,对于result stage是落地到HDFS或送回driver端等等,视选用的action而定)。如果用户没有要求Spark cache该RDD的结果,那么这个过程占用的内存是很小的,一个元素处理完毕后就落地或扔掉了(概念上如此,实现上有buffer),并不会长久地占用内存。只有在用户要求Spark cache该RDD,且storage level要求在内存中cache时,Iterator计算出的结果才会被保留,通过cache manager放入内存池。

简单起见,暂不考虑带shuffle的多stage情况和流水线优化。这里拿最经典的log处理的例子来具体说明一下(取出所有以ERROR开头的日志行,按空格分隔并取第2列):

val lines = spark.textFile("hdfs://<input>")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://<output>")

按传统单机immutable FP的观点来看,上述代码运行起来好像是:

  1. 把HDFS上的日志文件全部拉入内存形成一个巨大的字符串数组,
  2. Filter一遍再生成一个略小的新的字符串数组,
  3. 再map一遍又生成另一个字符串数组。

真这么玩儿的话Spark早就不用混了……

如前所述,Spark在运行时动态构造了一个复合Iterator。就上述示例来说,构造出来的Iterator的逻辑概念上大致长这样:

new Iterator[String] {private var head: String = _private var headDefined: Boolean = falsedef hasNext: Boolean = headDefined || {do {try head = readOneLineFromHDFS(...)     // (1) read from HDFScatch {case _: EOFException => return false}} while (!head.startsWith("ERROR"))       // (2) filter closuretrue}def next: String = if (hasNext) {headDefined = falsehead.split(" ")(1)                        // (3) map closure} else {throw new NoSuchElementException("...")}
}

上面这段代码是我按照Spark中FilteredRDD、MappedRDD的定义和Scala Iterator的filter、map方法的框架写的伪码,并且省略了从cache或checkpoint中读取现成结果的逻辑。1、2、3三处便是RDD lineage DAG中相应逻辑嵌入复合出的Iterator的大致方式。每种RDD变换嵌入复合Iterator的具体方式是由不同的RDD以及Scala Iterator的相关方法定义的。可以看到,用这个Iterator访问整个数据集,空间复杂度是O(1)。可见,Spark RDD的immutable语义并不会造成大数据内存计算任务的庞大内存开销。

然后来看加cache的情况。我们假设errors这个RDD比较有用,除了拿出空格分隔的第二列以外,可能在同一个application中我们还会再频繁用它干别的事情,于是选择将它cache住:

val lines = spark.textFile("hdfs://<input>")
val errors = lines.filter(_.startsWith("ERROR")).cache()  // <-- !!!
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://<output>")

加了cache之后有什么变化呢?实际上相当于在上述复合Iterator伪码的(2)处,将filter出来的文本行逐一追加到了内存中的一个ArrayBuffer[String]里存起来形成一个block,然后通过cache manager扔进受block manager管理的内存池。注意这里仅仅cache了filter出来的结果,HDFS读出的原始数据没有被cache,对errors做map操作后得到的messages RDD也没有被cache。这样一来,后续任务复用errors这个RDD时,直接从内存中取就好,就不用重新计算了。

内存有限的情况下 Spark 如何处理 T 级别的数据?相关推荐

  1. java中数组的内存模型_Java如何在内存有限的情况下实现一个超大数组?jvm性能调优+内存模型+虚拟机原理能解决什么样问题...

    在上一篇文章中,小编为您详细介绍了关于<变频器调速问题?三星R458更换CPU>相关知识.本篇中小编将再为您讲解标题Java如何在内存有限的情况下实现一个超大数组?jvm性能调优+内存模型 ...

  2. 内存不够的情况下python处理大规模数据

    在机器内存不够的情况下如何处理大规模数据 背景介绍 笔者需要在一个内存为16G的远程服务器上处理几百G的数据,数据包含了几千个csv文件,所有文件存在了一个文件夹下. 目标:用python处理完所有数 ...

  3. 有一副由NxN矩阵表示的图像,这里每个像素用一个int表示,请编写一个算法,在不占用额外内存空间的情况下(即不使用缓存矩阵),将图像顺时针旋转90度。 给定一个NxN的矩阵,和矩阵的阶数N,请返回旋转

    有一副由NxN矩阵表示的图像,这里每个像素用一个int表示,请编写一个算法,在不占用额外内存空间的情况下(即不使用缓存矩阵),将图像顺时针旋转90度. 给定一个NxN的矩阵,和矩阵的阶数N,请返回旋转 ...

  4. oracle恢复drop建的表首次,案例:Oracle dul数据挖掘 没有备份情况下非常规恢复drop删除的数据表...

    天萃荷净 通过Oracle dul工具在没有备份情况下进行非常规恢复,找出drop删除的Oracle数据表中的数据进行恢复 dul对被drop对象进行恢复,需要提供两个信息 1.被删除表所属表空间(非 ...

  5. PHP uniqid()函数可用于生成不重复的唯一标识符,该函数基于微秒级当前时间戳。在高并发或者间隔时长极短(如循环代码)的情况下,会出现大量重复数据。即使使用了第二个参数,也会重复,最好的方案是结

    转载地址:http://www.51-n.com/t-4264-1-1.html PHP uniqid()函数可用于生成不重复的唯一标识符,该函数基于微秒级当前时间戳.在高并发或者间隔时长极短(如循环 ...

  6. 【实习小tip】多层dialog弹窗遮罩问题、elementUI的form表单组件的select框在只读的情况下没办法拿到传来的数据、从弹窗子组件获取数据后需要刷新页面

    解决elementui多层dialog弹窗遮罩问题 弹窗套娃出现了整个屏幕都是遮罩层的问题,需要鼠标点击一下才能正常. 在弹窗组件代码上加上 append-to-body 就可以了,表示这个弹窗是嵌在 ...

  7. 在PHP中如何要json中的数据,如何在不知道键值的情况下在php中读取JSON数据

    我需要在php中读取firebase JSON URL然后显示它. 我的firebase得到了以下.json数据: {"dDsdE4AlB7P5YYd4fWbYTQKCLPh1": ...

  8. 面试官问:在读多写少的情况下,如何优化 MySQL 的数据查询方案

    作者 | 面试官问     责编 | 张文 来源 | 面试官问(ID:interviewer_asked) 面试官问:假设你负责的某业务在双十一期间要搞运营活动,公司投入了大量的营销费用进行推广,此举 ...

  9. 阿里云谦大佬:时间精力有限的情况下如何高效学习前端?

    大家好,我是若川.最近组织了源码共读活动1个月,200+人,一起读了4周源码,欢迎加我微信 ruochuan12 进群参与.今天分享一篇阿里云谦大佬的文章.昨天在群里也有小伙伴说到:大佬们是需要什么学 ...

最新文章

  1. 8个最好的Linux平台商业智能(BI)软件
  2. 【C++深度剖析教程11】C++学习之编写代码实现复数类
  3. mysql常见问题记录
  4. 赞!苏州大学95后硕士一作发《Nature》!
  5. 【重识 HTML + CSS】背景相关知识点
  6. [BUG]Ubuntu server 16.04安装,无网卡驱动解决
  7. [MVC学习笔记]4.使用Log4Net来进行错误日志的记录
  8. 从零开始学ArcGIS Server(三)--如何创建一个个人地理数据库ArcSDE Personal geodatabase...
  9. 阿里巴巴矢量图标库使用步骤
  10. vue 仿豆瓣 爬坑之旅
  11. wordpress实现全站HTTPS
  12. J-LinK-OB改造版 仿真/调试器 使用说明
  13. 【epoll】epoll使用详解(精髓)--研读和修正
  14. BIgDecimal的用法,及与各类数据类型的转换
  15. python飞船项目
  16. uniapp,H5下,安卓手机视频播放自动全屏问题
  17. 乐优商城笔记六:商品详情页
  18. 解决canvas动画嵌套H5移动端适配、嵌套iframe加载canvas动画屏幕适配的问题
  19. vgg16识别咖啡豆
  20. 操作系统 段页式存储管理

热门文章

  1. Django和uwsgi,配合nginx做静态缓存
  2. Facebook Docusaurus 中文文档 自定义页面
  3. Spring的静态代理和动态代理
  4. Eclipse 实用技巧
  5. 用linux下常用命令wget进行整站下载(递归下载至本地)
  6. SQL Server 环形缓冲区(Ring Buffer) -- 介绍
  7. SQLite 函数大全
  8. 快速深入一门语言的几个问题 - Shell909090 - 随笔杂记
  9. koa2+vue实现登陆以及是否登陆控制
  10. 虚幻4引擎角色蓝图Character的Movement组件学习