Spark Core Aggregator

本文要介绍的是Spark Core中的Aggregator这个类。这个类的用处非常大,为什么这么说呢?我们都知道Spark支持传统的MapReduce模型,并基于这种模型提供了比Hadoop更多更高层次的计算接口。比如Spark Core PairRDD中非常常用的:

  • reduceByKey  提供聚合函数,将k-v对集合将相同key值的value聚合,这个方法会先在map端执行减少shuffle量,然后在reduce端执行
  • aggregateByKey 与reduceByKey类似,不过会将k-v对集合聚合变形为新的k-u类型的RDD。需要提供两个方法:seqOp[(U,V)=>U]在单个分区内将原始V类型的值merge到U类型的汇总值方法,以及combineOp[(U,U)=>]在不同分区间将聚合结果merge的方法。
  • groupByKey 将原本的RDD,根据key值进行分组,返回RDD[K, Iterable[V]]结果。顺带说一嘴:这个方法不会做map端的combine操作(因为实际上数据结果并没有减少,反而因为需要插入到hash table中会增加老年代的内存压力)
  • ……

而这几种方法底层都是依赖于combineByKeyWithClassTag(在Spark1.4中是combineByKey,新版本接口增加了ClassTag,支持原生类型)。这个函数的实现方式如以下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    ... // 省略参数检查
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

该函数依赖于六个参数,计算逻辑如图

  1. createCombiner: V => C 提供聚合过程的初始值的函数
  2. mergeValue: (C, V) => C 将V值merge到聚合值中的方法
  3. mergeCombiners: (C, C) => C 将聚合值进行聚合的方法
  4. partitioner: Partitioner 指定分区方法
  5. mapSideCombine: Boolean = true 告诉ShuffleRDD(其实是告诉它依赖的ShuffleDependency)是否需要做Map端的combine 过程
  6. serializer: Serializer = null 序列化,一般不用指定

如图可以看出来,由于Spark懒式计算的原则,现在只是生成了MapPartitionsRDD 或者 ShuffledRDD,当遇到类似collect / count 等操作的时候这些RDD就会依赖DAGScheduler 的调度,递归的先执行依赖,然后一步步执行完成。当然,我们的主角Aggregator 在计算Shuffled Dependency的时候就会完全发挥出作用来了。具体它是怎么做的呢?

我们都知道Shuffle RDD 的行为是新创建一个stage,然后顺次执行它依赖的stage,并且读取最终执行完的结果。而从上面我们可以看到,我们将Aggregator 设置到了Shuffle Dependency 中了,所以我们猜测这个聚合操作就应该在读取执行结果的前后过程中。果然,在HashShuffleWriter 和SortShuffleWriter(后者用的更多些)的write方法中,在BlockStoreShuffleReader 这个类的read 方法中,我们找到了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

// SortShuffleWriter 的write方法  
override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      // 需要map端做聚合,则先做聚合,然后将结果迭代器传给外部排序器,完成聚合后排序
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
     // 无聚合逻辑
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)
    ... // 省略
  }
// HashShuffleWriter 的write方法  
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        //  map端的combine逻辑
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }
    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      shuffle.writers(bucketId).write(elem._1, elem._2)
    }
  }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

  /** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
    ... //省略无关逻辑
    // 如果结果需要进行聚合
    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // 将map端combine的结果再次进行聚合
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        // map端并没有进行聚合
        // 直接根据Value做聚合操作,这里V的类型是Nothing,代表此处并不关心原始的RDD
        // 的Value类型是什么,直接传入combineValuesByKey里面,进行聚合,返回最后我
        // 们需要的[K, C]类型迭代器即可
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }
    ...
    // 省略了判断结果是否需要排序
    // 如果结果需要排序的话 则将聚合的结果迭代器传递给排序器,使用外部排序收集结果,与SortShuffleWriter类似
    // 如果不排序直接返回聚合结果

在这里我们看到了,通过调用Aggregator的以下两个方法就完成了Shuffle过程中map端和reduce端的聚合操作:

  1. def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext):Iterator[(K, C)] 直接对依赖RDD 的值进行聚合
  2. def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext):Iterator[(K, C)] 对依赖RDD 产出的聚合结果进行再次聚合

这两个函数的逻辑非常相似,我只把第一个函数贴出来分析里面进行的操作

1
2
3
4
5
6
7

  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = {
    // 使用了ExternalAppendOnlyMap 进行聚合操作
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

我们看到,这里Spark使用了一个ExternalAppendOnlyMap 这样的Map 结构,将所有的数据insertAll 到这个结构里面来,结果调用iterator 方法就可以获得最终结果了。我们可以理解,如果是一个Map 的话,我们可以将所有的key 作为map 的key,然后迭代取上一轮的结果,判断key 如果存在于map 中的话,将value 取出,跟新的数据做聚合,然后写回map中即可。原来Aggregator 的实现也很简单嘛,跟我们单机版的并没有什么不同。但现在有一个最重要的问题:如果数据量太大,内存中存储不下了怎么办?

这种情况在大数据场景下几乎是必然发生的问题。别担心,ExternalAppendOnlyMap 这个名字中的External 就告诉我们它肯定是可以向磁盘进行Spill 的,它也是一个Spark Spill 到磁盘的机制很典型的例子。我们来看看它是如何优雅的处理这个问题的。

如左图,我们可以看出ExternalAppendOnlyMap 持续地读取数据,然后在SizeTrackingAppendOnlyMap (使用开放寻址和二次探测的方式实现的不可删除的HashMap,为什么要这么实现?留作一个问题,大家请自己思考。)中不断做聚合操作。如果数据一旦超出规定的阈值,就将currentMap按照hash 值排序后spill 到磁盘上(按照hash 排序很重要!!!),然后创建一个新的map继续重复这样的操作。

但大家就会问了,这样子的话,一个key 可能会存在在内存中、多个DiskIterator 中,那其实并没有完成真正的数据聚合啊?这时候,当insertAll 完成之后,我们将会调用iterator 方法,在这里是真正完成聚合的关键所在(右图所示)。iterator 返回了一个基于内存中currentMap 和DiskIterator 两部分数据的多路归并迭代器。这个迭代器,每次在调用next 方法的时候都会在内部的优先级队列(按每个迭代器最小hash值作为比较对象的堆结构),寻找最小的hash值且key值相等的所有元素(因为我们每个map 都是排序过的,所以这总能实现),进行merge,将所有符合要求的元素merge完成后返回。这样便完成了最终的聚合操作。

所以我们总结下:

  1. groupByKey/ reduceByKey/ aggregateByKey/ foldByKey 等都是用Aggregator 实现的。其中groupByKey 没有做Map 端的combine,且分组操作比较重,如果只是要做聚合操作,那建议用后三种操作。
  2. Aggregator 作用的位置是ShuffleWriter 和ShuffleReader 的write 和read 过程,分别完成map 端的combine 和 reduce端的聚合。
  3. Aggregator 内部实现考虑到了内存不足,进行磁盘spill 的场景,它采用多个基于开放寻址的不可删除SizeTrackingAppendOnlyMap 进行聚合,然后内存超过阈值是进行spill,最后迭代器中多路归并完成聚合操作。

对我们的优化的意义:

  1. 不要进行groupByKey.map(_._2.size) 类似这样的操作来统计每个key的count数,因为groupByKey 操作非常重,这种情况用其它聚合方式
  2. Aggregator 使用的还是基于近于java的HashMap的方式进行内存中的聚合的,这个方式是比较消耗内存的,所以在这种过程中很容易发生多次磁盘Spill,容易在老年代生成很多对象,容易发生GC,导致性能问题。所以在这种情况下就要求
    1. 合理进行分区,要对自己数据进行更多的测试,分区数量要足够,否则很容易出现性能问题
    2. 如果Shuffle数据量太大的话,建议不要使用这种方式,可以使用repartitionAndSortWithinPartition这种函数做特异性优化。

Spark Core Aggregator相关推荐

  1. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  2. Spark源码和调优简介 Spark Core

    作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...

  3. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  4. SparkSQL 与 Spark Core的关系

    不多说,直接上干货! SparkSQL 与 Spark Core的关系 Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL). Spark SQL在Spark C ...

  5. rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)

    spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...

  6. 分布式实时计算—Spark—Spark Core

    原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...

  7. Spark Core

    Spark Core     DAG概念         有向无环图         Spark会根据用户提交的计算逻辑中的RDD的转换(变换方法)和动作(action方法)来生成RDD之间的依赖关系 ...

  8. Spark Core项目实战(3) | 页面单跳转化率统计

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  9. Spark Core:Scala单词计数

    Spark Core:Scala单词计数 文章目录 Spark Core:Scala单词计数 1.实验描述 2.实验环境 3.相关技能 4.知识点 5.实验效果 6.实验步骤 8.总结 1.实验描述 ...

最新文章

  1. Nginx+tomcat集群的session共享问题
  2. python搜索路径顺序_Python module之搜索路径
  3. Python实训day01pm【练习题、文件编写、列表的使用】
  4. bash脚本编程之一 条件判断及算术运算
  5. java自动转换_java类型转换详解(自动转换和强制转换)
  6. ceph常用命令-pool相关命令
  7. WordPress简约博客主题模板Chen主题V1.2
  8. mapreduce实现——腾讯大数据QQ共同好友推荐系统【你可能认识的人】
  9. 配置xml文件来实现FlightGear通信,接收与发送数据
  10. 园区内智慧出行标准化白皮书(2022年)
  11. 什么是CDN及CDN加速原理
  12. 终于在linux上用wine装上qq了
  13. Android产品研发(十六)--开发者选项
  14. 电商产品设计:后台订单管理设计
  15. 直播APP开发需要具备哪些功能?
  16. 学习CSS的background属性及其取值(实践)
  17. HTML标签、认识浏览器
  18. 正则表达式判断是否符合USD格
  19. Factor_mimicking_portfolio(模仿因子的投资组合):EAP.fama_macbeth.Factor_mimicking_portfolio
  20. 服务器金蝶上机日志在哪个文件夹,金蝶软件如何查看某用户的上机日志

热门文章

  1. SparkMllib介绍
  2. Uncaught TypeError: url.indexOf is not a function
  3. 2017驾驶证替人销分新处罚!
  4. #define中的“\”作用
  5. 【oracle】oracle按照某字段指定顺序排序
  6. GDAL编译报错ogr_sfcgal.h:34:34:fatal error:SFCGAL/capi/sfcgal_c/h:No such file or directory
  7. 关于UPR的使用及各Profiler参数
  8. Python图书管理系统(课设)
  9. videojs 加入关闭按钮
  10. “getfacl: Removing leading '/' from absolute path names”解决办法