项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
欢迎大家star,留言,一起学习进步

aggregate算是spark中比较常用的一个函数,理解起来会比较费劲一些,现在通过几个详细的例子带大家来着重理解一下aggregate的用法。

1.先看看aggregate的函数签名

在spark的源码中,可以看到aggregate函数的签名如下:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

可以看出,这个函数是个柯里化的方法,输入参数分为了两部分:(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)

2.aggregate的用法

函数签名比较复杂,可能有的小伙伴看着就晕菜了。别捉急,我们再来看看函数前面的注释,关于此函数的用法我们就会比较清楚。

  /*** Aggregate the elements of each partition, and then the results for all the partitions, using* given combine functions and a neutral "zero value". This function can return a different result* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are* allowed to modify and return their first argument instead of creating a new U to avoid memory* allocation.** @param zeroValue the initial value for the accumulated result of each partition for the*                  `seqOp` operator, and also the initial value for the combine results from*                  different partitions for the `combOp` operator - this will typically be the*                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)* @param seqOp an operator used to accumulate results within a partition* @param combOp an associative operator used to combine results from different partitions*/

翻译过来就是:aggregate先对每个分区的元素做聚集,然后对所有分区的结果做聚集,聚集过程中,使用的是给定的聚集函数以及初始值"zero value"。这个函数能返回一个与原始RDD不同的类型U,因此,需要一个合并RDD类型T到结果类型U的函数,还需要一个合并类型U的函数。这两个函数都可以修改和返回他们的第一个参数,而不是重新新建一个U类型的参数以避免重新分配内存。
参数zeroValue:seqOp运算符的每个分区的累积结果的初始值以及combOp运算符的不同分区的组合结果的初始值 - 这通常将是初始元素(例如“Nil”表的列表 连接或“0”表示求和)
参数seqOp: 每个分区累积结果的聚集函数。
参数combOp: 一个关联运算符用于组合不同分区的结果

3.求平均值

看来了上面的原理介绍,接下来我们看干货。
首先可以看网上最多的一个例子:

val list = List(1,2,3,4,5,6,7,8,9)
val (mul, sum, count) = sc.parallelize(list, 2).aggregate((1, 0, 0))((acc, number) => (acc._1 * number, acc._2 + number, acc._3 + 1),(x, y) => (x._1 * y._1, x._2 + y._2, x._3 + y._3))(sum / count, mul)

在常见的求均值的基础上稍作了变动,sum是求和,count是累积元素的个数,mul是求各元素的乘积。
解释一下具体过程:
1.初始值是(1, 0 ,0)
2.number是函数中的T,也就是List中的元素,此时类型为Int。而acc的类型为(Int, Int, Int)。acc._1 * num是各元素相乘(初始值为1),acc._2 + number为各元素相加。
3.sum / count为计算平均数。

4.另外的例子

为了加深理解,看另外一个的例子。

     val raw = List("a", "b", "d", "f", "g", "h", "o", "q", "x", "y")val (biggerthanf, lessthanf) = sc.parallelize(raw, 1).aggregate((0, 0))((cc, str) => {var biggerf = cc._1var lessf = cc._2if (str.compareTo("f") >= 0) biggerf = cc._1 + 1else if(str.compareTo("f") < 0) lessf = cc._2 + 1(biggerf, lessf)},(x, y) => (x._1 + y._1, x._2 + y._2))

这个例子中,我们想做的就是统计一下在raw这个list中,比"f"大与比"f"小的元素分别有多少个。代码本身的逻辑也比较简单,就不再更多解释。

5.aggregateByKey与combineByKey的比较

aggregate是针对序列的操作,aggregateByKey则是针对k,v对的操作。顾名思义,aggregateByKey则是针对key做aggregate操作。spark中函数的原型如下:

  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope {aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)}

都是针对k,v对的操作,spark中还有一个combineByKey的操作:

  def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)}

为了看清楚两个的联系,我们再看看 aggregateByKey里面的真正实现:

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope {// Serialize the zero value to a byte array so that we can get a new clone of it on each keyval zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)val zeroArray = new Array[Byte](zeroBuffer.limit)zeroBuffer.get(zeroArray)lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))// We will clean the combiner closure later in `combineByKey`val cleanedSeqOp = self.context.clean(seqOp)combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),cleanedSeqOp, combOp, partitioner)}

从上面这段源码可以清晰看出,aggregateByKey调用的就是combineByKey方法。seqOp方法就是mergeValue,combOp方法则是mergeCombiners,cleanedSeqOp(createZero(), v)是createCombiner, 也就是传入的seqOp函数, 只不过其中一个值是传入的zeroValue而已!
因此, 当createCombiner和mergeValue函数的操作相同, aggregateByKey更为合适!

spark aggregate函数详解相关推荐

  1. Spark: sortBy和sortByKey函数详解

    在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外.在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数.sortBy是对标准的RDD进行排序,它是 ...

  2. Spark RDD 论文详解(七)讨论

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  3. Spark RDD 论文详解(三)Spark 编程接口

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  4. Spark RDD 论文详解(二)RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  5. Spark RDD 论文详解(四)表达 RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  6. Spark RDD 论文详解(五)实现

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  7. blankcount函数python,Python pandas常用函数详解

    本文研究的主要是pandas常用函数,具体介绍如下. 1 import语句 2 文件读取 df = pd.read_csv(path='file.csv') 参数:header=None 用默认列名, ...

  8. C语言网络编程:accept函数详解

    文章目录 前言 函数描述 代码实例 如何得到客户端的IP 和 端口号 前言 当使用tcp服务器使用socket创建通信文件描述符,bind绑定了文件描述符,服务器ip和端口号,listen将服务器端的 ...

  9. 【FFmpeg】函数详解(三)

    FFmpeg函数详解 14.av_write_frame 15.av_interleaved_write_frame 16.av_write_trailer 17.avio_close 18.av_i ...

  10. 【FFmpeg】函数详解(二)

    FFmpeg函数详解 9.av_dump_format 10.avio_open 11.avformat_write_header 12.avcodec_send_frame 13.avcodec_r ...

最新文章

  1. UI设计师收好!哪些会说故事的插画素材!
  2. 调用门、堆栈切换与调用过程返回
  3. Python求解进制问题(阿里巴巴2015笔试题)
  4. 女子监狱第一季/全集Orange Is the New Black迅雷下载
  5. android 脚本录制工具,安卓自动化脚本录制工具
  6. docx 2003 的补丁
  7. ipv4 pxe 联想start_PC开机出现Start pxe over ipv4解决办法 PC重启后显示start pxe over IPv4...
  8. 桌面误删文件恢复用什么软件?
  9. Not Adjacent Matrix、Same Differences、Arranging The Sheep
  10. c语言图书管理系统用什么软件,编写c语言的软件 纯C语言编写图书管理系统.doc...
  11. 日置HIOKI PW3198电能质量分析仪
  12. python_绘制玫瑰图_南丁格尔图
  13. 2019年10月19日星期六
  14. 蓝牙协议栈认证:什么是蓝牙Core Layer的BQB认证要求
  15. 6.4 深度负反馈放大电路放大倍数的分析
  16. 2020年日历电子版(打印版)_2020年日历表(竖版-A4纸打印版)
  17. 各大AI研究院共35场NLP算法岗面经奉上
  18. 【BZOJ3811】玛里苟斯(线性基)
  19. 网络研讨室_Excel问题,修复和网络研讨会
  20. 为什么总有iPhone游戏那么好玩呢? iSlash,Slice It

热门文章

  1. [数据结构]链表的实现在PHP中
  2. php excel数据导出
  3. Linux C编程与Shell编程在开发实用工具方面的相同点总结
  4. 用 CSS 隐藏页面元素
  5. virtualenv 安装及使用[转]
  6. internet与Internet的区别
  7. ArrayList非线程安全
  8. 《深入剖析Tomcat》一3.3 小结
  9. 0918类对象重载,作业5
  10. init进程 解析Android启动脚本init.rc 修改它使不启动android init.rc中启动一个sh文件...