定义

定义可参考RDD的API

aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
zeroValue
the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
seqOp
an operator used to accumulate results within a partition
combOp
an associative operator used to combine results from different partitions

实验1-熟悉使用

api讲的比较清楚了,该函数用来聚集每个分区的元素,并用合并函数和zeroValue来聚集分区结果。并给予我们两个函数,seqOp和CombOp

实验程序

打开spark-shell,我们执行实验1(当复制并粘贴以下代码实验时请将注释去掉

//该函数用来将每个分区的index展示出来
def myfunc[T](index:Int,iter:Iterator[T]):Iterator[(Int,T)]={
var res = List[(Int,T)]()
for(x<-iter)
res.::=(index,x)
res.iterator
}
val data = sc.parallelize(1 to 10,3)
data.mapPartitionsWithIndex(myfunc).collect
data.aggregate(0)((a,b)=>if(a>b) a else b ,_+_)

实验结果

结果分析

实验2-zeroValue

api讲解如下:zeroValue值为seqOp函数的初始值,同时也是combOp函数的初始值。

实验程序

打开spark-shell,我们执行实验2(当复制并粘贴以下代码实验时请将注释去掉

//seqOp函数
def seqOp(arg1:Int,arg2:Int):Int={
var res:Int=arg2
if(arg1>arg2)
res=arg1
println("seqOp:"+arg1+","+arg2+"=>"+res)
res
}
//combOp函数
def combOp(arg1:Int,arg2:Int):Int={
println("combOp:"+arg1+","+arg2+"=>"+(arg1+arg2))
arg1+arg2
}
//将每个分区index显示出来
def myfunc[T](index:Int,iter:Iterator[T]):Iterator[(Int,T)]={
var res = List[(Int,T)]()
for(x<-iter)
res.::=(index,x)
res.iterator
}
val data = sc.parallelize(1 to 10,3)
data.mapPartitionsWithIndex(myfunc).collect
data.aggregate(11)(seqOp,combOp)

实验结果

结果分析

当然,该实验的zeroValue取值比较极端,大家可换成5或者6试一试


参考博客:
[1]:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#aggregate
[2]:http://www.iteblog.com/archives/1268

RDD之aggregate相关推荐

  1. Spark RDD使用详解5--Action算子

    本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行.  根据Action算子的输出空间将Action算子进行分类:无输出. HDFS. ...

  2. Spark 键值对RDD操作

    https://www.cnblogs.com/yongjian/p/6425772.html 概述 键值对RDD是Spark操作中最常用的RDD,它是很多程序的构成要素,因为他们提供了并行操作各个键 ...

  3. rdd转换成java数据结构_Spark RDD转换成其他数据结构

    在Spark推荐系统编程中,一般都是通过文件加载成RDD: //在这里默认 (userId, itemId, preference) val fields = sparkContext.textFil ...

  4. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  5. Spark算子总结版

    Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理. Transformat ...

  6. kali视频学习笔记

    DAY1 系统安装 1. 用u盘烧录KALI镜像,不含live开头,含amd64,4G 2. 用u盘启动安装图形界面,选简单中文-汉语,默认KFCE,全工具 3. 改密码,sudo passwd ro ...

  7. Spark的算子的分类

    从大方向来说Spark 算子大致可以分为以下两类: Transformation 变换/转换算子这种变换并不触发提交作业完成作业中间过程处理.Transformation 操作是延迟计算的也就是说从一 ...

  8. 由spark.sql.shuffle.partitions混洗分区浅谈下spark的分区

    背景 spark的分区无处不在,但是编程的时候又很少直接设置,本文想通过一个例子说明从spark读取数据到内存中后的分区数,然后经过shuffle操作后的分区数,最后再通过主动设置repartitio ...

  9. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

最新文章

  1. mac电脑 显示隐藏文件 取消显示隐藏文件
  2. 动态规划之两个字符串的最大子序列
  3. HP LasterJet 3050驱动安装技巧,面向win7,8,8.1,10
  4. Python多线程编程的一个掉进去不太容易爬出来的坑
  5. 小组文化——洗洗睡了的故事
  6. java编程两个超长正整数相减_【每日编程237期】数字分类
  7. java vector编程_java中Vector实现方法和功能还有例子详细讲解一下!谢谢!
  8. tensorflow pb ckpt pbtxt
  9. 曾仕强主讲:易经的奥秘(全文讲义)
  10. 相关性,互相关,自相关,相关系数
  11. PS魔棒工具的使用方法
  12. python处理千万级数据_python实现千万级+点云数据三维坐标球面坐标互转
  13. SAN海量存储解决方案
  14. Java 编写程序,求出几何形状(长方形、正方形、圆形)的周长和面积。
  15. 会议OA项目之我的会议(会议排座送审)
  16. 商城购物系统【用户登录注册,购物页面,购物车页面,订单页面】
  17. 庆祝EDA夺冠之余,我们来讨论讨论程序员一般想要new一个什么样的对象
  18. 【时间序列】时间序列曲线平滑+预测(LSTM)
  19. 单片机断电记忆方法C语言,怎样使单片机程序断电保留上次的数据?
  20. 怎么修改打印机服务器权限,打印机管理_怎样设置打印机管理权限

热门文章

  1. 虚拟化运维多云监控—云安
  2. CSS3 属性样式总结记录(图文)
  3. 计算机二级mysql_全国计算机二级MySQL试题(总)
  4. 崔希凡JavaWeb视频教程_day20jdbc分页-成品源码资料
  5. 攻防世界PWN之boi题解
  6. 计算机系统引导顺序各选项,双系统启动顺序怎么设置和修复 双系统启动顺序设置及修复方法【图文】...
  7. Java poi之word文本替换
  8. VMware Horizon 7的GPU方案【保姆版】
  9. 于学军理事长莅临2023玉米深加工产业展,行业翘楚齐聚
  10. luogu P1646 happiness