Spark算子:RDD行动Action操作–aggregate、fold、lookup;reduce/fold/aggregate区别
关键字:Spark算子、Spark函数、Spark RDD行动Action、aggregate、fold、lookup
aggregate
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U
var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{
(partIdx,iter) => {
var part_map = scala.collection.mutable.Map[String,List[Int]]()
while(iter.hasNext){
var part_name = "part_" + partIdx;
var elem = iter.next()
if(part_map.contains(part_name)) {
var elems = part_map(part_name)
elems ::= elem
part_map(part_name) = elems
} else {
part_map(part_name) = List[Int]{elem}
}
}
part_map.iterator}
}.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
##第一个分区中包含5,4,3,2,1
##第二个分区中包含10,9,8,7,6
var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.aggregate(1)({(x : Int,y : Int) => x + y}, {(a : Int,b : Int) => a + b})
res0: Int = 58
结果为什么是58,看下面的计算过程:
##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1
##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1
##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
再比如:
scala> rdd1.aggregate(2)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a * b}
| )
res18: Int = 1428
##这次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。
fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
scala> rdd1.fold(1)(
| (x,y) => x + y
| )
res19: Int = 58##结果同上面使用aggregate的第一个例子一样,即:
scala> rdd1.aggregate(1)(
| {(x,y) => x + y},
| {(a,b) => a + b}
| )
res20: Int = 58var rdd1 = sc.makeRDD(1 to 10,4)
rdd1.fold(3)(_+_)
res4: Int = 70[(3+1+2+3)+(3+4+5+6)+(3+7+8)+(3+9)]+3 = 55+(4+1)*3 = 75 ;4+1是4个分区计算4次,每次加3;最后多个分区结果合并需要再加一次3var rdd1 = sc.makeRDD(1 to 10,3)
rdd1.fold(5)(_+_)
res5: Int = 75 = 55+(3+1)*5
lookup
def lookup(key: K): Seq[V]
lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)
转自:http://lxw1234.com/archives/2015/07/394.htm
-----
reduce/fold/aggregate 三个方法操作都是对RDD进行的聚合操作。
foldByKey与aggregateByKey,fold与aggregate用法相近,作用相似!
foldByKey是aggregateByKey的简化,fold是aggregate的简化。
1、reduce()与fold()方法是对同种元素类型数据的RDD进行操作,即必须同构。其返回值返回一个同样类型的新元素。
val nums = Array(1,2,3,4,5,6,7,8,9)
val numsRdd = sc.parallelize(nums,3)
val reduce = numsRdd.reduce((a,b) => a+b)
reduce: Int = 45
2、fold()与reduce()类似,接收与reduce接收的函数签名相同的函数,另外再加上一个初始值作为第一次调用的结果。(例如,加法初始值应为0,乘法初始值应为1)
val rdd = sc.makeRDD(List("a","a","b","b"),4)
val res = rdd.fold("")(_+_) //结果不固定
res: String = baab
或者
res: String = abba
具体案例请参考: Spark算子[10]:foldByKey、fold 源码实例详解
3、aggregate() 方法可以对两个不同类型的元素进行聚合,即支持异构。
它先聚合每一个分区里的元素,然后将所有结果返回回来,再用一个给定的conbine方法以及给定的初始值zero value进行聚合。
Spark算子:RDD行动Action操作–aggregate、fold、lookup;reduce/fold/aggregate区别相关推荐
- spark学习-Spark算子Transformations和Action使用大全(Transformations章(一))
spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...
- spark学习-Spark算子Transformations和Action使用大全(Action章)
spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...
- spark学习-Spark算子Transformations和Action使用大全(Transformations章(二))
spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...
- spark算子大全glom_2小时入门Spark之RDD编程
公众号后台回复关键字:pyspark,获取本项目github地址. 本节将介绍RDD数据结构的常用函数.包括如下内容: 创建RDD 常用Action操作 常用Transformation操作 常用Pa ...
- 五-中, Spark 算子 吐血总结(转化+行动算子共三十七个)
文章目录 五-中, Spark 算子吐血总结 5.1.4.3 RDD 转换算子(Transformation) 1. Value类型 1.1 `map` 1.2 `mapPartitions` 1.3 ...
- 理解Spark中RDD(Resilient Distributed Dataset)
文章目录 1 RDD 基础 1.1 分区 1.2 不可变 1.3 并行执行 2 RDD 结构 2.1 SparkContext.SparkConf 2.2 Partitioner 2.3 Depend ...
- spark常用RDD算子 汇总(java和scala版本)
github: https://github.com/zhaikaishun/spark_tutorial spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...
- spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍
参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍 spark常见的RDD 1. 函数概览 2. 常见的Transformations 操 ...
- Spark的RDD行动算子
目录 基本概念 算子介绍 1. reduce 2. collect 3. count 4. first 5. take 6. takeOrdered 案例实操1-6 7. aggregate 8. f ...
- Spark:RDD编程总结(概述、算子、分区、共享变量)
目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...
最新文章
- Vbox在Linux 5上安装Oracle 11gR2 RAC
- ubuntu 用命令行设置chrome的proxy
- Dom Tree Node, Render Tree Node 结构
- 3月12日云栖精选夜读 | 安全多方计算新突破!阿里首次实现“公开可验证” 的安全方案...
- nacos 配置中心和注册中心依赖后报错,提示 org.apache.http.impl.client.HttpClientBuilder 这个类找不到
- 网吧管理十大漏洞嚗光
- kruskal 重构树(讲解 + 例题)
- 转!!ftp的主动模式(port)与被动模式(PASV)
- 数据库系统实训——实验七——触发器
- Express-static
- Acrobat Pro DC 教程,如何填写并签署 PDF 表格?
- matlab下载安装教程
- iperf详细使用方法
- mysql按月创建分表_MySQL之按月拆分主表并按月分表写入数据提高数据查询速度...
- 37 岁老码农找工作,现身说法...
- 找出直系亲属 牛客网
- png图片格式转换器_如何将图像转换为PNG格式
- slideup_jQuery slideUp,slideDown,slideToggle
- html水平线段hr标记详解,HTML标记【水平分隔线hr标记的使用】!
- 计算机组成原理学习之路(一)——数据表示
热门文章
- 《电子元器件的可靠性》——第3章可靠性试验
- 最新 | 诺奖得主涉嫌论文造假
- python代码编程教学无限循环_代码陷入无限循环
- DRAM发展年历——电容方向
- Python三维绘图——Matplotlib
- lbp2900打印机linux驱动下载,佳能LBP2900+驱动下载-佳能LBP2900+打印机驱动下载 V3.30官方版--pc6下载站...
- 编译原理第六七章总结
- android谷歌地图_Android Google地图:添加天气数据
- uc浏览器怎么播放html5,uc视频社区 手机UC浏览器不能看视频了怎么办?
- Facebook登陆错误Invalid Scopes