Spark之RDD实战篇
RDD编程
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用action触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。
解析器集成
与Ruby和Python类似,Scala提供了一个交互式Shell (解析器),借助内存数据带来的低延迟特性,可以让用户通过解析器对大数据进行交互式查询。Spark解析器将用户输入的多行命令解析为相应Java对象的示例如图所示
Scala解析器处理过程一般为:
①将用户输入的每一行编译成一个类;
②将该类载入到JVM 中;
③调用该类的某个函数。在该类中包含一个单利对象,对象中包含当前行的变量或函数,在初始化方法中包含处理该行的代码。例如,如果用户输入“varx=5”,在换行输入primln(x),那解析器会定义一个叫Linel的类,该类包含X,第二行编译成println (Linel.getlnstance().x)。
Spark中做了以下两个改变。
(1)类传输:为了让工作节点能够从各行生成的类中获取到字节码,通过HTTP传输。
(2)代码生成器的改动:通常各种代码生成的单例对象是由类的静态方法来提供的。也就是说,当序列化一个引用上一行定义变量的闭包(例如上面例子的Linel.x), Java不会通过检索对象树的方式去传输包含x的Linel实例。因此工作节点不能够得到x,在Spark中修改了代码生成器的逻辑,让各行对象的实例可以被字节应用。在图中显示了 Spark修改之后解析器是如何把用户输入的每一行变成Java对象的。
内存管理
Spark提供了 3种持久化RDD的存储策略:
1.未序列化Java对象存在内存中、
2.序列化的数据存于内存中
3.存储在磁盘中
第一个选项的性能是最优的,因为可以直接访问在Java虚拟机内存里的RDD对象;在空间有限的情况下,第二种方式可以让用户釆用比Java对象更有效的内存组织方式,但代价是降低了性能;第三种策略使用于RDD太大的情形,每次重新计算该RDD会带来额外的资源开销(如I/O等)。对于内存使用LRU回收算法来进行管理,当计算得到一个新的RDD分区,但没有足够空间来存储时,系统会从最近最少使用的RDD回收其一个分区的空间。除非该RDD是新分区对应的RDD,这种情况下Spark会将旧的分区继续保留在内存中,防止同一个RDD的分区被循环调入/调出。这点很关键,因为大部分的操作会在一个RDD的所有分区上进行,那么很有可能己经存在内存中的分区将再次被使用。
多用户管理
RDD模型将计算分解为多个相互独立的细粒度任务,这使得它在多用户集群能够支持多种资源共享算法。特别地,每个RDD应用可以在执行过程中动态调整访问资源。
在每个应用程序中,Spark运行多线程同时提交作业,并通过一种等级公平调度器来实现多个作业对集群资源的共享,这种调度器和Hadoop Fair Scheduler类似。该算法主 要用于创建基于针对相同内存数据的多用户应用,例如:Spark SQL引擎有一个服务 模式支持多用户并行查询。公平调度算法确保短的作业能够在即使长作业占满集群资源的情况下尽早完成。
Spark的公平调度也使用延迟调度,通过轮询每台机器的数据,在保持公平的情况下给予作业高的本地性。Spark支持多级本地化访问策略(本地化),包括内存、磁盘和机 架。
由于任务相互独立,调度器还支持取消作业来为高优先级的作业腾出资源。Spark中可以使用Mesos来实现细粒度的资源共享,这使得Spark应用能相互之间或在不同的计算框架之间实现资源的动态共享。Spark使用Sparrow系统扩展支持分布式调度,该调度允许多个Spark应用以去中心化的方式在同一集群上排队工作,同时提供数据本地性、低延迟和公平性。
RDD创建
集合中创建RDD
从已有的集合中创建RDD
1 |
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10)) |
12345678910111213141516171819202122 |
//并行化操作 def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { //默认是多少呢 assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) }//本地模式下 override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) //CoarseGrainedSchedulerBackend override def defaultParallelism(): Int = { conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) }//stanlone继承了CoarseGrainedSchedulerBackend 因此绝大部分的情况下并行化处理数据的并行度为CPU的核数 //makeRDD本质上还是调用了parallelize def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } |
1234567891011 |
/** * Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. *///def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)} |
12 |
val test1 = sc.parallelize(List(1,2,3,4))val seq = List((1,List("datanode1")),(2,List("datanode2"))) //可以提供位置信息 |
def parallelize[T: ClassTag]
和def makeRDD[T: ClassTag]
返回的都是ParallelCollectionRDD,而且这个makeRDD的实现不可以自己指定分区的数量,而是固定为seq参数的size大小。
外部存储系统的数据集创建
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
1 |
val datasets =sc.textFile("hdfs://datanode1:9000/input/test.txt") |
RDD转换
map()
map操作时对RDD中的每一个素都执行一个指定的函数来产生一个新的RDD,任何元RDD中的元素在新RDD中都有且只有一个元素与之对应。
12 |
val data = sc.parallelize(1 to 10).collect()val map = data.map(_ * 2) |
mapPartitions()
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。其中preservesPartitioning
表示是否保留父RDD的partitiones分区信息,如果在映射过程中需要频繁创建对象,使用mapPartitions操作要比map操作高 效得多。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大。如果使用mapPartitions,那么只需要针对每一个分区建立一个connectiono mapPartitionsWithlndex操作作用类似于mapPartitions,只是输入参数多了一个分区索引。
123456789 |
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) } |
1234567891011121314151617181920212223242526 |
//创建RDD使RDD有两个分区 var rdd1= sc.makeRDD(1 to 10,2)///使用mapPartitions对rddl进行重新分区 var rdd2 = rdd1.mapPartitions{ x => { var result = List[Int]() var i = 0 while(x.hasNext){ i += x.next() } result.::(i).iterator }} //rdd2将rddl中每个分区中的数值累加 rdd2.collect //重新对rdd1分区var rdd3 = rdd1.mapPartitionsWithIndex{ (x,iter) => { var result = List[String]() var i = 0 while(iter.hasNext){ i += iter.next() } result.::(x + "|" + i).iterator }}rdd3.collect |
glom()
RDD中每一个分区所有类型为T的数据转变成元素类型为T的数组[Array[T]].
12 |
var rdd = sc.parallelize(1 to 16,4)rdd.glom().collect() |
flatMap()
flatMap操作原RDD中的每一个元素生成一个或多个元素来构建新的RDD
123 |
val rdd1 = sc.parallelize(1 to 5)val flatMap = rdd1.flatMap(1 to _)flatMap.collect |
filter()
返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成.
123 |
var sourceFilter = sc.parallelize(Array("hadoop","spark","flink","hphblog"))val filter = sourceFilter.filter(_.contains("h"))filter.collect |
mapPartitionsWithIndex()
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) =>Iterator[U]
1234567891011121314151617181920212223242526 |
//创建RDD使RDD有两个分区 var rdd1= sc.makeRDD(1 to 10,2)///使用mapPartitions对rddl进行重新分区 var rdd2 = rdd1.mapPartitions{ x => { var result = List[Int]() var i = 0 while(x.hasNext){ i += x.next() } result.::(i).iterator }} //rdd2将rddl中每个分区中的数值累加 rdd2.collect //重新对rdd1分区var rdd3 = rdd1.mapPartitionsWithIndex{ (x,iter) => { var result = List[String]() var i = 0 while(iter.hasNext){ i += iter.next() } result.::(x + "|" + i).iterator }}rdd3.collect |
sample(withReplacement, fraction, seed)
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例子从RDD中随机且有放回的抽出50%的数据,随机种子值为2(即可能以1 2 3的其中一个起始值)
1234 |
val rdd = sc.parallelize(1 to 10)rdd.collectvar sample1 = rdd.sample(true,0.5,2)sample1.collect |
distinct([numTasks]))
对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
123456 |
val rdd = sc.parallelize(List(1,2,2,3,3,4,4,5,5,5,6,6,7,7,8))val rdd1 = rdd.distinct()rdd1.collect val rdd3 = rdd1.distinct(10)rdd3.collect |
partitionBy
对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD。
1234 |
val rdd = sc.parallelize(Array((1,"hadoop"),(2,"spark"),(3,"flink"),(4,"hphblog")),4)rdd.partitions.sizevar rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))rdd.collect |
coalesce((numPartitions, shuffle)
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。shuffle默认关闭.
1234567 |
val rdd = sc.parallelize(1 to 10000,4)val coalesceRDD = rdd.coalesce(2)val shuffleRDD = rdd.coalesce(2,true)shuffleRDD.collectrdd.collectcoalesceRDD.partitions.sizeshuffleRDD.partitions.size |
repartition(numPartitions)
根据分区数,从新通过网络随机洗牌所有数据。底层调用的是coalesce(numPartitions, shuffle = true)
123456 |
val rdd = sc.parallelize(1 to 10000,4)rdd.partitions.sizeval rerdd = rdd.repartition(2)rerdd.partitions.sizeval rerdd = rdd.repartition(4)rerdd.partitions.size |
repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。
123 |
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope { new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)} |
sortBy([ascending], [numTasks])
1234 |
def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) |
12 |
val rdd =sc.parallelize(List(1,2,3,4,5,6,7,8,9))rdd.sortBy(x => x ,ascending=false).collect |
union(otherDataset)
对源RDD和参数RDD求并集后返回一个新的RDD 不去重
1234 |
val rdd1 = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(5 to 15)val rdd3 = rdd1.union(rdd2)rdd3.collect |
subtract (otherDataset)
计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
123456 |
val rdd1 = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(5 to 15)val rdd3 = rdd1.subtract(rdd2)rdd3.collectval rdd4 =rdd2.subtract(rdd1)rdd4.collect |
intersection(otherDataset)
对源RDD和参数RDD求交集后返回一个新的RDD
123456 |
val rdd1 = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(5 to 15)val rdd3 = rdd1.intersection(rdd2)rdd3.collectval rdd4 = rdd2.intersection(rdd1)rdd4.collect |
cartesian(otherDataset)
笛卡尔积
123 |
val rdd1 = sc.parallelize(1 to 3)val rdd2 = sc.parallelize(2 to 5)rdd1.cartesian(rdd2).collect |
pipe(command, [envVars])
管道,对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD
12345 |
#!/bin/shecho "hello Spark This is Linux bash"while read LINE; do echo ">>>"${LINE}done |
12 |
val rdd = sc.parallelize(List("hi","Hello","hadoop","spark","flink","hphblog"),1)rdd.pipe("/home/hadoop/pipe.sh").collect() |
join(otherDataset, [numTasks])
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
123 |
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))rdd.join(rdd1).collect() |
cogroup(otherDataset,[numTasks])
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))
类型的RDD
1234567 |
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))rdd.cogroup(rdd1).collect()val rdd2 = sc.parallelize(Array((4,4),(2,5),(3,6)))rdd.cogroup(rdd2).collect()val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))rdd3.cogroup(rdd2).collect() |
reduceByKey(func, [numTasks])
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
123 |
val rdd = sc.parallelize(List(("hadoop",1),("spark",5),("spark",5),("flink",3)))val reduce = rdd.reduceByKey((x,y)=>(x+y))reduce.collect |
groupByKey
groupByKey也是对每个key进行操作,但只生成一个sequence
12345678 |
val words = Array("hadoop", "spark", "spark", "flink", "flink", "flink")val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))val group = wordPairsRDD.groupByKey()group.collect()val result = group.map(t => (t._1, t._2.sum))result.collectval map = group.map(t => (t._1, t._2.sum))map.collect() |
combineByKey[C]
( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) 对相同K,把V合并成一个集合。
createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建
那个键对应的累加器的初始值
mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
123456789101112 |
val scores = Array(("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98))val input = sc.parallelize(scores)val combine = input.combineByKey( (v)=>(v,1), (acc:(Int,Int),v)=>(acc._1+v,acc._2+1), (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)) val result = combine.map{ case (key,value) => (key,value._1/value._2.toDouble)}result.collect() |
aggregateByKey
(zeroValue:U,[partitioner: Partitioner]) (seqOp:(U, V) => U,combOp: (U, U) => U)
在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
seqOp函数用于在每一个分区中用初始值逐步迭代value,combOp函数用于合并每个分区中的结果。
123456 |
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)agg.collect()agg.partitions.sizeval rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),1)val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_).collect() |
foldByKey
(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
aggregateByKey的简化操作,seqop和combop相同
123 |
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)val agg = rdd.foldByKey(0)(_+_)agg.collect() |
sortByKey([ascending], [numTasks])
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
123 |
val rdd = sc.parallelize(Array((3,"hadoop"),(6,"hohblog"),(2,"flink"),(1,"spark")))rdd.sortByKey(true).collect()rdd.sortByKey(false).collect() |
mapValues
针对于(K,V)形式的类型只对V进行操作
12 |
val rdd = sc.parallelize(Array((3,"hadoop"),(6,"hohblog"),(2,"flink"),(1,"spark")))rdd.mapValues(_+"==> www.hphblog.cn").collect() |
RDD行动算子
reduce(func)
通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
1234 |
val rdd = sc.makeRDD(1 to 100,2)rdd.reduce(_+_)val rdd1 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))rdd1.reduce((x,y)=>(x._1 + y._1,x._2 + y._2)) |
collect()
12 |
val rdd = sc.makeRDD(1 to 100,2)rdd.collect() |
在驱动程序中,以数组的形式返回数据集的所有元素
count()
返回RDD的元素个数
12 |
val rdd = sc.makeRDD(1 to 100,2)rdd. count() |
first()
返回RDD的第一个元素(类似于take(1))
12 |
val rdd = sc.makeRDD(1 to 100,2)rdd.first() |
take(n)
返回一个由数据集的前n个元素组成的数组
12 |
val rdd = sc.makeRDD(1 to 100,2)rdd.take(10) |
takeSample(withReplacement,num, [seed])
返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
123 |
val rdd = sc.makeRDD(1 to 100,2)rdd.takeSample(true,10,2)rdd.takeSample(false,10,2) |
takeOrdered(n)
返回前几个的排序
12 |
val rdd = sc.makeRDD(1 to 100,2)rdd.take(10) |
aggregate
(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
12345 |
val rdd = sc.makeRDD(1 to 10,2)rdd.aggregate(1)( {(x : Int,y : Int) => x + y}, {(a : Int,b : Int) => a + b} ) |
为什么是58呢:
1234567891011 |
rdd.mapPartitionsWithIndex{ (partid,iter)=>{ var part_map = scala.collection.mutable.Map[String,List[Int]]() var part_name = "part_" + partid part_map(part_name) = List[Int]() while(iter.hasNext){ part_map(part_name) :+= iter.next()//:+= 列表尾部追加元素 } part_map.iterator }}.collect |
遍历第一个分区的数据我们知道第一个分区的数据是(1,2,3,4,5),第二个分区的数据是(6,7,8,9,10)首先在每一个分区执行(x : Int,y : Int) => x + y
我们传入的zeroValue的值为1,即在part_0中zeroValue+5+4+3+2+1=19
,在part_1中zeroValue+6+7+8+9+10=41
,在将连个分局的结果合并(a : Int,b : Int) => a + b
,并且使用zeroValue的值1即zeroValue+part_0+part_1=1+16+41=58
因此结果为58.
1234 |
rdd.aggregate(1)( {(x : Int,y : Int) => x * y}, {(a : Int,b : Int) => a + b} ) |
相同的我们可以刻分析出来
首先在每一个分区执行(x : Int,y : Int) => x * y
我们传入的zeroValue的值为1,即在part_0中zeroValue*5*4*3*2*1=120
,在part_1中zeroValue*6*7*8*9*10=30240
,在将连个分局的结果合并(a : Int,b : Int) => a + b
,并且使用zeroValue的值1即zeroValue+part_0+part_1=1+120+30240=30361
因此结果为30361.
fold(num)(func)
折叠操作,aggregate的简化操作,seqop和combop一样。
123456 |
val rdd = sc.makeRDD(1 to 10,2)rdd.aggregate(1)( {(x : Int,y : Int) => x + y}, {(a : Int,b : Int) => a + b} )rdd.fold(1)(_+_) |
saveAsTextFile(path)
将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
12 |
val rdd = sc.makeRDD(1 to 10,2)rdd.saveAsTextFile("hdfs://datanode1:9000/spark/saveAsTextFile/") |
saveAsSequenceFile(path)
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
用于将RDD中的元素序列化成对象,存储到文件中。
countByKey()
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
12 |
val rdd = sc.parallelize(List(("hadoop",3),("spark",2),("hphblog",3),("flink",9),("flink",9),("spark",10)),3)rdd.countByKey() |
foreach(func)
在数据集的每一个元素上,运行函数func进行更新。注意foreach遍历RDD,将函数f应用于每一个元素.要注意如果对RDD执行foreach,智慧在Executor端有效,而不是Driver.比如rdd.collect().foreach(println),只会在Executor端有效,Driver端是看不到的.
sortBy(funct)
123 |
var rdd = sc.makeRDD(Array(("A",2),("D",5), ("A",1), ("B",6), ("B",3), ("E", 7),("C",4)))rdd.sortBy(x => x).collectrdd.sortBy(x => x._2,false).collect |
RDD持久化
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果 希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。
缓存方式
RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空 间中。
但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
123456789 |
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) //默认的持久化是内存中 /** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */def cache(): this.type = persist() //cache最终也是调用了persist方法 |
在存储级别的末尾加上“_2”来把持久化数据存为两份
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
123456789 |
val rdd = sc.makeRDD(1 to 10)val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")cache.cachenocache.collectnocache.collectcache.collectcache.collectcache.collect |
我们发现持久化的内存时间戳没有变化,未持久化的内存时间戳是有变化的
RDD检查点机制
Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。
cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来然后放在内存中,但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了,上面cache 的RDD就会丢掉, 需要通过依赖链重放计算出来, 不同的是, checkpoint是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。
如果存在以下场景,则比较适合使用检查点机制:
1)DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。
2)在宽依赖上做Checkpoint获得的收益更大。
为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
1234567891011121314 |
val data = sc.parallelize(1 to 1000 , 5)sc.setCheckpointDir("hdfs://datanode1:9000/checkpoint")data.checkpointdata.countval ch1 = sc.parallelize(1 to 20)val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")ch3.checkpointch2.collectch2.collectch2.collectch3.collectch3.collectch3.collect |
Spark之RDD实战篇相关推荐
- Spark之RDD实战篇3
键值对RDD.数据读取与保存.累加器.广播变量: 键值对RDD Spark 为包含键值对类型的 RDD 提供了一些专有的操作 在PairRDDFunctions专门进行了定义.这些 RDD 被称为 p ...
- Spark之RDD理论篇
Spark的基石RDD: RDD与MapReduce Spark的编程模型是弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce的扩展和延申, ...
- Spark之RDD实战2
宽窄依赖.DAG RDD相关概念: 依赖 RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency). 窄依赖 窄依 ...
- Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...
本博文的主要内容是: 1.rdd基本操作实战 2.transformation和action流程图 3.典型的transformation和action RDD有3种操作: 1. Trandform ...
- 《Spark大数据分析实战》——1.4节弹性分布式数据集
本节书摘来自华章社区<Spark大数据分析实战>一书中的第1章,第1.4节弹性分布式数据集,作者高彦杰 倪亚宇,更多章节内容可以访问云栖社区"华章社区"公众号查看 1. ...
- Spark PersistenceEngine持久化引擎与领导选举代理机制内核原理深入剖析-Spark商业环境实战...
本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客.版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习. Sp ...
- 【大数据Hadoop实战篇】
大数据Hadoop实战篇 第1章 Hadoop概述 1.1 Hadoop是什么 1.2 Hadoop发展历史(了解) 1.3 Hadoop三大发行版本(了解) 1.4 Hadoop优势(4高) 1.5 ...
- Hadoop实战篇(1)
Hadoop实战篇(1) 作者 | WenasWei 前言 在前面介绍过了Hadoop-离线批处理技术的整体架构,接下来便开始学习安装配置并使用 Hadoop ; 将从以下几点介绍: Linux 环境 ...
- Spark Streaming 项目实战 (4) | 得到最近1小时广告点击量实时统计并写入到redis
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...
最新文章
- C/C++ 程序设计员应聘常见面试试题深入剖析
- MyEclipse6.5配置反编译插件
- 博士学位被撤三天后,她的大学教职也被开除!
- Python实例讲解 -- tkinter canvas (设置背景图片及文字)
- Eclipse 皮肤
- Java开发笔记(一百二十七)Swing的标签
- Android studio实现财务记账系统软件android studio开发课程设计
- uniapp中针对H5端做微信分享功能总结
- 文安三中电子计算机老师叫什么,顶岗实习周记:记我的第一次.doc
- 如何让Bing(必应)快速收录你的网站
- 第三方支付机构是什么
- 会议室录播方案及录播设备推荐
- 看中科院大牛博士如何进行文献检索和阅读
- 梦幻柔焦(奥顿效果)
- 手动实现string类的方法实现
- 手游方舟怎么输入代码_方舟自定义代码怎么输入 | 手游网游页游攻略大全
- java创建user类在哪里_java中创建对象有哪几种方式
- C语言编程优化运行速度
- 还不习惯Office 2007
- 电子白板和计算机通过什么链接,交互式电子白板实现了白板与计算机之间的双向交互通信与操...
热门文章
- python一加到二十等于多少_Python 3.1新变化之性能改善篇(转载)
- 3mysql的引擎哪_你知道哪几种MySQL存储引擎?
- php mysql事务实例_PHP + MySQL事务示例
- C# if---else---练习题整理
- Yahoo团队:网站性能优化的35条黄金准则
- 一个servlet,多个dwr.xml配置文件
- 论坛首页调用 来自 http://bbs.apabi.com
- python处理mat数据和处理png的区别_Python---利用scipy.misc等库对jpg以及png等图像数据预处理(用于深度学习喂数据)...
- 二分查找详解——弄懂二分思想的重要性!
- 干货下载丨开源数据库安全管理