RDD的操作

  1.1 概述

      RDD整体包含两大类操作

      transformation 从现有中创建一个新的数据集

      action 在对数据集做一定程度的计算后将结果返回

    对于所有的transformation,都是Lazy的,也就是说它不会立即执行,只是单纯的记住怎么样从原来的数据集进行转换的逻辑而已,它仅在某一个计算需要的情况下,才会被真正执行.

    因为transformation 的Lazy性,RDD支持在每次计算时都进行重新计算,当然你可以将这个RDD保存下来 (persist  or cache方法)避免每次重计算

    可以通过设置不同的存储级别,将数据保存到硬盘,内存,或者选择同步多个副本到多个节点中.

    1.2 集群环境下的变量与操作

    集群环境,所有操作最终会交给executors去执行.而变量,会以数据副本的形式交给executors.很多时候,这与我们非集群环境下的开发思维有非常大的不同.

    1.2.1 集群下的闭包

        RDD是支持闭包操作的.但务必注意的是Spark不保证对闭包之外的对象引用进行的变化.

        原因是闭包的会被序列化发生给每一个executor,对于闭包的之外的对象引用会拷贝一个副本给executor.这时多个executor执行至少是跨JVM的

        这时对这个副本对象的变更没有任何意义,因为每个JVM(executor)的副本都是独立的.

    1.2.2 集群下的print

      集群环境下,print不会在driver端有任何输出.

      原因也是一样,print最终是在每个executor执行,其输出也是在每个executor的stdout上,在driver端,是不会有这些输出的.

      如果想在driver输出,一个比较简单的办法是调用collect()将结果发送到driver端在进行print,但这样可能会造成driver内存爆掉(所有executor的数据涌入).

        比较推荐的做法是rdd.take(100).foreach(println)

     1.2.3 共享变量

      因为集群下,变量只会以副本方式交给executor,而不会将变量的值变化传回driver,所以一个可读写共享的变量是非常有用的.

      Spark提供了两种共享变量 broadcast(广播变量) 和 accumulators(累加器)

      1.2.3.1 广播变量(broadcast)

        广播变量允许将一个只读变量的副本发送到每个机器上(executor机器),而不是对每一个任务发送一个副本.这样在同一机器上的多个任务,就可以反复使用这个变量了.      

        注意:

          广播变量只会对每个节点分发一次,所以一般来说,广播变量不应该再被修改了.以保证每个广播变量的副本的值都是一致的

          如果广播变量被修改,则需要将广播变量重新分发

        另:

          举个例子:Spark的action操作本身是通过一系列的stage来完成的,这些Stage是通过分布式的shuffle操作来进行切分的.而在每个Stage之间,Spark自动使用广播变量.

          这里用法说明,只有数据本身会在多个Stage的多个任务中反复使用,或者说缓存这个数据是非常重要且非常必要的情况下,使用广播变量才有意义.

        广播变量的使用如下:      

          // SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值val broadcastVar = sc.broadcast(Array(1, 2, 3))val v = broadcastVar.value

      1.2.3.2 累加器(accumulators) 

        累加器变量仅支持累加操作,因为可以在并行计算执行一些特殊的计算(比计数或者求和).并且累加器的变化是可以在UI的Task界面上看见的(注,不支持Python)

        累加器操作,依然遵循RDD的Lazy原则:

          累加器更新操作是在Action中,并且在每个任务中只会执行一次(如果任务失败重启了,累加器更新不会执行)

          而在transformation中,累加器依然不会立即执行更新,如果transformation被重新执行了,则累加器操作会重复执行

        对于累加器变量,Spark原生支持数值类型.一个使用例子如下        

          val accum = sc.longAccumulator("My Accumulator")sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))println(accum.value)

         也可以创建继承AccumulatorV2的类型,来实现自定义类型的累加器,例子如下:          

          //两个泛型参数->累加的元素类型和结果类型可以不同的class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {private val myVector: MyVector = MyVector.createZeroVectordef reset(): Unit = {myVector.reset()}def add(v: MyVector): Unit = {myVector.add(v)}...}// 创建一个自定义的累加器类型:val myVectorAcc = new VectorAccumulatorV2//将这个触发器注册到SparkContext:sc.register(myVectorAcc, "MyVectorAcc1")

  1.3 RDD的一些基本操作

    1.3.1 Transformations 依赖关系

      RDD是由父RDD经过转换函数形成一个个子RDD(子RDD依赖父RDD).针对不同的转换函数,以父RDD分区与子RDD分区的关系为标准,Spark将这些依赖关系分为两类.

      窄依赖

        窄依赖是指转换后,父RDD的每个分区只会被某一个子RDD分区使用.(一对一或者多对一的关系).

        所以窄依赖一般出现在map,filter等子分区沿用父分区,不会发生重分区的时候.

        宽依赖

        宽依赖是指转换后,父RDD的某个或某些分区会被几个子RDD分区使用.(某个分区数据部分在这个RDD,部分在那个RDD,一对多关系)

        宽依赖一般出现在groupByKey等子分区一定会发生重分区的时候

      两种依赖关系的对比

        一般来说,窄依赖比宽依赖对执行优化更加有利

        i).窄依赖允许集群节点上以流水线的形式直接计算所有分区

           宽依赖则需要先计算好父分区的分区信息,然后再以一个shuffle完成重分区,

          ii).某个子分区异常需要重计算时,会对这个子分区所依赖的所有分区进行计算.(这是宽窄依赖都必须的),但是针对分区数据而言

          窄依赖,一个或多个父分区完全对应一个子分区.对这些父分区的重计算,利用率是100%

          宽依赖,父分区的数据不完全对应一个子分区(一对多关系,父分区的某些部分是其它分区的),但此时依然需要重计算父分区全部数据,造成计算浪费(因为白计算其它分区的数据)

    1.3.2 Transformations 操作

      map

        对RDD中的元素执行一个指定函数,将执行结果作为新元素产生一个新的RDD.

          与其它map系函数区别,map新元素的完全是Map函数的执行结果返回,所以新RDD的数量与老RDD是一一对应的.        

        val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)rdd.map(rec=>rec.split(" ")>).collect().map(println(_))//返回结果是rec.split(" ")结果(一维数组)=>[["aa","bb"],["cc","dd"]]

      flatMap

        与map相同,但结果会扁平化.即如果结果是迭代器容器的,会将元素从容器中取出再返回       

        val rdd = sc.parallelize(Seq("aa bb","cc"),2)rdd.flatMap(rec=>rec.split(" ")).collect().map(println(_));//返回结果["aa","bb","cc"]//flatMap如以下这种方式使用是不行,flatMap返回结果必须是TraversableOnce[U](可迭代一次的类型)//rdd.flatMap(rec=>(rec,1)).collect().map(println(_));     

      mapPartitions

        与map相同,不过是以分区为单位,所以语法要求必须为 f: Iterator[T] => Iterator[U],注意返回结果不是以分区为单位,而是所有分区执行函数的结果的合并      

        val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)rdd.mapPartitions(part=>part.map(rec=>rec.split(" "))).collect().map(println(_))//结果是 [["aa","bb"],["cc","dd"],["ee","ff"]]

      mapPartitionsWithIndex

        与mapPartitions类似,不过它带有分区的index以供使用.所以语法要求为f: (Int, Iterator[T]) => Iterator[U]        

        val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)rdd.mapPartitionsWithIndex((partIdx,part)=>part.map(rec=>(partIdx,rec))).collect().map(println(_))//返回结果 (0,aa bb),(1,cc dd),(1,ee ff)

      sample

       抽样函数.可以从数据集中按一定比例抽取部分数据,抽取之后可以选择是否返回

       语法要求 withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong         

      val rdd = sc.parallelize(1 to 50)rdd.sample(false,0.2,System.currentTimeMillis).map(rec=>(rec,1)).collect().map(print(_)+" ")//返回结果 (8,1)(21,1)(26,1)(27,1)(34,1)(43,1)(46,1)(49,1)

      union      

      将两个数据集合并(包含数据重复)                 

      val rdd = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(11 to 20)rdd.union(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))//返回结果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20

     intersection

      将两个数据集合并,取交集作为结果返回

      val rdd = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(5 to 15)rdd.intersection(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))//返回结果 6 7 9 8 10 5

       distinct      

      对当前结果集去重返回            

        val rdd = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(5 to 15)rdd.union(rdd2).distinct().map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))//返回结果 4 14 6 8 12 10 2 13 15 11 1 3 7 9 5

    groupByKey

      将一个键值对类型的结果集按照key进行分组(如果是为分组聚合,groupByKey相比reduceByKey效率更低,因为少一个map-shuffer的combine)  

       val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).groupByKey().collect().map(rec=>print(s" ${rec._1} ${rec._2.sum} |"));//返回结果 aa 1 | dd 1 | bb 2 | cc 2 |

    reduceByKey

      将一个键值对类型数据集,使用指定的函数分组聚合为另一个键值对类型数据集,(相比groupByKey性能更高,因为可以在map-shuffer进行combine减少数据量)    

      val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).reduceByKey((value1,value2)=>value1+value2).collect().map(rec=>print(s"${rec} "))//返回结果 (aa,1) (dd,1) (bb,2) (cc,2)

    aggregate

      给出一个默认基准值,先使用seqOp遍历分区内元素传入基准值进行聚合,再对分区间结果使用combOp聚合为最后结果.

        注意aggregate返回的结果直接是聚合结果(不是RDD),并且要与原RDD的类型一致     

      val rdd = sc.parallelize(1 to 10);/*** zeroValue:预定义一个初始值 (0,0)* seqOp: (U, T) => U  分区内元素聚合,遍历元素传入基准值执行函数.(类似Map-Shuffle)*   U:当前基准值,T:当前元素*   执行的逻辑是 (基准值(默认初始值), 元素No.1) 执行seqOp ,结果再作为基准值,执行(基准值(上步结果),元素No.2),以此类推* combOp: (U, U) => U 分区间聚合,将各分区执行seqOp函数的结果再使用combOp聚合 (类似Reduce-Shuffle)*/val aggregateResult = rdd.aggregate((0,0))(seqOp=(sv,tv)=>(sv._1+tv,sv._2+1),combOp=(v1,v2)=>(v1._1+v2._1,v2._2+v2._2))println(aggregateResult)//输出结果 (55,10) (1-10的总和,1-10的个数) <=非RDD结果,并且类型必须是Int

      aggregateByKey

        与aggregate类似,但针对的是key分组,aggregateBykey是以key组为单位,对分组内元素遍历使用seqOp,再使用combOp聚合分组内       

        val rdd = sc.parallelize(Seq("a b c", "b c d"));rdd.flatMap(rec => rec.split(" ")).map(rec => (rec, 1)).aggregateByKey(0)(seqOp = (sv, tv) => (sv + tv),combOp = (v1, v2) => (v1 + v2)).collect().map(rec => print(s"${rec} |"))//输出结果 (d,1) |(b,2) |(a,1) |(c,2) |

      sortByKey

        将一个键值对RDD按key排序转换为另一个RDD

      join

        将两个键值对RDD((K, V),(K, W)),按Key合并为一个RDD(K, (V, W)) .(Spark同时还提供 leftOuterJoin,rightOuterJoin,fullOuterJoin)        

       val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));rdd.join(rdd2).collect().map(rec => print(s"${rec} |"))//两个RDD交集 (b,(b,b))rdd.leftOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))//leftOuterJoin左边全数据,右边Opt (b,(b,Some(b))) |(a,(a,None)) |rdd.rightOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))//rightOuterJoin右边全数据,左边Opt (b,(Some(b),b)) |(c,(None,c)) |rdd.fullOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))//笛卡尔乘积,Opt (b,(Some(b),Some(b))) |(a,(Some(a),None)) |(c,(None,Some(c))) |

      cogroup

        将多个键值对RDD按Key合并在一起.合并为全数据(没有丢失)

        与fullOuterJoin区别在与多个RDD情况下,cogroup按key合并为一个,fullOuterJoin为多个的笛卡尔积

        注意,如果某个数据集少某一个key,合并时是在这个数据集的位置上占CompactBuffer()的位置,而不是直接跳过        

        val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));rdd.cogroup(rdd2).collect().map(rec => print(s"${rec} |"))//(b,(CompactBuffer(b),CompactBuffer(b))) |(a,(CompactBuffer(a),CompactBuffer())) |(c,(CompactBuffer(),CompactBuffer(c))) |

      cartesian

        返回两个RDD的笛卡尔积结果

      pipe

        使用Shell的语法操作RDD

      coalesce

        重新调整RDD的分区后形成一个新的RDD.语法要求:numPartitions: Int, shuffle: Boolean = false.

          numPartitions表示要重新调整的分区数量,shuffle表示重新调整分区时是否允许发生shuffle过程.

        如果子分区数往下减少,则子分区数设置一定会成功.但要注意,在这种情况下会造成任务的并行度降低(分区数,任务数降了),任务内存更容易爆出(单个任务的数据增大了)

        如果子分区数往上增加,则子分区数设置必须要设置shuffle=true,才会成功,否则子分区依然等于父分区

        谨记:如果没有shuffle的参与,RDD只能减少分区(窄依赖),不能增加分区

      repartition      

        只是coalesce的shuffle等于true的快捷方式. coalesce(numPartitions, shuffle = true)

repartitionAndSortWithinPartitions

    1.3.2 Action 操作

      reduce

        RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素组成两个元素,再被传给输入函数,直到最后只有一个值为止

        与reduceByKey的区别是  reduceByKey是一个转换操作,返回的是RDD, reduce是一个action操作,返回的是数据结果                     

        val rdd = sc.parallelize(1 to 100,2);val value = rdd.reduce((v1,v2)=>v1+v2)println(value)//输出结果 5050

      collect

        将一个RDD的所有元素以数组的形式发回driver端.注意这个RDD必须是足够小的数据集,否则很容易将driver端的内存撑爆

      count

        返回一个RDD的元素的个数

      first

        返回一个RDD的第一个元素

      take(n)

        返回一个RDD的前N个元素

      takeSample(withReplacementnum, [seed])

        返回一个RDD的百分比抽样(withReplacement标识元素是否放回RDD以供多次使用)

           takeOrdered(n[ordering])

        返回一个RDD按照设定的排序规则后的前N个元素

      countByKey

        只支持键值对类型,返回一个RDD的按照Key分组后的每组计数

      saveAsTextFile(path)

        将一个RDD的全部元素写入一个文本方式的本地文件,HDFS或其它任何Hadoop支持的存储系统中.(每行等于每个元素调用toString()的结果)

      saveAsSequenceFile(path)

        将一个RDD的全部元素写入一个二进制方式的本地文件,HDFS或其它任何Hadoop支持的存储系统中.

        在Scala中,它还可以用于隐式转换为可写类型的类型(Spark包含对基本类型的转换,如Int、Double、String等)。

      saveAsObjectFile(path)

        将一个RDD的全部元素使用Java序列化以简单的格式编写数据集的元素(可以使用SparkContext.objectFile()加载这些元素)。

      foreach(func)

        在数据集的每个元素上运行函数func。这通常用于处理副作用,如更新累加器或与外部存储系统交互

        注意:不可以修改foreach()之外的累加器之外的变量,见前面集群下的变量与闭包一节

  1.4 Shuffle过程

    Spark的某些操作,会引起一个Shuffle过程.Shuffle是指不同节点上的不同分区数据整合重新分区分组的机制.

    所以Shuffle是一个代价很高的操作,因为它会导致executor和不同的机器节点之间进行数据复制.

    1.4.1 Shuffle简述

      以reduceByKey为例,将原始数据中key相同的记录聚合为一个组.这里挑战是原始数据很可能是存在不同分区不同机器的(参考MapReduce执行过程)

      Spark-Shuffle与MapReduce-Shuffle的区别

        MapReduce-Shuffle结果是分区有序,分区内再按Key排序

        Spark-Shuffle结果是分区有序,但分区内Key无序.

          要对Spark-Shuffle的分区内再排序,有以下方法:

           mapPartitions 在已有的每个分区上再使用.sort排序

           repartitionAndSortWithinPartitions  重建分区,并排序

           sortBy提前对RDD本身做一个全范围排序

    1.4.2 RDD中引起Shuffle的操作

       repartition类操作 例如:repartitioncoalesce

       _ByKey操作(除了counting相关操作)例如:groupByKeyreduceByKey

       join 例如:cogroupjoin

      1.4.3 Shuffle的性能影响

      Shuffle本身是同时高耗内存,高耗磁盘IO,高耗网络IO的昂贵操作.

        Spark会启动一系列的MapReduce(Hadoop MapReduce),产生大量的数据缓冲区与归并排序,大量的pill文件与归并Merge等等

转载于:https://www.cnblogs.com/NightPxy/p/9245707.html

[Spark]-RDD详解之变量操作相关推荐

  1. spark RDD详解及源码分析

    spark RDD详解及源码分析 @(SPARK)[spark] spark RDD详解及源码分析 一基础 一什么是RDD 二RDD的适用范围 三一些特性 四RDD的创建 1由一个已经存在的scala ...

  2. Spark RDD、DataFrame原理及操作详解

    RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...

  3. 2021年大数据Spark(十二):Spark Core的RDD详解

    目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...

  4. Spark核心编程系列(一)——RDD详解

    目录 Spark核心编程系列--RDD详解(一) RDD概念 RDD与IO之间的关系 RDD的核心属性 RDD执行原理 基础编程 RDD创建 RDD的并行度与分区 参考 Spark核心编程系列--RD ...

  5. spark 调度模块详解及源码分析

    spark 调度模块详解及源码分析 @(SPARK)[spark] spark 调度模块详解及源码分析 一概述 一三个主要的类 1class DAGScheduler 2trait TaskSched ...

  6. Spark分区详解!DT大数据梦工厂王家林老师亲自讲解!

    Spark分区详解!DT大数据梦工厂王家林老师亲自讲解! http://www.tudou.com/home/_79823675/playlist?qq-pf-to=pcqq.group 一.分片和分 ...

  7. python变量定义大全_详解python变量与数据类型

    这篇文章我们学习 Python 变量与数据类型 变量 变量来源于数学,是计算机语言中能储存计算结果或能表示值抽象概念,变量可以通过变量名访问.在 Python 中 变量命名规定,必须是大小写英文,数字 ...

  8. 详解JavaScript变量类型判断及domReady原理 写得很好

    原文:详解JavaScript变量类型判断及domReady原理 我们知道,在开发JavaScript时候,经常要判断JavaScript变量类型,此 JavaScript教程 详细介绍JS变量的判断 ...

  9. linux系统下grub.cfg详解和实例操作

    linux系统下grub.cfg详解和实例操作 简介 grub是引导操作系统的程序,它会根据自己的配置文件,去引导内核,当内核被加载到内存以后,内核会根据grub配置文件中的配置,找到根分区所使用的文 ...

最新文章

  1. 页面布局让footer居页面底部_网站各页面该如何布局关键词优化提升排名?
  2. c语言温度查表程序,温度计C语言程序.doc
  3. 《硝烟中的Scrum和XP》作者新作 《精益开发实战》
  4. 【转】WPF入门教程系列六——布局介绍与Canvas(一)
  5. MyBatis映射文件1(增删改、insert获取自增主键值)
  6. 坦克大战-C语言-详注版
  7. word2007里插入分节符
  8. python程序员面试自我介绍_程序员面试要准备哪些方面的内容?
  9. 2022华为机试真题 C++ 实现 【字符串子序列II】
  10. Origin2022安装教程
  11. MATLAB 中 simulink 里的 scope显示图像格式的设置
  12. 电容传感器FDC2214单通道应用寄存器计算
  13. 第二章:低功耗器件选型:负载开关 loadswitch
  14. 以太坊联盟链-多节点私链搭建手册
  15. 使用命令的脚本来停止或启动IIS中的某个站点
  16. js 生成二维码并下载
  17. 理解各种设计模式原则及区别丨浅谈Nginx中核心设计模式-责任链模式丨C++后端开发丨Linux服务器开发丨web服务器
  18. NFT游戏初创公司Mythical Games旗下游戏将与巴宝莉等进行合作
  19. IOS开发之常用第三方框架(完善中)
  20. cubase elements12中文免费版 详细安装流程

热门文章

  1. 晶振选型时的问题库(z)
  2. angular 使用data-bs-datepicker时的一个小问题及解决
  3. 参考文献_参考文献:
  4. stepinto stepout stepover的区别
  5. 单片机小白学步系列(二十) IO口原理
  6. 电脑入门学习最佳方法
  7. 在Fedora 20下创建桌面快捷方式:
  8. oracle装了客户端怎么登陆账号,分享Oracle 11G Client 客户端安装步骤(图文详解)...
  9. java正则表达式判断手机号_正则表达式学习之简单手机号和邮箱练习
  10. 什么是基金净值、单位净值、累计净值