Spark弹性数据集
2019独角兽企业重金招聘Python工程师标准>>>
Spark中迭代式机器学习算法的数据流可以通过图2.3来进行理解。将它和图2.1中Hadoop MR的迭代式机器学习的数据流比较一下。你会发现在Hadoop
MR 中每次迭代都会涉及HDFS的读写,而在Spark中则要简单得多。它仅需从HDFS到Spark中的分布式共享对象空间的一次读入——从HDFS文件中创建RDD。RDD可以重用,在机器学习的各个迭代中它都会驻留在内存里,这样能显著地提升性能。当检查结束条件发现迭代结束的时候,会将RDD持久化,把数据写回到HDFS中。后续章节会对Spark的内部结构进行详细介绍——包括它的设计,RDD,以及世系等等。
图2.3 Spark中进行迭代式计算的数据共享
Spark的弹性分布式数据集
RDD这个概念跟我们讨论到的Spark的动机有关——就是能让用户操作分布式系统上的Scala集合。Spark中的这个重要的集合就是RDD。RDD可以通过在其它RDD或者稳态存储中的数据(比如说,HDFS中的文件)上执行确定性操作来进行创建。创建RDD的另一种方式就是将Scala集合并行化。RDD的创建也就是Spark中的转换操作。RDD上除了转换操作,还有其它的一些操作,比如说动作(action)。像map, filter以及join这些都是常见的转换操作。RDD有意思的一点在于它可以将自己的世系或者说创建它所需的转换序列,以及它上面的动作给存储起来。这意味着Spark程序只能拥有一个RDD引用——它知道自己的世系,包括它是如何创建的,上面执行过哪些操作。世系为RDD提供了容错性——即使它丢失了,只要世系本身被持久化或者复制了,就仍能重建整个RDD。RDD的持久化以及分块可以由程序员来指定。比如说,你可以基于记录的主键来进行分块。
在RDD上可以执行许多操作。包括count,collect以及save,它们分别可以用来统计元素总数,返回记录,以及保存到磁盘或者HDFS中。世系图中存储了RDD的转换以及动作。表2.1中列举了一系列的转换及动作。
表2.1
转换 | 描述 |
Map(function f1) | 把RDD中的每个元素并行地传递给f1,并返回结果的RDD |
Filter(function f2) | 选取出那些传递给函数f2并返回true的RDD元素 |
flatMap(function f3) | 和map类似,但f3返回的是一个序列,它能将单个输入映射成多个输出。 |
Union(RDD r1) | 返回RDD r1和自身的并集 |
Sample(flag, p, seed) | 返回RDD的百分之p的随机采样(使用种子seed) |
动作 | 描述 |
groupByKey(noTasks) | 只能在键值对数据上进行调用——返回的数据按值进行分组。并行任务的数量通过一个参数来指定(默认是8) |
reduceByKey(function f4,noTasks) | 对相同key元素上应用函数f4的结果进行聚合。第二个参数是并行的任务数 |
Join(RDD r2, noTasks) | 将RDD r2和对象自身进行连接——计算出指定key的所有可能的组合 |
groupWith(RDD r3, noTasks) | 将RDD r3与对象自身进行连接,并按key进行分组 |
sortByKey(flag) | 根据标记值将RDD自身按升序或降序来进行排序 |
动作 | 描述 |
Reduce(function f5) | 使用函数f5来对RDD的所有元素进行聚合 |
Collect() | 将RDD的所有元素作为一个数组来返回 |
Count() | 计算RDD的元素总数 |
take(n) | 获取RDD的第n个元素 |
First() | 等价于take(1) |
saveAsTextFile(path) | 将RDD持久化成HDFS或者其它Hadoop支持的文件系统中路径为path的一个文件 |
saveAsSequenceFile(path) | 将RDD持久化为Hadoop的一个序列文件。只能在实现了Hadoop写接口或类似接口的键值对类型的RDD上进行调用。 |
动作 | 描述 |
foreach(function f6) | 并行地在RDD的元素上运行函数f6 |
下面将通过一个例子来介绍下如何在Spark环境中进行RDD的编程。这里是一个呼叫数据记录(CDR)——基于影响力分析的应用程序——通过CDR来构建用户的关系图,并识别出影响力最大的K个用户。CDR结构包括id,调用方,接收方,计划类型,呼叫类型,持续时长,时间,日期。具体做法是从HDFS中获取CDR文件,接着创建出RDD对象并过滤记录,然后再在上面执行一些操作,比如说通过查询提取出特定的字段,或者执行诸如count的聚合操作。最终写出的Spark代码如下:
val spark = new SparkContext();
Call_record_lines = spark.textFile(“HDFS://….”);
Plan_a_users = call_record_lines.filter(_.
CONTAINS(“plana”)); // RDD上的过滤操作.
Plan_a_users.cache(); // 告诉Spark运行时,如果仍有空间,就将这个RDD缓存到内存里Plan_a_users.count();
%% 呼叫数据集处理中.
RDD可以表示成一张图,这样跟踪RDD在不同转换/动作间的世系变化会简单一些。RDD接口由五部分信息组成,详见表2.2。
表2.2 RDD接口
信息 | HadoopRDD | FilteredRDD | JoinedRDD |
分区类型 | 每个HDFS块一个分区 | 和父RDD一致 | 每个reduce任务一个 |
依赖类型 | 无依赖 | 和父RDD是一对一的依赖 | 在每一个父RDD上进行shuffle |
基于父RDD来计算数据集的函数 | 读取对应块的数据 | 计算父RDD并进行过滤 | 读取洗牌后的数据并进行连接 |
位置元数据(preferredLocations) | 从命名节点中读取HDFS块的位置信息 | 无(从父RDD中获取) | 无 |
分区元数据(partitioningScheme) | 无 | 无 | HashPartitioner |
Spark的实现
Spark是由大概20000行Scala代码写就的,核心部分大概是14000行。Spark可以运行在Mesos, Nimbus或者YARN等集群管理器之上。它使用的是未经修改的Scala解释器。当触发RDD上的一个动作时,一个被称为有向无环图(DAG)调度器的Spark组件就会去检查RDD的世系图,同时会创建各阶段的DAG。每个阶段内都只会出现窄依赖,宽依赖所需的洗牌操作就是阶段的边界。调度器在DAG的不同阶段启动任务来计算出缺失的分区,以便重构整个RDD对象。它将各阶段的任务对象提交给任务调度器(Task Scheduler, TS)。任务对象是一个独立的实体,它由代码和转换以及所需的元数据组成。调度器还负责重新提交那些输出丢失了的阶段。任务调度器使用一个被称为延迟调度(Zaharia等 2010)的调度算法来将任务分配给各个节点。如果RDD中有指定了优先区域的话,任务会被传送给这些节点,否则会被分配到那些有分区在请求内存任务的节点上。对于宽依赖而言,中间记录会在那些包含父分区的节点上生成。这样会使得错误恢复变得简单,Hadoop MR中map输出的物化也是类似的。
Spark中的Worker组件会负责接收任务对象并在一个线程池中调用它们的run方法。它将异常或者错误报告给TaskSetManager(TSM)。TSM是任务调度器管理的一个实体——每个任务集都会对应一个TSM,用于跟踪任务的执行过程。TS是按先进先出的顺序来轮询TSM集的。通过插入不同的策略或者算法,这里仍有一定的优化空间。执行器会与其它的组件进行交互,比如说块管理器(BM),通信管理器(CM),Map输出跟踪器(MOT)。块管理器是节点用于缓存RDD并接收洗牌数据的组件。它也可以看作是每个worker中只写一次的K-V存储。块管理器和通信管理器进行通信以便获取到远端的块数据。通信管理器是一个异步网络库。MOT这个组件会负责跟踪每个map任务都在哪运行并把这些信息返回给归约器——Worker会缓存这个信息。当映射器的输出丢失了的话,会使用一个“分代ID”来将这个缓存置为无效。Spark中各组件的交互如图2.4中所示。
图2.4 Spark集群中的组件
RDD的存储可以通过下面这三种方式来完成:
- 作为Java虚拟机中反序列化的Java对象:由于对象就在JVM内存中,这样做的性能会更佳。
- 作为内存中序列化的Java对象:这么表示内存的使用率会更高,但却牺牲了访问速度。
- 存储在磁盘上:这样做性能最差,但是如果RDD太大以至于无法存放到内存中的话就只能这么做了。
一旦内存满了,Spark的内存管理会通过最近最少使用(LRU)策略来回收RDD。然而,属于同一个RDD的分区是无法剔除的——因为通常来说,一个程序可能会在一个大的RDD上进行计算,如果将同一个RDD中的分区剔除的话则会出现系统颠簸。
世系图拥有足够的信息来重建RDD的丢失分区。然而,考虑到效率的因素(重建整个RDD可能会需要很大的计算量),检查点仍是必需的——用户可以自主控制哪个RDD作为检查点。使用了宽依赖的RDD可以使用检查点,因为在这种情况下,计算丢失的分区会需要显著的通信及计算量。而对于只拥有窄依赖的RDD而言,检查点则不太适合。
转载自并发编程网 – ifeve.com
转载于:https://my.oschina.net/lgscofield/blog/497145
Spark弹性数据集相关推荐
- Hive数据分析——Spark是一种基于rdd(弹性数据集)的内存分布式并行处理框架,比于Hadoop将大量的中间结果写入HDFS,Spark避免了中间结果的持久化...
转自:http://blog.csdn.net/wh_springer/article/details/51842496 近十年来,随着Hadoop生态系统的不断完善,Hadoop早已成为大数据事实上 ...
- Spark弹性式数据集RDDs
title: Spark弹性式数据集RDDs date: 2021-05-08 16:24:20 tags: Spark RDD 全称为 Resilient Distributed Datasets, ...
- Hadoop+Spark 大数据集群日常1 (There are 0 datanode(s) running报错 处理)
Hadoop+Spark 大数据集群日常1 由于项目涉及Hadoop+Spark大数据集群,特写此文档,方便将来处理类似问题参照,也为后人提供解决方案. 本人才疏学浅,文档难免有错漏与不妥之处,欢迎与 ...
- 颠覆大数据分析之Spark弹性分布式数据集
Spark中迭代式机器学习算法的数据流可以通过图2.3来进行理解.将它和图2.1中Hadoop MR的迭代式机器学习的数据流比较一下.你会发现在Hadoop MR中每次迭代都会涉及HDFS的读写,而在 ...
- Spark弹性分布式数据集RDD:基于内存集群计算的容错抽象
摘要 我们提出的弹性分布式数据集(RDDs),是一个让程序员在大型集群上以容错的方式执行基于内存计算的分布式内存抽象.RDDs受启发于两类使用当前计算框架处理不高效的应用:迭代算法和交互式数据挖掘工具 ...
- 《循序渐进学Spark》一1.6 使用Spark Shell开发运行Spark程序
本节书摘来自华章出版社<循序渐进学Spark>一书中的第1章,第1.6节,作者 小象学院 杨 磊,更多章节内容可以访问云栖社区"华章计算机"公众号查看. 1.6 使用S ...
- 使用Spark Shell开发运行Spark程序
使用Spark Shell开发运行Spark程序 Spark Shell是一种学习API的简单途径,也是分析数据集交互的有力工具. 虽然本章还没涉及Spark的具体技术细节,但从总体上说, Spark ...
- Fluid 0.5 版本发布:开启数据集缓存在线弹性扩缩容之路
作者 | 顾荣 南京大学PASALab, Fluid项目co-founder 来源 | 阿里巴巴云原生公众号 导读:为了解决大数据.AI 等数据密集型应用在云原生场景下,面临的异构数据源访问复杂.存 ...
- Fluid 0.5 版本:开启数据集缓存在线弹性扩缩容之路
简介:为了解决大数据.AI 等数据密集型应用在云原生场景下,面临的异构数据源访问复杂.存算分离 I/O 速度慢.场景感知弱调度低效等痛点问题,南京大学PASALab.阿里巴巴.Alluxio 在 20 ...
最新文章
- C语言实现最简单的2048小游戏
- flash与php交互,flash与PHP的交互技巧
- 【图文教程】Ubuntu software解决下载速度过慢
- multi mysql_mysqld_multi 的使用方法
- php模糊搜索慢怎么办,MySQL中文模糊检索问题的解决方法_php
- STS安装 activiti-designer-5.18.0插件
- txt文本变为粗体_如何在PHP中使文本变为粗体?
- 2022年轻人潮流爱好报告:被朋友圈高赞的神秘爱好,不烧钱还能脱单
- Unity3d开发IOS游戏 基础
- CMake的简单使用
- Windows 8.1 with Update MSDN 简体/英文/繁体
- (js技巧)input文本框回车或者失去光标触发事件
- 《激荡三十年》十八、青春飞扬——互联网的崛起
- 双系统重装win10后恢复grub引导
- 线性模型(Linear Model)
- 近观趣店,“审视”罗敏
- mx150 宏碁swift3_大众化的轻薄本-宏碁蜂鸟Swift 3评测报告
- Matlab_GUI gcf、gca 以及gco 的区别用法
- SQL积累 复制一个表数据到另一个表 SELECT INTO -- INSERT INTO ... SELECT
- 2021周记12:理财、朋友与焦虑