转载:

https://www.iteblog.com/archives/1522.html

  在Spark中分区器直接决定了RDD中分区的个数;也决定了RDD中每条数据经过Shuffle过程属于哪个分区;也决定了Reduce的个数。这三点看起来是不同的方面的,但其深层的含义是一致的。

  我们需要注意的是,只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None的。

  在Spark中,存在两类分区函数:HashPartitioner和RangePartitioner,它们都是继承自Partitioner,主要提供了每个RDD有几个分区(numPartitions)以及对于给定的值返回一个分区ID(0~numPartitions-1),也就是决定这个值是属于那个分区的。

文章目录

  • 1 HashPartitioner分区
  • 2 RangePartitioner分区
  • 3 确认边界

HashPartitioner分区

  HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。实现如下:

class HashPartitioner(partitions: Int) extends Partitioner {

  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {

    case null => 0

    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)

  }

  override def equals(other: Any): Boolean = other match {

    case h: HashPartitioner =>

      h.numPartitions == numPartitions

    case _ =>

      false

  }

  override def hashCode: Int = numPartitions

}

RangePartitioner分区

  从HashPartitioner分区的实现原理我们可以看出,其结果可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据,这显然不是我们需要的。而RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

  前面讨论过,RangePartitioner分区器的主要作用就是将一定范围内的数映射到某一个分区内,所以它的实现中分界的算法尤为重要。这个算法对应的函数是rangeBounds。这个函数主要经历了两个过程:以Spark 1.1版本为界,Spark 1.1版本社区对rangeBounds函数进行了一次重大的重构。

  因为在Spark 1.1版本之前,RangePartitioner分区对整个数据集进行了2次的扫描:一次是计算RDD中元素的个数;一次是进行采样。具体的代码如下:

// An array of upper bounds for the first (partitions - 1) partitions

private val rangeBounds: Array[K] = {

    if (partitions == 1) {

      Array()

    } else {

      val rddSize = rdd.count()

      val maxSampleSize = partitions * 20.0

      val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)

      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted

      if (rddSample.length == 0) {

        Array()

      } else {

        val bounds = new Array[K](partitions - 1)

        for (i <- 0 until partitions - 1) {

          val index = (rddSample.length - 1) * (i + 1) / partitions

          bounds(i) = rddSample(index)

        }

        bounds

      }

    }

}

  注意看里面的rddSize的计算和rdd.sample的计算。所以如果你进行一次sortByKey操作就会对RDD扫描三次!而我们都知道,分区函数性能对整个Spark作业的性能是有直接的影响,而且影响很大,直接影响作业运行的总时间,所以社区不得不对RangePartitioner中的rangeBounds算法进行重构。

  在阅读新版本的RangePartitioner之前,建议先去了解一下Reservoir sampling(水塘抽样),因为其中的实现用到了Reservoir sampling算法进行采样。
采样总数

  在新的rangeBounds算法总,采样总数做了一个限制,也就是最大只采样1e6的样本(也就是1000000):

val sampleSize = math.min(20.0 * partitions, 1e6)

所以如果你的分区个数为5,则采样样本数量为100.0

父RDD中每个分区采样样本数

  按照我们的思路,正常情况下,父RDD每个分区需要采样的数据量应该是sampleSize/rdd.partitions.size,但是我们看代码的时候发现父RDD每个分区需要采样的数据量是正常数的3倍。

val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

这是因为父RDD各分区中的数据量可能会出现倾斜的情况,乘于3的目的就是保证数据量小的分区能够采样到足够的数据,而对于数据量大的分区会进行第二次采样。

采样算法

  这个地方就是RangePartitioner分区的核心了,其内部使用的就是水塘抽样,而这个抽样特别适合那种总数很大而且未知,并无法将所有的数据全部存放到主内存中的情况。也就是我们不需要事先知道RDD中元素的个数(不需要调用rdd.count()了!)。其主要实现如下:

val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

def sketch[K : ClassTag](

      rdd: RDD[K],

      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {

    val shift = rdd.id

    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object

    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>

      val seed = byteswap32(idx ^ (shift << 16))

      val (sample, n) = SamplingUtils.reservoirSampleAndCount(

        iter, sampleSizePerPartition, seed)

      Iterator((idx, n, sample))

    }.collect()

    val numItems = sketched.map(_._2.toLong).sum

    (numItems, sketched)

}

def reservoirSampleAndCount[T: ClassTag](

      input: Iterator[T],

      k: Int,

      seed: Long = Random.nextLong())

    : (Array[T], Int) = {

    val reservoir = new Array[T](k)

    // Put the first k elements in the reservoir.

    var i = 0

    while (i < k && input.hasNext) {

      val item = input.next()

      reservoir(i) = item

      i += 1

    }

    // If we have consumed all the elements, return them. Otherwise do the replacement.

    if (i < k) {

      // If input size < k, trim the array to return only an array of input size.

      val trimReservoir = new Array[T](i)

      System.arraycopy(reservoir, 0, trimReservoir, 0, i)

      (trimReservoir, i)

    } else {

      // If input size > k, continue the sampling process.

      val rand = new XORShiftRandom(seed)

      while (input.hasNext) {

        val item = input.next()

        val replacementIndex = rand.nextInt(i)

        if (replacementIndex < k) {

          reservoir(replacementIndex) = item

        }

        i += 1

      }

      (reservoir, i)

}

}

  RangePartitioner.sketch的第一个参数是rdd.map(_._1),也就是把父RDD的key传进来,因为分区只需要对Key进行操作即可。该函数返回值是val (numItems, sketched) ,其中numItems相当于记录rdd元素的总数;而sketched的类型是Array[(Int, Int, Array[K])],记录的是分区的编号、该分区中总元素的个数以及从父RDD中每个分区采样的数据。

  sketch函数对父RDD中的每个分区进行采样,并记录下分区的ID和分区中数据总和。

  reservoirSampleAndCount函数就是典型的水塘抽样实现,唯一不同的是该算法还记录下i的值,这个就是该分区中元素的总和。

  我们之前讨论过,父RDD各分区中的数据量可能不均匀,在极端情况下,有些分区内的数据量会占有整个RDD的绝大多数的数据,如果按照水塘抽样进行采样,会导致该分区所采样的数据量不足,所以我们需要对该分区再一次进行采样,而这次采样使用的就是rdd的sample函数。实现如下:

val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)

val candidates = ArrayBuffer.empty[(K, Float)]

val imbalancedPartitions = mutable.Set.empty[Int]

sketched.foreach { case (idx, n, sample) =>

  if (fraction * n > sampleSizePerPartition) {

    imbalancedPartitions += idx

  } else {

    // The weight is 1 over the sampling probability.

    val weight = (n.toDouble / sample.size).toFloat

    for (key <- sample) {

      candidates += ((key, weight))

    }

  }

}

if (imbalancedPartitions.nonEmpty) {

  // Re-sample imbalanced partitions with the desired sampling probability.

  val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)

  val seed = byteswap32(-rdd.id - 1)

  val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()

  val weight = (1.0 / fraction).toFloat

  candidates ++= reSampled.map(x => (x, weight))

}

我们可以看到,重新采样的采样因子和Spark 1.1之前的采样因子一致。对于满足于fraction * n > sampleSizePerPartition条件的分区,我们对其再一次采样。所有采样完的数据全部存放在candidates 中。

确认边界

从上面的采样算法可以看出,对于不同的分区weight的值是不一样的,这个值对应的就是每个分区的采样间隔。

def determineBounds[K : Ordering : ClassTag](

    candidates: ArrayBuffer[(K, Float)],

    partitions: Int): Array[K] = {

  val ordering = implicitly[Ordering[K]]

  val ordered = candidates.sortBy(_._1)

  val numCandidates = ordered.size

  val sumWeights = ordered.map(_._2.toDouble).sum

  val step = sumWeights / partitions

  var cumWeight = 0.0

  var target = step

  val bounds = ArrayBuffer.empty[K]

  var i = 0

  var j = 0

  var previousBound = Option.empty[K]

  while ((i < numCandidates) && (j < partitions - 1)) {

    val (key, weight) = ordered(i)

    cumWeight += weight

    if (cumWeight > target) {

      // Skip duplicate values.

      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {

        bounds += key

        target += step

        j += 1

        previousBound = Some(key)

      }

    }

    i += 1

  }

  bounds.toArray

}

这个函数最后返回的就是分区的划分边界。

注意,按照理想情况,选定的划分边界需要保证划分后的分区中数据量是均匀的,但是这个算法中如果将cumWeight > target修改成cumWeight >= target的时候会保证各分区之间数据量更加均衡。可以看这里https://issues.apache.org/jira/browse/SPARK-10184

定位分区ID

分区类的一个重要功能就是对给定的值计算其属于哪个分区。这个算法并没有太大的变化。

def getPartition(key: Any): Int = {

  val k = key.asInstanceOf[K]

  var partition = 0

  if (rangeBounds.length <= 128) {

    // If we have less than 128 partitions naive search

    while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {

      partition += 1

    }

  } else {

    // Determine which binary search method to use only once.

    partition = binarySearch(rangeBounds, k)

    // binarySearch either returns the match location or -[insertion point]-1

    if (partition < 0) {

      partition = -partition-1

    }

    if (partition > rangeBounds.length) {

      partition = rangeBounds.length

    }

  }

  if (ascending) {

    partition

  } else {

    rangeBounds.length - partition

  }

}

如果分区边界数组的大小小于或等于128的时候直接变量数组,否则采用二分查找法确定key属于某个分区。

Spark分区器HashPartitioner和RangePartitioner代码详解相关推荐

  1. 优酷播放器右侧的导航html,优酷新版播放器站外调用代码详解

    优酷现在更新的优酷播放器,增加了开灯关灯特效,蓝叶看着很漂亮,就花了点时间,把这个调用代码找了出来,现在就分享个需要的同学.优酷附带开灯关灯特效新版播放器的站外调用代码如下: 复制代码的时候,把代码复 ...

  2. WMV网页播放器代码详解

    WMV网页播放器代码详解 WMV是流媒体,用专门的代码播放,效果会更好一些. 这里只举WMV(MediaPlayer9.0及以后)的网页内嵌播放器代码.(默认0为否,-1或1为是) 程序代码: < ...

  3. html5代码转换为视频,HTML5中的视频代码详解

    摘要 腾兴网为您分享:HTML5中的视频代码详解,智学网,云闪付,易推广,小红书等软件知识,以及360win10,流量魔盒,fitbit,上港商城,安卓2.3.7,全民惠,五年级下册英语单词表图片,t ...

  4. yii mysql 事务处理_Yii2中事务的使用实例代码详解

    前言 一般我们做业务逻辑,都不会仅仅关联一个数据表,所以,会面临事务问题. 数据库事务(Database Transaction) ,是指作为单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全 ...

  5. python装饰器函数-Python函数装饰器常见使用方法实例详解

    本文实例讲述了Python函数装饰器常见使用方法.分享给大家供大家参考,具体如下: 一.装饰器 首先,我们要了解到什么是开放封闭式原则? 软件一旦上线后,对修改源代码是封闭的,对功能的扩张是开放的,所 ...

  6. sgd 参数 详解_代码笔记--PC-DARTS代码详解

    DARTS是可微分网络架构搜搜索,PC-DARTS是DARTS的拓展,通过部分通道连接的方法在网络搜索过程中减少计算时间的内存占用.接下来将会结合论文和开源代码来详细介绍PC-DARTS. 1 总体框 ...

  7. python装饰器详解-Python装饰器基础概念与用法详解

    本文实例讲述了Python装饰器基础概念与用法.分享给大家供大家参考,具体如下: 装饰器基础 前面快速介绍了装饰器的语法,在这里,我们将深入装饰器内部工作机制,更详细更系统地介绍装饰器的内容,并学习自 ...

  8. BilSTM 实体识别_NLP-入门实体命名识别(NER)+Bilstm-CRF模型原理Pytorch代码详解——最全攻略

    最近在系统地接触学习NER,但是发现这方面的小帖子还比较零散.所以我把学习的记录放出来给大家作参考,其中汇聚了很多其他博主的知识,在本文中也放出了他们的原链.希望能够以这篇文章为载体,帮助其他跟我一样 ...

  9. BilSTM 实体识别_NLP入门实体命名识别(NER)+BilstmCRF模型原理Pytorch代码详解——最全攻略...

    来自 | 知乎   作者 | seven链接 | https://zhuanlan.zhihu.com/p/79552594编辑 | 机器学习算法与自然语言处理公众号本文仅作学术分享,如有侵权,请联系 ...

最新文章

  1. R语言使用lm函数拟合多元线性回归模型、假定预测变量没有交互作用(Multiple linear regression)
  2. STM32最小系统电路
  3. js中innerText,innerHTML的用法
  4. Iterator迭代器原理
  5. 清华尹成python爬虫百度云_爬虫:利用python完成百度贴吧数据采集
  6. ps 2键盘代码 c语言,MicroBlaze中断编程——以PS/2键盘输入为例
  7. C语言实现ICMP协议,并进行PING测试
  8. oracle数字日期43841怎么转,安装0racle已崩溃,求助大佬
  9. 运行与windows命令(cmd)的说明与示例
  10. python画正方形并涂色_关于python使用cv画矩形并填充颜色同时填充文字
  11. 科立捷默认频率_科立捷电子产品技术参数.doc
  12. 51单片机学习笔记(7)——74HC138三八译码器
  13. 单片微机计算机原理与接口技术高峰,单片微机原理与接口技术(第2版)
  14. linux上打开tif格式图片,tif图片格式介绍及其打开方式
  15. 最新 9个免费建站空间 网站均免备案
  16. devicemapper介绍
  17. 云知声临门一脚不敢踹:科大讯飞指其数据造假,业绩持续增长存疑
  18. 云适配签约中远海运特运 构建移动BPM平台
  19. ACMjava杨辉三角形与二项式定理递推实现与组合实现
  20. 网站经常出现502错误怎么办

热门文章

  1. 《软件架构师的12项修炼》读书笔记-技术之天花板
  2. Ubuntu Linux中配置Mplayer万能播放器
  3. 利用jquery getJSON 调用ashx实现ajax调用
  4. Java String的intern
  5. 【Java深入研究】9、HashMap源码解析(jdk 1.8)
  6. Noip 2016 愤怒的小鸟 题解
  7. Java之强引用、 软引用、 弱引用、虚引用
  8. Artech的MVC4框架学习——第三章controller的激活
  9. Gartner市场分析报告显示2010年全球安全软件市场增长12%
  10. 联海网站开发-操作说明-会员系统及诵读之星