RDD 分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

Hash 分区:对于给定的 key,计算其 hashCode,并除以分区个数取余。

class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be
negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}

Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the
default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found
$partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
...
}
def numPartitions: Int = rangeBounds.length + 1
private var binarySearch: ((Array[K], K) => Int) =
CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k,
rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1 }
if (partition > rangeBounds.length) {
partition = rangeBounds.length
} }
if (ascending) {
partition
} else {
rangeBounds.length - partition
} }
override def equals(other: Any): Boolean = other match {
...
}
override def hashCode(): Int = {
...
}
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit =
Utils.tryOrIOException {
...
}
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException
{
...
} }

12 Spark on RDD 分区器相关推荐

  1. Spark的RDD分区器

    RDD 分区器 基本介绍 Spark 目前支持Hash 分区.Range 分区和用户自定义分区.Hash 分区为当前的默认分区.分区器直接决定了RDD 中分区的个数.RDD 中每条数据经过Shuffl ...

  2. 【spark】RDD分区解析

    文章目录 前言 一.集合分区 1.设置分区 2. 数据分配源码解析 二.文件分区 1.设置分区 2.Hadoop 读取文件1.1概念 前言 RDD(Resilient Distributed Data ...

  3. spark RDD分区2GB限制(Size exceeds Integer.MAX_VALUE)

    最近使用spark处理较大的数据文件,遇到了分区2G限制的问题,spark日志会报如下的日志: WARN scheduler.TaskSetManager: Lost task 19.0 in sta ...

  4. Spark的RDD转换算子

    目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...

  5. RDD分区2GB限制

    本文目的 最近使用spark处理较大的数据时,遇到了分区2G限制的问题(ken).找到了解决方法,并且在网上收集了一些资料,记录在这里,作为备忘. 问题现象 遇到这个问题时,spark日志会报如下的日 ...

  6. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  7. spark更改分区_spark RDD分区是否可以指定分区

    更多详细内容 数据分区: 在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能. mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输 ...

  8. Spark分区器HashPartitioner和RangePartitioner代码详解

    转载: https://www.iteblog.com/archives/1522.html 在Spark中分区器直接决定了RDD中分区的个数:也决定了RDD中每条数据经过Shuffle过程属于哪个分 ...

  9. Spark自定义分区器

    spark目前支持两个分区器,分别是HashPartitioner和RangePartitioner. 均继承自Partitioner,有共同方法 - def numPartitions --分区器的 ...

最新文章

  1. verilog中如何拆分一个数
  2. Leetcode 之Count and Say(35)
  3. Prism安装、MVVM基础概念及一个简单的样例
  4. 企业网站 源码 服务邮箱:_企业网站建设对于服务器的选择至关重要
  5. 第 3 章 镜像 - 015 - 调试 Dockerfile
  6. 深入分析glibc内存释放时的死锁bug
  7. NUC980开发板Linux系统EC20模块 移植 串口 PPP拨号
  8. [高级软件调试方法] SoftProbe调试方法及实现
  9. c语言判断二叉树是不是二叉排序树_判断
  10. 用EEupdate修改Intel网卡类型
  11. 百度旋转验证码打码模块,集成鱼刺模块类
  12. 自签名证书和私有CA签名的证书的区别
  13. 量子计算与通讯的基本原理(量子纠缠)
  14. 美国大学工程计算机排名,想从事编程?2019年USNews美国大学计算机工程专业排名值得一看...
  15. Kruise Rollout:灵活可插拔的渐进式发布框架
  16. 出现这十种症状,说明你不适合干程序员
  17. teamviewer14 去商用途提示
  18. 编写训练一年级学生10以内减法的程序
  19. OpenCV实战之人脸美颜美型(四)——肤色检测
  20. 服务器不能复制粘贴文件的处理方式

热门文章

  1. Netsuite案例:达美乐比萨
  2. Armbain系统根分区空间不足处理
  3. C语言 立方体随鼠标转动,HTML5 盒子悬停动效 - 立方体沿鼠标方向翻滚
  4. 成长笔记2:用这15句话夸孩子,决定他们一生思维高度
  5. 谷歌金山词霸中自定义TTS语音的实现(告别难听的默认男声!)
  6. UbuntuKylin 安装教程
  7. Linux常用命令(Hadoop)
  8. [附源码]计算机毕业设计JAVA网上花店系统
  9. Java使用 Delayed 实现延迟任务
  10. int a; int* a; int** a; int (*a)[]; int (*a)(int)