spark常用函数比较
2019独角兽企业重金招聘Python工程师标准>>>
算法分类:转换(transformation)和执行(action)
查看算子使用demo
coalesce & repartition & partitionBy
reparation是coalesce的特殊情况 ,reparation会将coalesce中的shuffle参数设置为true,会使用HashPartitioner
重新混洗分区,如果原有分区数据不均匀可以用reparation来重新混洗分区,使数据均匀分布,重新混洗过的分区和新的分区时宽依赖的关系
coalesce shuffle参数为false的情况 不会重新混洗分区,它是合并分区,比如把原来1000个分区合并成100个,父rdd和子rdd是窄依赖,
coalesce当shuffle参数设置为false时,如果设置的新partition数量大于之前的,则按照之前的分区数量重新分区。如果shuffle参数设置为true则效果和repartition一致。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)
}
partitionBy需要指定分区函数和分区数量
var rdd2=rdd.partitionBy(new HashPartitioner(2))
range
// range函数是闭开区间[)
range(1,4,1)
//输出:1,2,3
// to 函数是闭闭区间[]
sc.makeRDD(1 to 5,2)
// 输出:1,2,3,4,5
zip & zipWithIndex & zipWithUniqueId
zip
1.如果两个RDD分区数不同,则抛出异常:Can’t zip RDDs with unequal numbers of partitions
2.如果两个RDD的元素个数不同,则抛出异常:Can only zip RDDs with same number of elements in each partition
zipPartitions
zipPartitions函数将多个RDD按照partition组合成为新的RDD。
该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。
var rdd1=sparkSession.range(1,4,1).rdd
var rdd2=sparkSession.range(4,7,1).rdd
var rdd3=sparkSession.range(7,10,1).rdd
// zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及每个partition的元素数量都相同,否则会抛出异常。
var rdd5=rdd1 zip rdd2 zip rdd3
/*** +-----+---+* | _1| _2|* +-----+---+* |[1,4]| 7|* |[2,5]| 8|* |[3,6]| 9|* +-----+---+*/
// 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
var rdd6=rdd1.zipWithIndex/*** +---+---+* | _1| _2|* +---+---+* | 1| 0|* | 2| 1|* | 3| 2|* +---+---+*/
var rdd7=sparkSession.range(1,10,2).rdd
// 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
// 每个分区中第一个元素的唯一ID值为:该分区索引号
// 每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
var rdd8=rdd7.zipWithUniqueId()/*** +---+---+* | _1| _2|* +---+---+* | 1| 0|* | 3| 2|* | 5| 1|* | 7| 3|* | 9| 5|* +---+---+*/
mapPartitionsWithIndex
var rdd1 = sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2)// 函数作用同mapPartitions相同,不过提供了两个参数,第一个参数为分区的索引var rdd2 = rdd1.mapPartitionsWithIndex {(partIdx, iter) => {var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()while (iter.hasNext) {var part_name = "part_" + partIdxvar elem = iter.next()if (part_map.contains(part_name)) {var elems = part_map(part_name)elems ::= elempart_map(part_name)=elems} else {part_map(part_name) = List[(Int, String)] {elem}}}part_map.iterator}}.collect()/*** +------+--------------+* | _1| _2|* +------+--------------+* |part_0|[[2,B], [1,A]]|* |part_1|[[4,D], [3,C]]|* +------+--------------+*/
map & mapValues
var rdd1=sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2)
// 对[K,V]整体操作
var rdd3=rdd1.map(_+"_").foreach(println(_))
/*** (1,A)_* (3,C)_* (2,B)_* (4,D)_*/
var rdd2=rdd1.mapValues(_+"_")/*** +---+---+* | _1| _2|* +---+---+* | 1| A_|* | 2| B_|* | 3| C_|* | 4| D_|* +---+---+*/// 键值对转换rdd1.map(_.swap).foreach(println(_))/*** (C,3)* (D,4)* (A,1)* (B,2)*/// 使用map实现mapValues rdd1.map(x=>(x._1,x._2+"_")).foreach(println(_))/*** (1,A_)* (2,B_)* (3,C_)* (4,D_)*/
flodByKey
val rdd4=sparkSession.sparkContext.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
val rdd5=rdd4.foldByKey(2)(_+_).collect()/*** +---+---+* | _1| _2|* +---+---+* | B| 5|* | A| 4|* | C| 3|* +---+---+*/
groupByKey & reduceByKey & aggregateByKey & flodByKey
reduceByKey现在map过程中先进行聚合,再到reduce端聚合,减少数据太大带来的压力,减小RPC过程中的传输压力。groupByKey是直接在reduce端进行聚合的,所以效率比reduceByKey低。
foldByKey和reduceByKey的功能是相似的,都是在map端先进行聚合,再到reduce聚合。不同的是flodByKey需要传入一个参数。该参数是计算的初始值。
groupByKey是对每个key进行合并操作,但只生成一个sequence,groupByKey本身不能自定义操作函数。spark只能先将所有的键值对都移动,这样的后果是集群节点之间的开销很大,导致传输延时,详情。
val words = Array("one", "two", "two", "three", "three", "three")
val wordsRDD = sparkSession.sparkContext.parallelize(words).map(word => (word, 1))
val wordsCountWithGroup = wordsRDD.groupByKey().map(w => (w._1, w._2.sum)).collect()
val wordsCountWithReduce = wordsRDD.reduceByKey(_ + _).collect()
val wordsCountWithAggregate=wordsRDD.aggregateByKey(0)((u:Int,v)=>u+v,_+_).foreach(println)// aggregate简写seqOp和comOp使用同一个函数
val wordsCountWithFlod=wordsRDD.flodByKey(0)(_+_)
val wordsCountWithCombe=wordsRDD.combineByKey((v: Int) => v,(c: Int, v: Int) => c+v,(c1: Int, c2: Int) => c1 + c2).collect
combineByKey
注意:
- 同一个partition才会走mergeValue
- 不同partition才会走mergeCombiners
/*** 参考:* https://www.jianshu.com/p/d7552ea4f882* https://cloud.tencent.com/developer/ask/98711* 该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。** def combineByKey[C](* createCombiner: V => C,* mergeValue: (C, V) => C,* mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {* combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)* }** 参数的含义如下:* createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C* mergeValue:在每个分区上执行;合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,* mergeCombiners:将不同分区的结果合并;合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C* numPartitions:结果RDD分区数,默认保持原有的分区数* partitioner:分区函数,默认为HashPartitioner* mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true* serializer:序列化类,默认为null*/// 对各个科目求平均值val scores = sparkSession.sparkContext.makeRDD(List(("chinese", 88) , ("chinese", 90) , ("math", 60), ("math", 87)),2)var avgScoresRdd=scores.combineByKey((x:Int)=>(x,1),(c:(Int,Int),x:Int)=>(c._1+x,c._2+1),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))sparkSession.createDataFrame(avgScoresRdd).show()var avgScores=avgScoresRdd.map{ case (key, value) => (key, value._1 / value._2.toFloat) }//.map(x=>(x,(x._1/x._2))
cogroup & union
cogroup相当于SQL中的全外连接full outer join,返回左右RDD中的记录,关联不上的为空。可指定分区数和分区函数,返回的是key和每个RDD的迭代器
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
var rdd1 = sparkSession.sparkContext.makeRDD(Array(("A","1"),("B","2")),2)
var rdd2 = sparkSession.sparkContext.makeRDD(Array(("A","3"),("C","4")),2)
var rdd3 = sparkSession.sparkContext.makeRDD(Array(("A","5"),("C","6"),("D","8")),2)
rdd1.cogroup(rdd2,rdd3).collect().foreach(x=>println("("+x._1+","+x._2._1+","+x._2._2+x._2._3+")"))/*** output:* (B,CompactBuffer(2),CompactBuffer()CompactBuffer())* (D,CompactBuffer(),CompactBuffer()CompactBuffer(8))* (A,CompactBuffer(1),CompactBuffer(3)CompactBuffer(5))* (C,CompactBuffer(),CompactBuffer(4)CompactBuffer(6))*/
rdd1.union(rdd2).collect().foreach(x=>println("("+x._1+","+x._2)+")")
jion
// join相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
// leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
// rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
// cogroup相当于SQL中的全外连接full outer join,返回左右RDD中的记录,关联不上的为空。
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
注意:
rdd1.leftOuterJoin(rdd2)和rdd2.rightOuterJoin(rdd1)的结果是相同的,但是输出格式是不一致的,不管是left jion还是right jion,输出结果都是先输出左边的rdd对应的列,再输出右边的RDD对象的列
union & intersection & subtract
subtractByKey和基本转换操作中的subtract类似,返回在主RDD中出现,并且不在otherRDD中出现的元素,可指定输出分区数量和分区函数。
transformation
- map/mapValues/flatMap/mapPartitions/mapPartitionsWithIndex
- filter
- distinct:并局部无序而整体有序返回
action
rdd.foreach
rdd.first
rdd.take(10)
: 从第一个分区的第一行数据开始取,不排序rdd.takeOrdered(10)
:与top函数类似,但是与top函数的排序方式相反rdd.top(10)
:默认按照降序的方式取前10个元素,可自定义排序规则rdd.sortBy
(x=>x._2,true):按照RDD第二列进行升序排列(false为降序)rdd.countByValue():
countByValue()函数与tuple元组中的(k,v)中的v 没有关系,这点要搞清楚,countByValue是针对Rdd中的每一个元素对象。rdd.aggregate(1)({(x:Int,y:Int)=>x+y},{(sum1:Int,sum2:Int)=>sum1+sum2})
rdd. fold(1)()(x:Int,y:Int)=>x+y)
: aggregate简写seqOp和comOp使用同一个函数saveAsTextFile,saveAsObjectFile,saveAsSequenceFile
rdd.takeSample
sparkSql
object aggregatesFun extends Catalogs_Tutorial{import org.apache.spark.sql.functions._questionsDataFrame.filter("id > 400 and id< 450").filter("owner_userid is not null").join(dfTags,dfQuestions.col("id").equalTo(dfTags("id"))).groupBy(dfQuestions.col("owner_userid")).agg( avg("score"),max("answer_count"))
// .sparkSession.conf.set("retainGroupColumns",false) // 结果是否展示分组字段.show()
}
+------------+----------+-----------------+
|owner_userid|avg(score)|max(answer_count)|
+------------+----------+-----------------+
| 268| 26.0| 1|
| 136| 57.6| 9|
| 123| 20.0| 3|
+------------+----------+-----------------+
统计函数
- 基本统计函数:avg,mean,max,min,sum
- 高级统计函数:皮尔逊相关性(corr),协方差(cov),频繁项(freqItems),交叉表(crosstabe),行列转换(透视(pivot)),抽样(sample)分层抽样(sampleBy),词频统计(countMinSketch),布隆过滤器
- 显示对dataFrame的统计结果:describe,包含标准差(stddev)和avg,max,min,count
手写wordCount
object LocalWorldCount {def main(args: Array[String]): Unit = {val conf=new SparkConf()conf.setAppName("my first spark local App")conf.setMaster("local")val sc=new SparkContext(conf)val lines=sc.textFile("file:\\E:\\data\\worldCount.txt")val words=lines.flatMap(line=>line.split(" "))val pairs=words.map(word=>(word,1))val worldCount=pairs.reduceByKey(_+_)val sortedWordCount=worldCount.map(pair=>(pair._2,pair._1)).sortByKey(true).map(pair=>(pair._2,pair._1))sortedWordCount.collect.foreach(println)sc.stop()}
}
// 对应sql
lines.
算子选择
mapPartitions/reduceByKey/foreachPartition/
使用filter之后进行coalesce操作。
使用repartitionAndSortWithinPartitions替代repartition与sort类操作。
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子。官方建议,如果是需要在repartition重分区之后还要进行排序,就可以直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
转载于:https://my.oschina.net/freelili/blog/3037961
spark常用函数比较相关推荐
- spark 常用函数介绍(python)
在开始之前,我先介绍一下,RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组的区别是,RDD中的 ...
- Spark常用函数讲解之键值RDD转换
摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集 RDD有两种操作算子: Tran ...
- ajax的常用函数,AJAX 常用函数
AJAX 常用函数 内容精选 换一换 数据探索允许用户在使用SQL脚本发起任务时,在脚本中引入函数,以实现特定的功能.其中函数包含自定义函数和内置函数两种:自定义函数指用户通过脚本(当前暂只支持Pyt ...
- php内置常用函数是哪些,PHP常用函数有哪些
PHP常用函数有哪些 导语:PHP主要通过函数实现一些功能,那PHP常用函数有哪些呢?下面就由小编为大家介绍一下,欢迎大家阅读! 数学函数 1.abs(): 求绝对值 $abs = abs(-4.2) ...
- Lua基本语法-书写规范以及自带常用函数
Lua基本语法-书写规范和常用函数 本文提供全流程,中文翻译. Chinar坚持将简单的生活方式,带给世人! (拥有更好的阅读体验 -- 高分辨率用户请根据需求调整网页缩放比例) 1 String O ...
- SQLserver 常用函数适用方法(转载)
SQL Server 常用函数使用方法(持续更新) 之前就想要把一些 SQL 的常用函数记录下来,不过一直没有实行...嘿嘿... 直到今天用到substring()这个函数,C# 里面这个方法起始值 ...
- python Pool常用函数用法总结
在本篇内容里小编给大家整理的是一篇关于python Pool常用函数用法总结内容,有需要的朋友们可以学习下. 1.说明 apply_async(func[,args[,kwds]):使用非堵塞调用fu ...
- ACM——常用函数总结
常用函数总结: 一.全排列:next_permutation 二.读写优化 三.返回容器内最大最小值 四.复制函数 五.容器删除函数 六.容器填充函数 七.查找函数 八.字符串转换整数 九.欧拉筛 十 ...
- e class connect.php,剖析帝国CMS核心文件e/class/connect.php中的常用函数
帝国CMS下的e/class/connect.php文件中包含了帝国CMS常用的一些核心功能函数,加深对connect.php文件中常用函数的了解对于帝国CMS的二次开发来说非常重要. 现将帝国CMS ...
最新文章
- K项目轶事之被客户通报批评
- 【机器学习入门】深入浅出聚类算法!如何对王者英雄聚类分析,探索英雄之间的秘密...
- RBAC模型:表设计分析
- php一点按钮就下载功能源码,php实现强制文件下载方法的源码参考
- 漫步ASP.NET MVC的处理管线
- javascript进行遍历
- flutter图片预览_flutter - 图片预览放大滑动(photo_view)
- ahjesus自定义隐式转换和显示转换
- ti嵌入式linux设计外包,基于嵌入式Linux的PMP系统设计与实现
- java的dataset怎么用,C# DataSet的基本用法
- 开发Awesomes系列合集
- 用ArrayList实现简单队列和栈
- 拖拽化、低代码、可视化布局学习资料搜集
- 如何用计算机弹黑人抬棺简谱,光遇黑人抬棺竖琴简谱 数字简谱简单弹奏教学...
- 「玩转Python」突破封锁继续爬取百万妹子图
- 一些有用的英语学习资料
- 分数四则运算 python
- win10可以上网但显示无法连接到Internet
- MySQL Binlog 解析工具 Maxwell 详解
- python opencv入门 光流法(41)
热门文章
- 1t硬盘怎么分区最好_网友问题解答:?最简单的方法教你电脑硬盘怎么分区?
- 搭建JMeter+Jenkins+Ant持续化
- java 第9章_Java基础第9章.ppt
- 计算机机房双电源供电,超高效数据机房可靠性浅析——走进腾讯青浦云计算中心...
- kettle 表输入 显示重复_表输入插件详解
- linux:uabntu日常操作
- 论文阅读——《Robust Superpixel Tracking》
- 【技术综述】万字长文详解Faster RCNN源代码
- 【赠书】如何构建企业级的推荐系统?这本书值得一看
- 【图像分割应用】医学图像分割(二)——心脏分割