Resilient Distributed Datasets: A Fault-Tolerant Abstraction forIn-Memory Cluster Computing
1 Intruction
许多框架缺乏充分利用分布式内存的抽象,这使得它们不适用于大量计算都需要重用中间结果的情形,但数据重用又比较常见,比如许多迭代机器学习和图算法、交互式数据工具。
分布式内存抽象的概念——弹性分布式数据集(RDD,Resilient Distributed Datasets),在大量应用中支持数据重用,具有容错性、并行数据结构,这使得用户可将中间结果保留在内存中,通过控制分区来优化数据的放置,并通过丰富的操作进行管理。
设计RDD的挑战是定义能高效提供容错性的编程接口。而现有在集群上的内存存储抽象,如分布式共享内存、键-值存储等,只能通过跨机器复制数据或跨机器更新日志保证容错性,对于数据密集型任务来说,代价太高。
RDD提供了一种基于粗粒度转换的接口,即将同一操作应用于多个数据 项,这样它就可以通过记录建立一个数据集的转换操作来支持容错性, 而不需要记录真实数据,降低了存储开销。
RDD会将一系列转换记录下来,即Lineage,如果RDD中的一个分区丢失了,该RDD拥有足够的信息:它是如何从其他RDD衍生的 ,以重新计算丢失的分区,这样不需要检查点操作就可以重构丢失的数据分区 ,速度快且不需要高代价的复制。
RDD系统---Spark,能够被用于开发多种并行应用,它提供了Scala接口, 可被用于开发多种并行应用,可用于交互式查询大数据集。它是第一个能够使用有效、通用编程语言,并在集群上对大数据集进行交互式分析的系统。
评估:
实验表明,在处理迭代式应用上Spark比Hadoop快高达20多倍,计算数据分析类报表的性能提高了40多倍,同时能够在5-7秒的延时内交互式扫描1TB数据集。
2 Resilient Distributed Datasets (RDDs)
2.1 RDD Abstraction
RDD不总是需要物化。RDD含有如何从其他数据集衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应RDD分区。
用户可以控制RDD的其它两个方面:缓存和分区。用户可确定哪些RDD将被重用,并为之指定存储策略。也可以通过一个RDD每个记录中的键值,指定哪些元素被分区。
2.2 Spark Programming Interface
定义好了RDD,程序员就可在动作中使用RDD了。动作是向应用程序返回值,或向存储系统导出数据的那些操作,例如,count(返回RDD中的元素个数),collect(返回元素本身),save(将RDD输出到存储系统)。在Spark中,只有在动作第一次使用RDD时,才会计算RDD(即延迟计算)。这样在构建RDD的时候,运行时通过管道的方式传输多个转换。
Spark默认是存储到RAM中的,但当RAM不够的时候,Spark会将一些RDD写到磁盘上。用户可以为每个RDD指定缓存优先级,以指定内存中的那些数据被先写到磁盘上。
2.2.1 Example: Console Log Minin
lines = spark.textFile("hdfs://...")errors = lines.filter(_.startsWith("ERROR"))errors.cache()
第1行从HDFS文件定义了一个RDD(即一个文本行集合),第2行获得一个过滤后的RDD,第3行请求将errors缓存起来。注意在Scala语法中filter的参数是一个闭包。
这时集群还没有开始执行任何任务。但是,用户已经可以在这个RDD上执行相应的动作,例如统计错误消息的数目:
errors.count()
用户还可以在RDD上执行更多的转换操作,并使用转换结果,如:
// Count errors mentioning MySQL:errors.filter(_.contains("MySQL")).count()// Return the time fields of errors mentioning// HDFS as an array (assuming time is field// number 3 in a tab-separated format):errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()
包含错误的第一个动作执行后,Spark会将errors分区存储在内存中,极大提升了后续计算速度。注意,最初的RDD, lines, 是不会被缓存的,因为errors信息也许只是数据的一小部分(小到足以放到内存中)。
2.3 Advantages of the RDD Model
优点1:高效的容错性
系统可通过运行慢结点的备份副本来缓解慢结点负担。而DSM难以实现备份,因为一个任务的两个副本都需要读写同一个内存位置,会干扰彼此的更新。
在RDD的批量操作中,运行时将通过数据局部性来安排调度任务,从而提高性能。
对于基于扫描的操作,如果内存不足以缓存整个RDD,就进行部分缓存。把内存放不下的分区存储到磁盘上,此时性能与现有的数据流系统差不多
2.4 Applications Not Suitable for RDDs
RDD适用于:
对数据集中的所有元素使用同一操作的批量应用。在这种情况中,RDD可通过lineage高效记住每个转换,并且无需大量数据即可恢复丢失分区。
在共享状态下的异步细粒度的更新,比如web存储系统,或增量式web爬虫,这些更适合于用传统的日志更新,或是数据检查点。
我们的目标是为批量分析提供高效的编程模型。
3 Spark Programming Interface
RDD是静态类型对象,由参数指定其元素类型,如RDD[int]是一个整型RDD,但可以省略类型,因为Scala支持类型推断。
尽管用Scala实现RDD在概念上很简单,但还需要解决Scala闭包对象采用反射所带来的问题。如何通过Scala编译器来使Spark可用,需要做的还有很多,但不需要修改Scala编译器。
3.1 RDD Operations in Spark
3.2 Example Applications
3.2.1 Logistic Regression
逻辑回归是是一种比较常见的分类器,旨在寻找一个能将两个数据集分开的超平面w,它采用梯度下降:将w初始化为 一个随机值,在每次迭代中,将w的函数求和,并朝着优化的方向移动。
3.2.2 PageRank
PageRank算法:通过将其它连向它的文章的贡献加在一起,来迭代更新每篇文章的rank。
当前PageRank记为r,顶点表示状态。在每次迭代中,各个顶点向其所有邻居发送贡献值r/n,这里n是邻居的数目。下一次迭代开始时,每个顶点将其分值(rank)更新为 α/N + (1 - α) * ΣCi,这里的求和是各个顶点收到的所有贡献值的和,N是顶点的总数。
可通过控制分区进而对PageRanke算法进行优化。如果明确了links分区(如,哈希分区跨结点的URL的link列表),就可以以同样的方式分区ranks,以确保links和ranks之间的join 操作无需通信(就好像URL的rank和link列表在同一台机器上)
也可以写一个客户分区类将同时连向彼此的页面组在一起(如,通过域名分区URL)。
以上优化操作可被定义为partitionBy:
4 Representing RDDs
选择一种表示,以在广泛的转换中追踪lineage。理想情况下,应该提供尽可能丰富的转换操作,用户可任意组合。
提出了一种简单的基于图的表示方式,无需添加别的逻辑,以简化系统设计。在内核中,通过一个通用接口来表示RDD,该接口有5种信息:一组分区,是数据集的原子件;一组对父RDD的依赖;一个函数,基于其父结点来计算数据集;元数据,描述其分区方案以及数据的放置。
在RDD之间如何表示依赖:窄依赖、宽依赖。窄依赖:父RDD的分区至多被一个子RDD分区使用。宽依赖:可被多个子RDD分区可使用。比如map导致窄依赖,join导致宽依赖(除非父RDD被哈希分区)。
讨论它们之间区别的原因有两个:
HDFS files: 在例子中的输入RDD都是HDFS文件。对于这些RDD,分区返回为每个文件的块返回一个分区(块的偏移量存储在分区对象中), preferredLocations给出块所在节点列表,iterator读取块。
union:在两个RDD上调用union,返回他们父RDD的并集,每个子分区都通过相应父节点运用窄依赖关系计算。
sample:类似于mapping,除了RDD为每个分区存储随机数生成器种子,以确定地抽样父记录。
join:连接两个RDD可能会导致2个窄依赖(如果它们有相同的hash/range分区),或是2个宽依赖,或是混合(如果一个父节点有分区,而另一个没有)。不管是那种情况,输出的RDD都会有一个分区器(继承自父节点的,或者是默认的哈希分区)。
5 Implementation
5.1 Job Scheduling
3、当一个任务失败,如果它的父stage可用,就在另一个节点上重新运行该任务。如果某些stage不可用(比如shuffle时某个map输出丢失),重新提交该stage的任务以计算丢失的分区。
4、lookup操作可通过key来随机存取被哈希分区的RDD中的元素。这种情况下,如果某个需要的分区丢失,任务需要告诉调度器计算该分区。
5.2 Interpreter Integration
Scala具有类似于python的交互式shell,使用内存的数据时具有低时延。
Scala解释器一般这样运行:将用户键入的每一行解析为类,装入JVM,调用类中的函数。比如,如果用户输入x=5, 接着输入println(x),解释器会定义一个包含x的Line1类,并将第2行编译为println(Line1().getInstance().x)
在Spark中,对编译器做了2点改变:
1、类传输。让worker结点获取每行代码创建的类的字节码,并让解释器支持通过HTTP传输这些字节码。
2、对代码的生成逻辑做出修改。之前,是通过类的静态方法来访问行代码所创建的相应对象的,这意味着当序列化一个闭包,它引用了前一行所定义的变量, 比如上例中的Line1.x,Java不会根据对象关系传输包含x的Line1实例。因此,worker节点不会收到x。现在将代码生成逻辑修改为直接引用各个行对象的实例。图6显示了在修改后,编译器是如何将用户键入的行解释为Java对象的。
5.3 Memory Management
1、在内存中反序列化为Java对象。这种方式性能最快,因为Java VM可以在本地访问每一个RDD元素。
2、在内存中反序列化为数据。这种方式允许用户在空间受限时,可以选择一种比Java对象更有效的内存策略。
3、存储在磁盘上。这种方式对于太长的以致不能放在RAM中的RDD比较有用,但在使用时计算代价较高。
为了管理有限的内存空间,提出了RDD级别上的LRU策略(最近最少使用)。当计算一个新的RDD时所需空间不足,便将最近最少使用的RDD替换出去,除非如它与具有新分区的RDD是同一个RDD。这种情况下,在内存中记录旧分区,以防止同一个RDD循环的进来、出去。
另外,用户可以为每个RDD指定“缓存优先级”
5.4 Support for Checkpointing
当前Spark版本提供检查点API,但由用户决定是否需要执行检查点操作。今后将实现自动检查点,因为调度器掌握了RDD的各种信息,可根据成本效益分析确定RDD Lineage图中的最佳检查点位置。
此外,RDD是只读的,不需要考虑一致性,因此它的检查点操作相对于别的通用分布式内存会更简单。
6 Evaluation
1)对于迭代式机器学习应用,Spark比Hadoop快20多倍。这种加速比是因为:数据存储在内存中,同时Java对象缓存避免了反序列化操作。
2)用户编写的应用程序执行结果很好。尤其是,Spark分析报表比Hadoop快40多倍。
3)如果节点发生失效,通过只重建那些丢失的RDD分区,Spark能够实现快速恢复。
4)Spark能够在5-7s延时范围内,交互式地查询1TB大小的数据集。
在下面的测试中,除非特殊说明,实验使用4核15GB内存的m1.xlarge EC2 节点,块大小为256M的HDFS作为存储。在每个作业运行执行时,为了保证磁盘读时间更加精确,清理集群中的OS缓存。
6.1 Iterative Machine Learning Applications
Hadoop:0.20.2稳定版。
HadoopBinMem:一个hadoop部署,在首轮迭代中将输入数据转换为开销较低的二进制,以减少后续文本解析的开销,并存入位于内存的HDFS实例中。
Spark:RDD的实现。
2个算法的运行环境:使用25-100台机器,在100GB的数据集上迭代10代。它们的主要区别在于:每字节的运算量不同。k-means是计算密集型,而logistics回归不是,但在反序列化和I/O方面比较耗时。
因为典型的学习算法都需要数十次迭代达到收敛,分别统计了第一次迭代和后续迭代的时间,发现基于RDD共享数据的方式极大提高了后续迭代速度。
1、Hadoop软件栈的最小开销。
2、使用数据时,HDFS的开销。
3、将二进制记录转换为内存中Java对象时的反序列化开销。
为了测估1,运行空的Hadoop作业,至少需要25s来完成启动作业、开始任务、清理。
对于2,发现为了服务每个数据集,HDFS执行了多次复制和校验和计算。
6.2 PageRank
6.3 Fault Recovery
1、将k-means算法正常运行10代
2、第6代时一个节点失效
每一代都包含了运行在100GB数据上的400个任务。
但如果一个检查点失效,至少需要几代来恢复,取决于检查点的频率。甚至,需要复制100GB的数据,导致要么消耗的RAM是Spark的2倍,要么等待100GB的写盘。而RDD的lineage仅仅需要少于10KB的空间。
6.4 Behavior with Insufficient Memory
6.5 User Applications Built with Spark
6.6 Interactive Data Mining
7 Discussion
7.1 Expressing Existing Programming Models
可用RDD表达的模型包括:
a、MapReduce
b、DryadLINQ
c、SQL
d、Pregel
e、Iterative MapReduce
f、Batched Stream Processing
1)许多应用程序本身就对许多记录应用相同操作,这使得它们易于表达。
2)RDD本身的不变性并不是障碍,因为1个RDD可以创建多个RDD来表示不同的数据集版本。
3)为什么以前的框架没有同等的通用性?它们共同原因在于:缺乏数据共享的抽象。
7.2 Leveraging RDDs for Debugging
1)重建RDD,并执行交互式查询。
2)当作业位于单进程调试器中时,通过重新计算所依赖的RDD,可重新运行该作业的任何任务。
8 Conclusion
通用:可表达广泛的并行应用,包括许多为迭代计算而设计的专用模型,和那些模型没有处理的新应用。
容错:RDD提供了基于粗粒度转换的API,可使用lineage高效恢复数据。
高效:已经实现的RDD---Spark,实现处理迭代式作业的速度超过Hadoop大约20倍,而且还能够交互式查询数百G数据
Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing
Authors: | Matei Zaharia | University of California, Berkeley |
Mosharaf Chowdhury | University of California, Berkeley | |
Tathagata Das | University of California, Berkeley | |
Ankur Dave | University of California, Berkeley | |
Justin Ma | University of California, Berkeley | |
Murphy McCauley | University of California, Berkeley | |
Michael J. Franklin | University of California, Berkeley | |
Scott Shenker | University of California, Berkeley | |
Ion Stoica | University of California, Berkeley |
Published in: |
· Proceeding |
NSDI'12 Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation |
Pages 2-2 |
Resilient Distributed Datasets: A Fault-Tolerant Abstraction forIn-Memory Cluster Computing相关推荐
- Apache Spark 2.2.0 中文文档 - Spark RDD(Resilient Distributed Datasets)
Spark RDD(Resilient Distributed Datasets)论文 概要 1: 介绍 2: Resilient Distributed Datasets(RDDs) 2.1 RDD ...
- Spark学习笔记10-RDD(Resilient Distributed Datasets)
1.RDD概念 RDD(Resilient Distributed Datasets),弹性分布式数据集.是分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的 ...
- Spark的核心RDD(Resilient Distributed Datasets弹性分布式数据集)
Spark的核心RDD(Resilient Distributed Datasets弹性分布式数据集) 铺垫 在hadoop中一个独立的计算,例如在一个迭代过程中,除可复制的文件系统(HDFS)外没有 ...
- Resilient Distributed Datasets (RDD)
Resilient Distributed Datasets RDD本质上是一组数据的Spark表示,分布在多台机器上,使用API让您对其进行操作.RDD可以来自任何数据源,例如文本文件,通过JD ...
- Spark-RDD论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster 》有感
动机 当前很多分布式计算框架无法实现高效的迭代式计算以及交互式数据挖掘,包括Hadoop!,首先为了解决高效这个问题,RDD提出基于内存的迭代思想,直接鄙视了Hadoop要不断进行磁盘Spill的弊端 ...
- Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 阅读笔记
文章目录 背景 弹性分布式数据集(RDDs) 如何产生 RDD 用户可以对 RDD 的控制 Spark 提供的编程接口 lineage 图示 RDDs 表示 实现 作业调度 调度思想 阶段(stage ...
- (翻译)Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
中文版链接:http://shiyanjun.cn/archives/744.html
- RDD(Resilient Distributed Datasets 弹性分布式数据集)
RDD是spark计算框架的核心,spark相比于hadoop的优越性在RDD中体现得淋漓尽致.对于RDD的介绍,最好的资料就是那篇专门介绍RDD的论文了,已经有中文翻译.使用scala编写spark ...
- 【Paper】2016_A Learning-Based Fault Tolerant Tracking Control of an Unmanned Quadrotor Helicopter
Liu, Z., Yuan, C., Zhang, Y. et al. A Learning-Based Fault Tolerant Tracking Control of an Unmanned ...
最新文章
- crawlerNo.1(video,audio,image)
- 【控制】控制学习路线资源整理
- 平滑线反锯齿工具_PS大神常用选框类工具有哪些?其实很简单,小白认真学也能懂...
- JTable调整列宽
- 線陣相機處理時間計算方法
- Elasticsearch 基础入门
- dedecms php5.4 无法退出后台,PHP5.4版本织梦dedecms后台退出空白的解决方法
- Java API概述及应用
- pdfwin10闪退_win10系统打开文件夹闪退的解决方法
- 基于jmeter测试web接口,看完都说学会了
- 发现一个 WPF/E Asp.net Server Control
- python json.dumps() 中文乱码问题
- Linux内核中的IPSEC实现(6)
- Command not found 解决
- linux进程地址空间内核,菜鸟求问linux进程地址空间问题
- sc query mysql_SC 命令用法
- FTTB+NAT+pppoe+CBAC+*** client+AAA配置
- c语言程序设计名片管理系统,《名片管理系统》 - C语言课程设计
- npm使用国内淘宝镜像的方法(两种)
- 《Python自然语言处理》-ch1-语料库
热门文章
- python saltstack_saltstack-api使用详解
- 【原创干货】用户标签/用户分群在DMP(数据管理平台)中的应用 | SG小组第一期
- audio插入背景音乐_HTML5添加背景音乐 3种方法个人推荐audio
- python读取大智慧数据_大智慧数据读取器day.dat
- 领导能力--学会说话,学会沟通
- 实现上一篇,下一篇的效果
- 开通微信扫码支付,申请微信扫码支付怎么用
- html获取jsq中定义的参数,如何在HTML5中标记也可用作行标题的数据单元格?
- 伯努利分布、二项分布
- 问题 | iview Row Col 修改样式