2019独角兽企业重金招聘Python工程师标准>>>

算法分类:转换(transformation)和执行(action)

查看算子使用demo

coalesce & repartition & partitionBy

reparation是coalesce的特殊情况 ,reparation会将coalesce中的shuffle参数设置为true,会使用HashPartitioner重新混洗分区,如果原有分区数据不均匀可以用reparation来重新混洗分区,使数据均匀分布,重新混洗过的分区和新的分区时宽依赖的关系

coalesce shuffle参数为false的情况 不会重新混洗分区,它是合并分区,比如把原来1000个分区合并成100个,父rdd和子rdd是窄依赖,

coalesce当shuffle参数设置为false时,如果设置的新partition数量大于之前的,则按照之前的分区数量重新分区。如果shuffle参数设置为true则效果和repartition一致。

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)
}

partitionBy需要指定分区函数和分区数量

var rdd2=rdd.partitionBy(new HashPartitioner(2))
range
// range函数是闭开区间[)
range(1,4,1)
//输出:1,2,3
// to 函数是闭闭区间[]
sc.makeRDD(1 to 5,2)
// 输出:1,2,3,4,5
zip & zipWithIndex & zipWithUniqueId

zip

1.如果两个RDD分区数不同,则抛出异常:Can’t zip RDDs with unequal numbers of partitions

2.如果两个RDD的元素个数不同,则抛出异常:Can only zip RDDs with same number of elements in each partition

zipPartitions

zipPartitions函数将多个RDD按照partition组合成为新的RDD。

该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求

var rdd1=sparkSession.range(1,4,1).rdd
var rdd2=sparkSession.range(4,7,1).rdd
var rdd3=sparkSession.range(7,10,1).rdd
// zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及每个partition的元素数量都相同,否则会抛出异常。
var rdd5=rdd1 zip rdd2 zip rdd3
/*** +-----+---+* |   _1| _2|* +-----+---+* |[1,4]|  7|* |[2,5]|  8|* |[3,6]|  9|* +-----+---+*/
// 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
var rdd6=rdd1.zipWithIndex/*** +---+---+* | _1| _2|* +---+---+* |  1|  0|* |  2|  1|* |  3|  2|* +---+---+*/
var rdd7=sparkSession.range(1,10,2).rdd
// 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
// 每个分区中第一个元素的唯一ID值为:该分区索引号
// 每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
var rdd8=rdd7.zipWithUniqueId()/*** +---+---+* | _1| _2|* +---+---+* |  1|  0|* |  3|  2|* |  5|  1|* |  7|  3|* |  9|  5|* +---+---+*/
mapPartitionsWithIndex
  var rdd1 = sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2)// 函数作用同mapPartitions相同,不过提供了两个参数,第一个参数为分区的索引var rdd2 = rdd1.mapPartitionsWithIndex {(partIdx, iter) => {var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()while (iter.hasNext) {var part_name = "part_" + partIdxvar elem = iter.next()if (part_map.contains(part_name)) {var elems = part_map(part_name)elems ::= elempart_map(part_name)=elems} else {part_map(part_name) = List[(Int, String)] {elem}}}part_map.iterator}}.collect()/*** +------+--------------+* |    _1|            _2|* +------+--------------+* |part_0|[[2,B], [1,A]]|* |part_1|[[4,D], [3,C]]|* +------+--------------+*/
map & mapValues
var rdd1=sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2)
// 对[K,V]整体操作
var rdd3=rdd1.map(_+"_").foreach(println(_))
/*** (1,A)_* (3,C)_* (2,B)_* (4,D)_*/
var rdd2=rdd1.mapValues(_+"_")/*** +---+---+* | _1| _2|* +---+---+* |  1| A_|* |  2| B_|* |  3| C_|* |  4| D_|* +---+---+*/// 键值对转换rdd1.map(_.swap).foreach(println(_))/*** (C,3)* (D,4)* (A,1)* (B,2)*/// 使用map实现mapValues rdd1.map(x=>(x._1,x._2+"_")).foreach(println(_))/*** (1,A_)* (2,B_)* (3,C_)* (4,D_)*/
flodByKey
val rdd4=sparkSession.sparkContext.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
val rdd5=rdd4.foldByKey(2)(_+_).collect()/*** +---+---+* | _1| _2|* +---+---+* |  B|  5|* |  A|  4|* |  C|  3|* +---+---+*/
groupByKey & reduceByKey & aggregateByKey & flodByKey

reduceByKey现在map过程中先进行聚合,再到reduce端聚合,减少数据太大带来的压力,减小RPC过程中的传输压力。groupByKey是直接在reduce端进行聚合的,所以效率比reduceByKey低。

foldByKey和reduceByKey的功能是相似的,都是在map端先进行聚合,再到reduce聚合。不同的是flodByKey需要传入一个参数。该参数是计算的初始值。

groupByKey是对每个key进行合并操作,但只生成一个sequence,groupByKey本身不能自定义操作函数。spark只能先将所有的键值对都移动,这样的后果是集群节点之间的开销很大,导致传输延时,详情。

val words = Array("one", "two", "two", "three", "three", "three")
val wordsRDD = sparkSession.sparkContext.parallelize(words).map(word => (word, 1))
val wordsCountWithGroup = wordsRDD.groupByKey().map(w => (w._1, w._2.sum)).collect()
val wordsCountWithReduce = wordsRDD.reduceByKey(_ + _).collect()
val wordsCountWithAggregate=wordsRDD.aggregateByKey(0)((u:Int,v)=>u+v,_+_).foreach(println)// aggregate简写seqOp和comOp使用同一个函数
val wordsCountWithFlod=wordsRDD.flodByKey(0)(_+_)
val wordsCountWithCombe=wordsRDD.combineByKey((v: Int) => v,(c: Int, v: Int) => c+v,(c1: Int, c2: Int) => c1 + c2).collect
combineByKey

注意:

  1. 同一个partition才会走mergeValue
  2. 不同partition才会走mergeCombiners
/*** 参考:* https://www.jianshu.com/p/d7552ea4f882* https://cloud.tencent.com/developer/ask/98711* 该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。** def combineByKey[C](* createCombiner: V => C,* mergeValue: (C, V) => C,* mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {* combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)* }** 参数的含义如下:* createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C* mergeValue:在每个分区上执行;合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,* mergeCombiners:将不同分区的结果合并;合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C* numPartitions:结果RDD分区数,默认保持原有的分区数* partitioner:分区函数,默认为HashPartitioner* mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true* serializer:序列化类,默认为null*/// 对各个科目求平均值val scores = sparkSession.sparkContext.makeRDD(List(("chinese", 88) , ("chinese", 90) , ("math", 60), ("math", 87)),2)var avgScoresRdd=scores.combineByKey((x:Int)=>(x,1),(c:(Int,Int),x:Int)=>(c._1+x,c._2+1),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))sparkSession.createDataFrame(avgScoresRdd).show()var avgScores=avgScoresRdd.map{ case (key, value) => (key, value._1 / value._2.toFloat) }//.map(x=>(x,(x._1/x._2))
cogroup & union

cogroup相当于SQL中的全外连接full outer join,返回左右RDD中的记录,关联不上的为空。可指定分区数和分区函数,返回的是key和每个RDD的迭代器

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
var rdd1 = sparkSession.sparkContext.makeRDD(Array(("A","1"),("B","2")),2)
var rdd2 = sparkSession.sparkContext.makeRDD(Array(("A","3"),("C","4")),2)
var rdd3 = sparkSession.sparkContext.makeRDD(Array(("A","5"),("C","6"),("D","8")),2)
rdd1.cogroup(rdd2,rdd3).collect().foreach(x=>println("("+x._1+","+x._2._1+","+x._2._2+x._2._3+")"))/*** output:* (B,CompactBuffer(2),CompactBuffer()CompactBuffer())* (D,CompactBuffer(),CompactBuffer()CompactBuffer(8))* (A,CompactBuffer(1),CompactBuffer(3)CompactBuffer(5))* (C,CompactBuffer(),CompactBuffer(4)CompactBuffer(6))*/
rdd1.union(rdd2).collect().foreach(x=>println("("+x._1+","+x._2)+")")
jion
// join相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
// leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
// rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
// cogroup相当于SQL中的全外连接full outer join,返回左右RDD中的记录,关联不上的为空。
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

注意:

rdd1.leftOuterJoin(rdd2)和rdd2.rightOuterJoin(rdd1)的结果是相同的,但是输出格式是不一致的,不管是left jion还是right jion,输出结果都是先输出左边的rdd对应的列,再输出右边的RDD对象的列

union & intersection & subtract

subtractByKey和基本转换操作中的subtract类似,返回在主RDD中出现,并且不在otherRDD中出现的元素,可指定输出分区数量和分区函数。

transformation
  • map/mapValues/flatMap/mapPartitions/mapPartitionsWithIndex
  • filter
  • distinct:并局部无序而整体有序返回
action
  • rdd.foreach
  • rdd.first
  • rdd.take(10): 从第一个分区的第一行数据开始取,不排序
  • rdd.takeOrdered(10):与top函数类似,但是与top函数的排序方式相反
  • rdd.top(10):默认按照降序的方式取前10个元素,可自定义排序规则
  • rdd.sortBy(x=>x._2,true):按照RDD第二列进行升序排列(false为降序)
  • rdd.countByValue():countByValue()函数与tuple元组中的(k,v)中的v 没有关系,这点要搞清楚,countByValue是针对Rdd中的每一个元素对象。
  • rdd.aggregate(1)({(x:Int,y:Int)=>x+y},{(sum1:Int,sum2:Int)=>sum1+sum2})
  • rdd. fold(1)()(x:Int,y:Int)=>x+y): aggregate简写seqOp和comOp使用同一个函数
  • saveAsTextFile,saveAsObjectFile,saveAsSequenceFile
  • rdd.takeSample
sparkSql
object aggregatesFun extends Catalogs_Tutorial{import org.apache.spark.sql.functions._questionsDataFrame.filter("id > 400 and id< 450").filter("owner_userid is not null").join(dfTags,dfQuestions.col("id").equalTo(dfTags("id"))).groupBy(dfQuestions.col("owner_userid")).agg( avg("score"),max("answer_count"))
//    .sparkSession.conf.set("retainGroupColumns",false) // 结果是否展示分组字段.show()
}
+------------+----------+-----------------+
|owner_userid|avg(score)|max(answer_count)|
+------------+----------+-----------------+
|         268|      26.0|                1|
|         136|      57.6|                9|
|         123|      20.0|                3|
+------------+----------+-----------------+
统计函数
  • 基本统计函数:avg,mean,max,min,sum
  • 高级统计函数:皮尔逊相关性(corr),协方差(cov),频繁项(freqItems),交叉表(crosstabe),行列转换(透视(pivot)),抽样(sample)分层抽样(sampleBy),词频统计(countMinSketch),布隆过滤器
  • 显示对dataFrame的统计结果:describe,包含标准差(stddev)和avg,max,min,count
手写wordCount
object LocalWorldCount {def main(args: Array[String]): Unit = {val conf=new SparkConf()conf.setAppName("my first spark local App")conf.setMaster("local")val sc=new SparkContext(conf)val lines=sc.textFile("file:\\E:\\data\\worldCount.txt")val words=lines.flatMap(line=>line.split(" "))val pairs=words.map(word=>(word,1))val worldCount=pairs.reduceByKey(_+_)val sortedWordCount=worldCount.map(pair=>(pair._2,pair._1)).sortByKey(true).map(pair=>(pair._2,pair._1))sortedWordCount.collect.foreach(println)sc.stop()}
}
// 对应sql
lines.
算子选择

mapPartitions/reduceByKey/foreachPartition/

使用filter之后进行coalesce操作。

使用repartitionAndSortWithinPartitions替代repartition与sort类操作。

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子。官方建议,如果是需要在repartition重分区之后还要进行排序,就可以直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

转载于:https://my.oschina.net/freelili/blog/3037961

spark常用函数比较相关推荐

  1. spark 常用函数介绍(python)

    在开始之前,我先介绍一下,RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组的区别是,RDD中的 ...

  2. Spark常用函数讲解之键值RDD转换

    摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集 RDD有两种操作算子:         Tran ...

  3. ajax的常用函数,AJAX 常用函数

    AJAX 常用函数 内容精选 换一换 数据探索允许用户在使用SQL脚本发起任务时,在脚本中引入函数,以实现特定的功能.其中函数包含自定义函数和内置函数两种:自定义函数指用户通过脚本(当前暂只支持Pyt ...

  4. php内置常用函数是哪些,PHP常用函数有哪些

    PHP常用函数有哪些 导语:PHP主要通过函数实现一些功能,那PHP常用函数有哪些呢?下面就由小编为大家介绍一下,欢迎大家阅读! 数学函数 1.abs(): 求绝对值 $abs = abs(-4.2) ...

  5. Lua基本语法-书写规范以及自带常用函数

    Lua基本语法-书写规范和常用函数 本文提供全流程,中文翻译. Chinar坚持将简单的生活方式,带给世人! (拥有更好的阅读体验 -- 高分辨率用户请根据需求调整网页缩放比例) 1 String O ...

  6. SQLserver 常用函数适用方法(转载)

    SQL Server 常用函数使用方法(持续更新) 之前就想要把一些 SQL 的常用函数记录下来,不过一直没有实行...嘿嘿... 直到今天用到substring()这个函数,C# 里面这个方法起始值 ...

  7. python Pool常用函数用法总结

    在本篇内容里小编给大家整理的是一篇关于python Pool常用函数用法总结内容,有需要的朋友们可以学习下. 1.说明 apply_async(func[,args[,kwds]):使用非堵塞调用fu ...

  8. ACM——常用函数总结

    常用函数总结: 一.全排列:next_permutation 二.读写优化 三.返回容器内最大最小值 四.复制函数 五.容器删除函数 六.容器填充函数 七.查找函数 八.字符串转换整数 九.欧拉筛 十 ...

  9. e class connect.php,剖析帝国CMS核心文件e/class/connect.php中的常用函数

    帝国CMS下的e/class/connect.php文件中包含了帝国CMS常用的一些核心功能函数,加深对connect.php文件中常用函数的了解对于帝国CMS的二次开发来说非常重要. 现将帝国CMS ...

最新文章

  1. K项目轶事之被客户通报批评
  2. 【机器学习入门】深入浅出聚类算法!如何对王者英雄聚类分析,探索英雄之间的秘密...
  3. RBAC模型:表设计分析
  4. php一点按钮就下载功能源码,php实现强制文件下载方法的源码参考
  5. 漫步ASP.NET MVC的处理管线
  6. javascript进行遍历
  7. flutter图片预览_flutter - 图片预览放大滑动(photo_view)
  8. ahjesus自定义隐式转换和显示转换
  9. ti嵌入式linux设计外包,基于嵌入式Linux的PMP系统设计与实现
  10. java的dataset怎么用,C# DataSet的基本用法
  11. 开发Awesomes系列合集
  12. 用ArrayList实现简单队列和栈
  13. 拖拽化、低代码、可视化布局学习资料搜集
  14. 如何用计算机弹黑人抬棺简谱,光遇黑人抬棺竖琴简谱 数字简谱简单弹奏教学...
  15. 「玩转Python」突破封锁继续爬取百万妹子图
  16. 一些有用的英语学习资料
  17. 分数四则运算 python
  18. win10可以上网但显示无法连接到Internet
  19. MySQL Binlog 解析工具 Maxwell 详解
  20. python opencv入门 光流法(41)

热门文章

  1. 1t硬盘怎么分区最好_网友问题解答:?最简单的方法教你电脑硬盘怎么分区?
  2. 搭建JMeter+Jenkins+Ant持续化
  3. java 第9章_Java基础第9章.ppt
  4. 计算机机房双电源供电,超高效数据机房可靠性浅析——走进腾讯青浦云计算中心...
  5. kettle 表输入 显示重复_表输入插件详解
  6. linux:uabntu日常操作
  7. 论文阅读——《Robust Superpixel Tracking》
  8. 【技术综述】万字长文详解Faster RCNN源代码
  9. 【赠书】如何构建企业级的推荐系统?这本书值得一看
  10. 【图像分割应用】医学图像分割(二)——心脏分割