Spark Core Aggregator
Spark Core Aggregator
本文要介绍的是Spark Core中的Aggregator这个类。这个类的用处非常大,为什么这么说呢?我们都知道Spark支持传统的MapReduce模型,并基于这种模型提供了比Hadoop更多更高层次的计算接口。比如Spark Core PairRDD中非常常用的:
- reduceByKey 提供聚合函数,将k-v对集合将相同key值的value聚合,这个方法会先在map端执行减少shuffle量,然后在reduce端执行
- aggregateByKey 与reduceByKey类似,不过会将k-v对集合聚合变形为新的k-u类型的RDD。需要提供两个方法:seqOp[(U,V)=>U]在单个分区内将原始V类型的值merge到U类型的汇总值方法,以及combineOp[(U,U)=>]在不同分区间将聚合结果merge的方法。
- groupByKey 将原本的RDD,根据key值进行分组,返回RDD[K, Iterable[V]]结果。顺带说一嘴:这个方法不会做map端的combine操作(因为实际上数据结果并没有减少,反而因为需要插入到hash table中会增加老年代的内存压力)
- ……
而这几种方法底层都是依赖于combineByKeyWithClassTag(在Spark1.4中是combineByKey,新版本接口增加了ClassTag,支持原生类型)。这个函数的实现方式如以下代码所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
... // 省略参数检查
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
|
该函数依赖于六个参数,计算逻辑如图
- createCombiner: V => C 提供聚合过程的初始值的函数
- mergeValue: (C, V) => C 将V值merge到聚合值中的方法
- mergeCombiners: (C, C) => C 将聚合值进行聚合的方法
- partitioner: Partitioner 指定分区方法
- mapSideCombine: Boolean = true 告诉ShuffleRDD(其实是告诉它依赖的ShuffleDependency)是否需要做Map端的combine 过程
- serializer: Serializer = null 序列化,一般不用指定
如图可以看出来,由于Spark懒式计算的原则,现在只是生成了MapPartitionsRDD 或者 ShuffledRDD,当遇到类似collect / count 等操作的时候这些RDD就会依赖DAGScheduler 的调度,递归的先执行依赖,然后一步步执行完成。当然,我们的主角Aggregator 在计算Shuffled Dependency的时候就会完全发挥出作用来了。具体它是怎么做的呢?
我们都知道Shuffle RDD 的行为是新创建一个stage,然后顺次执行它依赖的stage,并且读取最终执行完的结果。而从上面我们可以看到,我们将Aggregator 设置到了Shuffle Dependency 中了,所以我们猜测这个聚合操作就应该在读取执行结果的前后过程中。果然,在HashShuffleWriter 和SortShuffleWriter(后者用的更多些)的write方法中,在BlockStoreShuffleReader 这个类的read 方法中,我们找到了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// SortShuffleWriter 的write方法
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
// 需要map端做聚合,则先做聚合,然后将结果迭代器传给外部排序器,完成聚合后排序
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// 无聚合逻辑
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
... // 省略
}
// HashShuffleWriter 的write方法
override def write(records: Iterator[Product2[K, V]]): Unit = {
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// map端的combine逻辑
dep.aggregator.get.combineValuesByKey(records, context)
} else {
records
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
for (elem <- iter) {
val bucketId = dep.partitioner.getPartition(elem._1)
shuffle.writers(bucketId).write(elem._1, elem._2)
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
... //省略无关逻辑
// 如果结果需要进行聚合
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// 将map端combine的结果再次进行聚合
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// map端并没有进行聚合
// 直接根据Value做聚合操作,这里V的类型是Nothing,代表此处并不关心原始的RDD
// 的Value类型是什么,直接传入combineValuesByKey里面,进行聚合,返回最后我
// 们需要的[K, C]类型迭代器即可
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
...
// 省略了判断结果是否需要排序
// 如果结果需要排序的话 则将聚合的结果迭代器传递给排序器,使用外部排序收集结果,与SortShuffleWriter类似
// 如果不排序直接返回聚合结果
|
在这里我们看到了,通过调用Aggregator的以下两个方法就完成了Shuffle过程中map端和reduce端的聚合操作:
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext):Iterator[(K, C)] 直接对依赖RDD 的值进行聚合
- def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext):Iterator[(K, C)] 对依赖RDD 产出的聚合结果进行再次聚合
这两个函数的逻辑非常相似,我只把第一个函数贴出来分析里面进行的操作
1
2
3
4
5
6
7
|
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = {
// 使用了ExternalAppendOnlyMap 进行聚合操作
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
}
|
我们看到,这里Spark使用了一个ExternalAppendOnlyMap 这样的Map 结构,将所有的数据insertAll 到这个结构里面来,结果调用iterator 方法就可以获得最终结果了。我们可以理解,如果是一个Map 的话,我们可以将所有的key 作为map 的key,然后迭代取上一轮的结果,判断key 如果存在于map 中的话,将value 取出,跟新的数据做聚合,然后写回map中即可。原来Aggregator 的实现也很简单嘛,跟我们单机版的并没有什么不同。但现在有一个最重要的问题:如果数据量太大,内存中存储不下了怎么办?
这种情况在大数据场景下几乎是必然发生的问题。别担心,ExternalAppendOnlyMap 这个名字中的External 就告诉我们它肯定是可以向磁盘进行Spill 的,它也是一个Spark Spill 到磁盘的机制很典型的例子。我们来看看它是如何优雅的处理这个问题的。
如左图,我们可以看出ExternalAppendOnlyMap 持续地读取数据,然后在SizeTrackingAppendOnlyMap (使用开放寻址和二次探测的方式实现的不可删除的HashMap,为什么要这么实现?留作一个问题,大家请自己思考。)中不断做聚合操作。如果数据一旦超出规定的阈值,就将currentMap按照hash 值排序后spill 到磁盘上(按照hash 排序很重要!!!),然后创建一个新的map继续重复这样的操作。
但大家就会问了,这样子的话,一个key 可能会存在在内存中、多个DiskIterator 中,那其实并没有完成真正的数据聚合啊?这时候,当insertAll 完成之后,我们将会调用iterator 方法,在这里是真正完成聚合的关键所在(右图所示)。iterator 返回了一个基于内存中currentMap 和DiskIterator 两部分数据的多路归并迭代器。这个迭代器,每次在调用next 方法的时候都会在内部的优先级队列(按每个迭代器最小hash值作为比较对象的堆结构),寻找最小的hash值且key值相等的所有元素(因为我们每个map 都是排序过的,所以这总能实现),进行merge,将所有符合要求的元素merge完成后返回。这样便完成了最终的聚合操作。
所以我们总结下:
- groupByKey/ reduceByKey/ aggregateByKey/ foldByKey 等都是用Aggregator 实现的。其中groupByKey 没有做Map 端的combine,且分组操作比较重,如果只是要做聚合操作,那建议用后三种操作。
- Aggregator 作用的位置是ShuffleWriter 和ShuffleReader 的write 和read 过程,分别完成map 端的combine 和 reduce端的聚合。
- Aggregator 内部实现考虑到了内存不足,进行磁盘spill 的场景,它采用多个基于开放寻址的不可删除SizeTrackingAppendOnlyMap 进行聚合,然后内存超过阈值是进行spill,最后迭代器中多路归并完成聚合操作。
对我们的优化的意义:
- 不要进行groupByKey.map(_._2.size) 类似这样的操作来统计每个key的count数,因为groupByKey 操作非常重,这种情况用其它聚合方式
- Aggregator 使用的还是基于近于java的HashMap的方式进行内存中的聚合的,这个方式是比较消耗内存的,所以在这种过程中很容易发生多次磁盘Spill,容易在老年代生成很多对象,容易发生GC,导致性能问题。所以在这种情况下就要求
- 合理进行分区,要对自己数据进行更多的测试,分区数量要足够,否则很容易出现性能问题
- 如果Shuffle数据量太大的话,建议不要使用这种方式,可以使用repartitionAndSortWithinPartition这种函数做特异性优化。
Spark Core Aggregator相关推荐
- sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)
本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...
- Spark源码和调优简介 Spark Core
作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- SparkSQL 与 Spark Core的关系
不多说,直接上干货! SparkSQL 与 Spark Core的关系 Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL). Spark SQL在Spark C ...
- rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)
spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...
- 分布式实时计算—Spark—Spark Core
原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...
- Spark Core
Spark Core DAG概念 有向无环图 Spark会根据用户提交的计算逻辑中的RDD的转换(变换方法)和动作(action方法)来生成RDD之间的依赖关系 ...
- Spark Core项目实战(3) | 页面单跳转化率统计
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...
- Spark Core:Scala单词计数
Spark Core:Scala单词计数 文章目录 Spark Core:Scala单词计数 1.实验描述 2.实验环境 3.相关技能 4.知识点 5.实验效果 6.实验步骤 8.总结 1.实验描述 ...
最新文章
- Nginx+tomcat集群的session共享问题
- python搜索路径顺序_Python module之搜索路径
- Python实训day01pm【练习题、文件编写、列表的使用】
- bash脚本编程之一 条件判断及算术运算
- java自动转换_java类型转换详解(自动转换和强制转换)
- ceph常用命令-pool相关命令
- WordPress简约博客主题模板Chen主题V1.2
- mapreduce实现——腾讯大数据QQ共同好友推荐系统【你可能认识的人】
- 配置xml文件来实现FlightGear通信,接收与发送数据
- 园区内智慧出行标准化白皮书(2022年)
- 什么是CDN及CDN加速原理
- 终于在linux上用wine装上qq了
- Android产品研发(十六)--开发者选项
- 电商产品设计:后台订单管理设计
- 直播APP开发需要具备哪些功能?
- 学习CSS的background属性及其取值(实践)
- HTML标签、认识浏览器
- 正则表达式判断是否符合USD格
- Factor_mimicking_portfolio(模仿因子的投资组合):EAP.fama_macbeth.Factor_mimicking_portfolio
- 服务器金蝶上机日志在哪个文件夹,金蝶软件如何查看某用户的上机日志
热门文章
- SparkMllib介绍
- Uncaught TypeError: url.indexOf is not a function
- 2017驾驶证替人销分新处罚!
- #define中的“\”作用
- 【oracle】oracle按照某字段指定顺序排序
- GDAL编译报错ogr_sfcgal.h:34:34:fatal error:SFCGAL/capi/sfcgal_c/h:No such file or directory
- 关于UPR的使用及各Profiler参数
- Python图书管理系统(课设)
- videojs 加入关闭按钮
- “getfacl: Removing leading '/' from absolute path names”解决办法