关键字:Spark算子、Spark函数、Spark RDD行动Action、aggregate、fold、lookup

aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{
(partIdx,iter) => {
var part_map = scala.collection.mutable.Map[String,List[Int]]()
while(iter.hasNext){
var part_name = "part_" + partIdx;
var elem = iter.next()
if(part_map.contains(part_name)) {
var elems = part_map(part_name)
elems ::= elem
part_map(part_name) = elems
} else {
part_map(part_name) = List[Int]{elem}
}
}
part_map.iterator}
}.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))

##第一个分区中包含5,4,3,2,1

##第二个分区中包含10,9,8,7,6

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.aggregate(1)({(x : Int,y : Int) => x + y}, {(a : Int,b : Int) => a + b})
res0: Int = 58

结果为什么是58,看下面的计算过程:

##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1

##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16

## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41

##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1

##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

再比如:

scala> rdd1.aggregate(2)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a * b}
| )
res18: Int = 1428

##这次zeroValue=2

##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17

##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42

##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428

因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。

fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

scala> rdd1.fold(1)(
| (x,y) => x + y
| )
res19: Int = 58##结果同上面使用aggregate的第一个例子一样,即:
scala> rdd1.aggregate(1)(
| {(x,y) => x + y},
| {(a,b) => a + b}
| )
res20: Int = 58var rdd1 = sc.makeRDD(1 to 10,4)
rdd1.fold(3)(_+_)
res4: Int = 70[(3+1+2+3)+(3+4+5+6)+(3+7+8)+(3+9)]+3 = 55+(4+1)*3 = 75 ;4+1是4个分区计算4次,每次加3;最后多个分区结果合并需要再加一次3var rdd1 = sc.makeRDD(1 to 10,3)
rdd1.fold(5)(_+_)
res5: Int = 75 = 55+(3+1)*5

lookup

def lookup(key: K): Seq[V]

lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)
转自:http://lxw1234.com/archives/2015/07/394.htm

-----

reduce/fold/aggregate 三个方法操作都是对RDD进行的聚合操作。

foldByKey与aggregateByKey,fold与aggregate用法相近,作用相似!
foldByKey是aggregateByKey的简化,fold是aggregate的简化。

1、reduce()与fold()方法是对同种元素类型数据的RDD进行操作,即必须同构。其返回值返回一个同样类型的新元素。

val nums = Array(1,2,3,4,5,6,7,8,9)
val numsRdd = sc.parallelize(nums,3)
val reduce = numsRdd.reduce((a,b) => a+b)
reduce: Int = 45

2、fold()与reduce()类似,接收与reduce接收的函数签名相同的函数,另外再加上一个初始值作为第一次调用的结果。(例如,加法初始值应为0,乘法初始值应为1)

val rdd = sc.makeRDD(List("a","a","b","b"),4)
val res = rdd.fold("")(_+_) //结果不固定
res: String = baab
或者
res: String = abba

具体案例请参考: Spark算子[10]:foldByKey、fold 源码实例详解

3、aggregate() 方法可以对两个不同类型的元素进行聚合,即支持异构。
它先聚合每一个分区里的元素,然后将所有结果返回回来,再用一个给定的conbine方法以及给定的初始值zero value进行聚合。

Spark算子:RDD行动Action操作–aggregate、fold、lookup;reduce/fold/aggregate区别相关推荐

  1. spark学习-Spark算子Transformations和Action使用大全(Transformations章(一))

    spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...

  2. spark学习-Spark算子Transformations和Action使用大全(Action章)

    spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...

  3. spark学习-Spark算子Transformations和Action使用大全(Transformations章(二))

    spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...

  4. spark算子大全glom_2小时入门Spark之RDD编程

    公众号后台回复关键字:pyspark,获取本项目github地址. 本节将介绍RDD数据结构的常用函数.包括如下内容: 创建RDD 常用Action操作 常用Transformation操作 常用Pa ...

  5. 五-中, Spark 算子 吐血总结(转化+行动算子共三十七个)

    文章目录 五-中, Spark 算子吐血总结 5.1.4.3 RDD 转换算子(Transformation) 1. Value类型 1.1 `map` 1.2 `mapPartitions` 1.3 ...

  6. 理解Spark中RDD(Resilient Distributed Dataset)

    文章目录 1 RDD 基础 1.1 分区 1.2 不可变 1.3 并行执行 2 RDD 结构 2.1 SparkContext.SparkConf 2.2 Partitioner 2.3 Depend ...

  7. spark常用RDD算子 汇总(java和scala版本)

    github: https://github.com/zhaikaishun/spark_tutorial  spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...

  8. spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍

    参考文章:spark的RDD中的action(执行)和transformation(转换)两种操作中常见函数介绍 spark常见的RDD 1. 函数概览 2. 常见的Transformations 操 ...

  9. Spark的RDD行动算子

    目录 基本概念 算子介绍 1. reduce 2. collect 3. count 4. first 5. take 6. takeOrdered 案例实操1-6 7. aggregate 8. f ...

  10. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

最新文章

  1. Vbox在Linux 5上安装Oracle 11gR2 RAC
  2. ubuntu 用命令行设置chrome的proxy
  3. Dom Tree Node, Render Tree Node 结构
  4. 3月12日云栖精选夜读 | 安全多方计算新突破!阿里首次实现“公开可验证” 的安全方案...
  5. nacos 配置中心和注册中心依赖后报错,提示 org.apache.http.impl.client.HttpClientBuilder 这个类找不到
  6. 网吧管理十大漏洞嚗光
  7. kruskal 重构树(讲解 + 例题)
  8. 转!!ftp的主动模式(port)与被动模式(PASV)
  9. 数据库系统实训——实验七——触发器
  10. Express-static
  11. Acrobat Pro DC 教程,如何填写并签署 PDF 表格?
  12. matlab下载安装教程
  13. iperf详细使用方法
  14. mysql按月创建分表_MySQL之按月拆分主表并按月分表写入数据提高数据查询速度...
  15. 37 岁老码农找工作,现身说法...
  16. 找出直系亲属 牛客网
  17. png图片格式转换器_如何将图像转换为PNG格式
  18. slideup_jQuery slideUp,slideDown,slideToggle
  19. html水平线段hr标记详解,HTML标记【水平分隔线hr标记的使用】!
  20. 计算机组成原理学习之路(一)——数据表示

热门文章

  1. 《电子元器件的可靠性》——第3章可靠性试验
  2. 最新 | 诺奖得主涉嫌论文造假
  3. python代码编程教学无限循环_代码陷入无限循环
  4. DRAM发展年历——电容方向
  5. Python三维绘图——Matplotlib
  6. lbp2900打印机linux驱动下载,佳能LBP2900+驱动下载-佳能LBP2900+打印机驱动下载 V3.30官方版--pc6下载站...
  7. 编译原理第六七章总结
  8. android谷歌地图_Android Google地图:添加天气数据
  9. uc浏览器怎么播放html5,uc视频社区 手机UC浏览器不能看视频了怎么办?
  10. Facebook登陆错误Invalid Scopes